#![deny(missing_docs)]
pub mod state;
pub use state::{
FailFetchData, FailStageData, FetchFailureReason, PrepareFailureReason, Progress,
StageFailureReason, State, StateId, UpdateInfo, UpdateInfoAndProgress,
};
pub mod options;
pub use options::{Initiator, Options};
use fidl::endpoints::{ClientEnd, ServerEnd};
use fidl_fuchsia_update_installer::{
InstallerProxy, MonitorMarker, MonitorRequest, MonitorRequestStream, RebootControllerMarker,
UpdateNotStartedReason,
};
use fuchsia_url::AbsolutePackageUrl;
use futures::prelude::*;
use futures::task::{Context, Poll};
use log::info;
use pin_project::pin_project;
use std::fmt;
use std::pin::Pin;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum UpdateAttemptError {
#[error("FIDL error")]
FIDL(#[source] fidl::Error),
#[error("an installation was already in progress")]
InstallInProgress,
}
#[derive(Debug, Error)]
pub enum MonitorUpdateAttemptError {
#[error("FIDL error")]
FIDL(#[source] fidl::Error),
#[error("unable to decode State")]
DecodeState(#[source] state::DecodeStateError),
}
#[pin_project(project = UpdateAttemptProj)]
#[derive(Debug)]
pub struct UpdateAttempt {
attempt_id: String,
#[pin]
monitor: UpdateAttemptMonitor,
}
#[pin_project(project = UpdateAttemptMonitorProj)]
pub struct UpdateAttemptMonitor {
#[pin]
stream: MonitorRequestStream,
}
impl UpdateAttempt {
pub fn attempt_id(&self) -> &str {
&self.attempt_id
}
}
impl UpdateAttemptMonitor {
fn new() -> Result<(ClientEnd<MonitorMarker>, Self), fidl::Error> {
let (monitor_client_end, stream) =
fidl::endpoints::create_request_stream::<MonitorMarker>();
Ok((monitor_client_end, Self { stream }))
}
pub fn from_stream(stream: MonitorRequestStream) -> Self {
Self { stream }
}
}
pub async fn start_update(
update_url: &AbsolutePackageUrl,
options: Options,
installer_proxy: &InstallerProxy,
reboot_controller_server_end: Option<ServerEnd<RebootControllerMarker>>,
) -> Result<UpdateAttempt, UpdateAttemptError> {
let url = fidl_fuchsia_pkg::PackageUrl { url: update_url.to_string() };
let (monitor_client_end, monitor) =
UpdateAttemptMonitor::new().map_err(UpdateAttemptError::FIDL)?;
let attempt_id = installer_proxy
.start_update(&url, &options.into(), monitor_client_end, reboot_controller_server_end)
.await
.map_err(UpdateAttemptError::FIDL)?
.map_err(|reason| match reason {
UpdateNotStartedReason::AlreadyInProgress => UpdateAttemptError::InstallInProgress,
})?;
info!("Update started with attempt id: {}", attempt_id);
Ok(UpdateAttempt { attempt_id, monitor })
}
pub async fn monitor_update(
attempt_id: Option<&str>,
installer_proxy: &InstallerProxy,
) -> Result<Option<UpdateAttemptMonitor>, fidl::Error> {
let (monitor_client_end, monitor) = UpdateAttemptMonitor::new()?;
let attached = installer_proxy.monitor_update(attempt_id, monitor_client_end).await?;
if attached {
Ok(Some(monitor))
} else {
Ok(None)
}
}
impl fmt::Debug for UpdateAttemptMonitor {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("UpdateAttemptMonitor").field("stream", &"MonitorRequestStream").finish()
}
}
impl Stream for UpdateAttemptMonitor {
type Item = Result<State, MonitorUpdateAttemptError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let UpdateAttemptMonitorProj { stream } = self.project();
let poll_res = match stream.poll_next(cx) {
Poll::Ready(None) => return Poll::Ready(None),
Poll::Ready(Some(res)) => res.map_err(MonitorUpdateAttemptError::FIDL)?,
Poll::Pending => return Poll::Pending,
};
let MonitorRequest::OnState { state, responder } = poll_res;
let _ = responder.send();
let state = state.try_into().map_err(MonitorUpdateAttemptError::DecodeState)?;
Poll::Ready(Some(Ok(state)))
}
}
impl Stream for UpdateAttempt {
type Item = Result<State, MonitorUpdateAttemptError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let UpdateAttemptProj { attempt_id: _, monitor } = self.project();
monitor.poll_next(cx)
}
}
#[cfg(test)]
mod tests {
use super::*;
use assert_matches::assert_matches;
use fidl_fuchsia_update_installer::{
InstallationProgress, InstallerMarker, InstallerRequest, MonitorProxy,
};
use fuchsia_async as fasync;
use futures::stream::StreamExt;
const TEST_URL: &str = "fuchsia-pkg://fuchsia.com/update/0";
impl UpdateAttemptMonitor {
fn new_test() -> (TestAttempt, Self) {
let (monitor_client_end, monitor) = Self::new().unwrap();
(TestAttempt::new(monitor_client_end), monitor)
}
}
struct TestAttempt {
proxy: MonitorProxy,
}
impl TestAttempt {
fn new(monitor_client_end: ClientEnd<MonitorMarker>) -> Self {
let proxy = monitor_client_end.into_proxy();
Self { proxy }
}
async fn send_state_and_recv_ack(&mut self, state: State) {
self.send_raw_state_and_recv_ack(state.into()).await;
}
async fn send_raw_state_and_recv_ack(
&mut self,
state: fidl_fuchsia_update_installer::State,
) {
let () = self.proxy.on_state(&state).await.unwrap();
}
}
#[fasync::run_singlethreaded(test)]
async fn update_attempt_monitor_forwards_and_acks_progress() {
let (mut send, monitor) = UpdateAttemptMonitor::new_test();
let expected_fetch_state = &State::Fetch(
UpdateInfoAndProgress::builder()
.info(UpdateInfo::builder().download_size(1000).build())
.progress(Progress::builder().fraction_completed(0.5).bytes_downloaded(500).build())
.build(),
);
let client_fut = async move {
assert_eq!(
monitor.try_collect::<Vec<State>>().await.unwrap(),
vec![State::Prepare, expected_fetch_state.clone()]
);
};
let server_fut = async move {
send.send_state_and_recv_ack(State::Prepare).await;
send.send_state_and_recv_ack(expected_fetch_state.clone()).await;
};
future::join(client_fut, server_fut).await;
}
#[fasync::run_singlethreaded(test)]
async fn update_attempt_monitor_rejects_invalid_state() {
let (mut send, mut monitor) = UpdateAttemptMonitor::new_test();
let client_fut = async move {
assert_matches!(
monitor.next().await.unwrap(),
Err(MonitorUpdateAttemptError::DecodeState(_))
);
assert_matches!(monitor.next().await, Some(Ok(State::Prepare)));
};
let server_fut = async move {
send.send_raw_state_and_recv_ack(fidl_fuchsia_update_installer::State::Fetch(
fidl_fuchsia_update_installer::FetchData {
info: Some(fidl_fuchsia_update_installer::UpdateInfo {
download_size: None,
..Default::default()
}),
progress: Some(InstallationProgress {
fraction_completed: Some(2.0),
bytes_downloaded: None,
..Default::default()
}),
..Default::default()
},
))
.await;
send.send_state_and_recv_ack(State::Prepare).await;
};
future::join(client_fut, server_fut).await;
}
#[fasync::run_singlethreaded(test)]
async fn start_update_forwards_args_and_returns_attempt_id() {
let pkgurl = "fuchsia-pkg://fuchsia.com/update/0".parse().unwrap();
let opts = Options {
initiator: Initiator::User,
allow_attach_to_existing_attempt: false,
should_write_recovery: true,
};
let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
let (_reboot_controller, reboot_controller_server_end) =
fidl::endpoints::create_proxy::<RebootControllerMarker>();
let installer_fut = async move {
let returned_update_attempt =
start_update(&pkgurl, opts, &proxy, Some(reboot_controller_server_end))
.await
.unwrap();
assert_eq!(
returned_update_attempt.attempt_id(),
"00000000-0000-0000-0000-000000000001"
);
};
let stream_fut = async move {
match stream.next().await.unwrap() {
Ok(InstallerRequest::StartUpdate {
url,
options:
fidl_fuchsia_update_installer::Options {
initiator,
should_write_recovery,
allow_attach_to_existing_attempt,
..
},
monitor: _,
reboot_controller,
responder,
}) => {
assert_eq!(url.url, TEST_URL);
assert_eq!(initiator, Some(fidl_fuchsia_update_installer::Initiator::User));
assert_matches!(reboot_controller, Some(_));
assert_eq!(should_write_recovery, Some(true));
assert_eq!(allow_attach_to_existing_attempt, Some(false));
responder.send(Ok("00000000-0000-0000-0000-000000000001")).unwrap();
}
request => panic!("Unexpected request: {request:?}"),
}
};
future::join(installer_fut, stream_fut).await;
}
#[fasync::run_singlethreaded(test)]
async fn test_install_error() {
let pkgurl = "fuchsia-pkg://fuchsia.com/update/0".parse().unwrap();
let opts = Options {
initiator: Initiator::User,
allow_attach_to_existing_attempt: false,
should_write_recovery: true,
};
let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
let (_reboot_controller, reboot_controller_server_end) =
fidl::endpoints::create_proxy::<RebootControllerMarker>();
let installer_fut = async move {
let returned_update_attempt =
start_update(&pkgurl, opts, &proxy, Some(reboot_controller_server_end))
.await
.unwrap();
assert_eq!(
returned_update_attempt.try_collect::<Vec<State>>().await.unwrap(),
vec![State::FailPrepare(PrepareFailureReason::Internal)]
);
};
let stream_fut = async move {
match stream.next().await.unwrap() {
Ok(InstallerRequest::StartUpdate { monitor, responder, .. }) => {
responder.send(Ok("00000000-0000-0000-0000-000000000002")).unwrap();
let mut attempt = TestAttempt::new(monitor);
attempt
.send_state_and_recv_ack(State::FailPrepare(PrepareFailureReason::Internal))
.await;
}
request => panic!("Unexpected request: {request:?}"),
}
};
future::join(installer_fut, stream_fut).await;
}
#[fasync::run_singlethreaded(test)]
async fn start_update_forwards_fidl_error() {
let pkgurl = "fuchsia-pkg://fuchsia.com/update/0".parse().unwrap();
let opts = Options {
initiator: Initiator::User,
allow_attach_to_existing_attempt: false,
should_write_recovery: true,
};
let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
let installer_fut = async move {
match start_update(&pkgurl, opts, &proxy, None).await {
Err(UpdateAttemptError::FIDL(_)) => {} _ => panic!("Unexpected result"),
}
};
let stream_fut = async move {
match stream.next().await.unwrap() {
Ok(InstallerRequest::StartUpdate { .. }) => {
}
request => panic!("Unexpected request: {request:?}"),
}
};
future::join(installer_fut, stream_fut).await;
}
#[fasync::run_singlethreaded(test)]
async fn test_state_decode_error() {
let pkgurl = "fuchsia-pkg://fuchsia.com/update/0".parse().unwrap();
let opts = Options {
initiator: Initiator::User,
allow_attach_to_existing_attempt: false,
should_write_recovery: true,
};
let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
let (_reboot_controller, reboot_controller_server_end) =
fidl::endpoints::create_proxy::<RebootControllerMarker>();
let installer_fut = async move {
let mut returned_update_attempt =
start_update(&pkgurl, opts, &proxy, Some(reboot_controller_server_end))
.await
.unwrap();
assert_matches!(
returned_update_attempt.next().await,
Some(Err(MonitorUpdateAttemptError::DecodeState(
state::DecodeStateError::DecodeProgress(
state::DecodeProgressError::FractionCompletedOutOfRange
)
)))
);
};
let stream_fut = async move {
match stream.next().await.unwrap() {
Ok(InstallerRequest::StartUpdate { monitor, responder, .. }) => {
responder.send(Ok("00000000-0000-0000-0000-000000000002")).unwrap();
let mut monitor = TestAttempt::new(monitor);
monitor
.send_raw_state_and_recv_ack(fidl_fuchsia_update_installer::State::Fetch(
fidl_fuchsia_update_installer::FetchData {
info: Some(fidl_fuchsia_update_installer::UpdateInfo {
download_size: None,
..Default::default()
}),
progress: Some(InstallationProgress {
fraction_completed: Some(2.0),
bytes_downloaded: None,
..Default::default()
}),
..Default::default()
},
))
.await;
}
request => panic!("Unexpected request: {request:?}"),
}
};
future::join(installer_fut, stream_fut).await;
}
#[fasync::run_singlethreaded(test)]
async fn test_server_close_unexpectedly() {
let pkgurl = "fuchsia-pkg://fuchsia.com/update/0".parse().unwrap();
let opts = Options {
initiator: Initiator::User,
allow_attach_to_existing_attempt: false,
should_write_recovery: true,
};
let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
let (_reboot_controller, reboot_controller_server_end) =
fidl::endpoints::create_proxy::<RebootControllerMarker>();
let expected_states = vec![
State::Prepare,
State::Fetch(
UpdateInfoAndProgress::builder()
.info(UpdateInfo::builder().download_size(0).build())
.progress(
Progress::builder().fraction_completed(0.0).bytes_downloaded(0).build(),
)
.build(),
),
];
let installer_fut = async move {
let returned_update_attempt =
start_update(&pkgurl, opts, &proxy, Some(reboot_controller_server_end))
.await
.unwrap();
assert_eq!(
returned_update_attempt.try_collect::<Vec<State>>().await.unwrap(),
expected_states,
);
};
let stream_fut = async move {
match stream.next().await.unwrap() {
Ok(InstallerRequest::StartUpdate { monitor, responder, .. }) => {
responder.send(Ok("00000000-0000-0000-0000-000000000003")).unwrap();
let mut monitor = TestAttempt::new(monitor);
monitor.send_state_and_recv_ack(State::Prepare).await;
monitor
.send_raw_state_and_recv_ack(fidl_fuchsia_update_installer::State::Fetch(
fidl_fuchsia_update_installer::FetchData {
info: Some(fidl_fuchsia_update_installer::UpdateInfo {
download_size: None,
..Default::default()
}),
progress: Some(InstallationProgress {
fraction_completed: Some(0.0),
bytes_downloaded: None,
..Default::default()
}),
..Default::default()
},
))
.await;
}
request => panic!("Unexpected request: {request:?}"),
}
};
future::join(installer_fut, stream_fut).await;
}
#[fasync::run_singlethreaded(test)]
async fn monitor_update_uses_provided_attempt_id() {
let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
let client_fut = async move {
let _ = monitor_update(Some("id"), &proxy).await;
};
let server_fut = async move {
match stream.next().await.unwrap().unwrap() {
InstallerRequest::MonitorUpdate { attempt_id, .. } => {
assert_eq!(attempt_id.as_deref(), Some("id"));
}
request => panic!("Unexpected request: {request:?}"),
}
};
future::join(client_fut, server_fut).await;
}
#[fasync::run_singlethreaded(test)]
async fn monitor_update_handles_no_update_in_progress() {
let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
let client_fut = async move {
assert_matches!(monitor_update(None, &proxy).await, Ok(None));
};
let server_fut = async move {
match stream.next().await.unwrap().unwrap() {
InstallerRequest::MonitorUpdate { attempt_id, monitor, responder } => {
assert_eq!(attempt_id, None);
drop(monitor);
responder.send(false).unwrap();
}
request => panic!("Unexpected request: {request:?}"),
}
assert_matches!(stream.next().await, None);
};
future::join(client_fut, server_fut).await;
}
#[fasync::run_singlethreaded(test)]
async fn monitor_update_forwards_fidl_error() {
let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
let client_fut = async move {
assert_matches!(monitor_update(None, &proxy).await, Err(_));
};
let server_fut = async move {
match stream.next().await.unwrap() {
Ok(InstallerRequest::MonitorUpdate { .. }) => {
}
request => panic!("Unexpected request: {request:?}"),
}
};
future::join(client_fut, server_fut).await;
}
#[fasync::run_singlethreaded(test)]
async fn monitor_update_forwards_and_acks_progress() {
let (proxy, mut stream) = fidl::endpoints::create_proxy_and_stream::<InstallerMarker>();
let client_fut = async move {
let monitor = monitor_update(None, &proxy).await.unwrap().unwrap();
assert_eq!(
monitor.try_collect::<Vec<State>>().await.unwrap(),
vec![State::Prepare, State::FailPrepare(PrepareFailureReason::Internal)]
);
};
let server_fut = async move {
match stream.next().await.unwrap().unwrap() {
InstallerRequest::MonitorUpdate { attempt_id, monitor, responder } => {
assert_eq!(attempt_id, None);
responder.send(true).unwrap();
let mut monitor = TestAttempt::new(monitor);
monitor.send_state_and_recv_ack(State::Prepare).await;
monitor
.send_state_and_recv_ack(State::FailPrepare(PrepareFailureReason::Internal))
.await;
}
request => panic!("Unexpected request: {request:?}"),
}
assert_matches!(stream.next().await, None);
};
future::join(client_fut, server_fut).await;
}
}