1mod fetcher;
8mod file_handler;
9mod inspect_server;
10mod scheduler;
11
12use anyhow::{Context, Error, anyhow};
13use argh::FromArgs;
14use fidl::endpoints;
15use fidl::endpoints::ControlHandle;
16use fuchsia_component::server::ServiceFs;
17use fuchsia_inspect::component;
18use fuchsia_inspect::health::Reporter;
19use fuchsia_runtime::{HandleInfo, HandleType};
20use fuchsia_sync::Mutex;
21use futures::{FutureExt, StreamExt, TryStreamExt, select};
22use log::*;
23use persistence_build_config::Config;
24use sandbox::CapabilityRef;
25use scheduler::Scheduler;
26use serde::{Deserialize, Serialize};
27use std::pin::pin;
28use std::sync::{Arc, LazyLock};
29use zx::{BootInstant, HandleBased};
30use {
31 fidl_fuchsia_component_sandbox as fsandbox, fidl_fuchsia_diagnostics as fdiagnostics,
32 fidl_fuchsia_inspect as finspect, fidl_fuchsia_process_lifecycle as flifecycle,
33 fidl_fuchsia_update as fupdate, fuchsia_async as fasync,
34};
35
// Name of this program. NOTE(review): not referenced within this file —
// presumably consumed by the binary entry point; confirm against the caller.
pub const PROGRAM_NAME: &str = "persistence";
// Inspect node under which previously persisted data is re-published
// (see `publish_inspect_data` / `inspect_server::record_persist_node`).
pub const PERSIST_NODE_NAME: &str = "persist";
// Inspect key recording the boot-time instant at which data was published.
pub const PUBLISHED_TIME_KEY: &str = "published";

// Keys used inside the escrowed dictionary exchanged with component manager
// (see `ComponentState::{from_escrow, as_escrowed_dict}`).
// Holds the CBOR-serialized `PersistedState`.
const INSTANCE_STATE_KEY: &str = "InstanceState";
// Holds the escrow token for the frozen Inspect VMO.
const FROZEN_INSPECT_VMO_KEY: &str = "FrozenInspectVMO";
47
/// Build-time configuration, resolved once at startup; see [`BUILD_CONFIG`].
#[derive(Clone, Debug)]
pub(crate) struct BuildConfig {
    // When true, previous-boot data is published immediately instead of
    // waiting for the first update check (see `ComponentState::new`).
    skip_update_check: bool,
    // How long the component may be idle before stalling/escrowing;
    // `MonotonicDuration::INFINITE` disables idling.
    stall_interval: zx::MonotonicDuration,
}
57
58pub(crate) static BUILD_CONFIG: LazyLock<BuildConfig> = LazyLock::new(|| {
60 let config = Config::take_from_startup_handle();
61 component::inspector().root().record_child("config", |node| config.record_inspect(node));
62
63 let Config { skip_update_check, stop_on_idle_timeout_millis } = config;
64
65 if skip_update_check {
66 info!("Configured to skip update check");
67 }
68
69 let stall_interval = if stop_on_idle_timeout_millis >= 0 {
70 info!("Configured to idle after {stop_on_idle_timeout_millis}ms of inactivity");
71 zx::MonotonicDuration::from_millis(stop_on_idle_timeout_millis)
72 } else {
73 info!("Not configured to idle after inactivity");
74 zx::MonotonicDuration::INFINITE
75 };
76
77 BuildConfig { skip_update_check, stall_interval }
78});
79
/// Command-line arguments for the `persistence` subcommand (none currently).
#[derive(FromArgs, Debug, PartialEq)]
#[argh(subcommand, name = "persistence")]
pub struct CommandLine {}
84
/// Progress of the one-shot "first update check" gate that controls when
/// previous-boot Inspect data may be published. Serialized as part of
/// `PersistedState` so it survives escrow/restart.
#[derive(Debug, Clone, Serialize, Deserialize)]
enum UpdateCheckStage {
    // Subscribed to fuchsia.update.Listener; waiting for the notification.
    Waiting,
    // Build config said to skip the update check entirely.
    Skipped,
    // The check completed (or the Listener was unreachable, in which case
    // data is published immediately — see `ComponentState::new`).
    Done,
    // Subscribing to the update check failed; data is not published.
    Error,
}
/// State that survives component stop/start via the escrowed dictionary,
/// serialized with CBOR (see `as_escrowed_dict` / `from_escrow`).
#[derive(Debug, Serialize, Deserialize)]
struct PersistedState {
    // Persistence service configuration loaded from configuration files.
    config: persistence_config::Config,
    // Current stage of the first-update-check gate.
    update_stage: Mutex<UpdateCheckStage>,
}
107
/// Cheaply cloneable runtime state shared by the request handlers in `main`.
#[derive(Clone)]
struct ComponentState {
    // Escrow-surviving state (config + update-check stage).
    persisted: Arc<PersistedState>,
    // Handles fuchsia.diagnostics.SampleSink traffic per the loaded config.
    scheduler: Scheduler,
    // Controller for the published Inspect tree; taken (left as None) when
    // the tree is escrowed on stall — see `as_escrowed_dict`.
    inspect_controller: Arc<Mutex<Option<inspect_runtime::PublishedInspectController>>>,
}
118
impl ComponentState {
    /// Loads component state, preferring state escrowed by a previous
    /// incarnation (the `EscrowedDictionary` startup handle) and falling back
    /// to a fresh initialization when none is available.
    async fn load(
        scope: fasync::ScopeHandle,
        store: &sandbox::CapabilityStore,
    ) -> Result<Self, Error> {
        if let Some(dictionary) =
            fuchsia_runtime::take_startup_handle(HandleInfo::new(HandleType::EscrowedDictionary, 0))
        {
            debug!("Loading component state from escrowed dictionary");
            return ComponentState::from_escrow(scope.clone(), store, dictionary)
                .await
                .context("Failed to load component state from escrowed dictionary");
        }

        debug!("No escrowed dictionary available; generating one");
        Self::new(scope.clone()).await.context("Failed to create component state")
    }

    /// Builds fresh state: publishes the Inspect tree, loads configuration,
    /// prunes stale persisted data, starts the scheduler, and either publishes
    /// previous-boot data immediately (when the update check is skipped) or
    /// subscribes to the first-update-check notification.
    async fn new(scope: fasync::ScopeHandle) -> Result<Self, Error> {
        let inspect_controller = inspect_runtime::publish(
            component::inspector(),
            inspect_runtime::PublishOptions::default().custom_scope(scope.clone()),
        )
        .ok_or_else(|| anyhow!("failed to publish inspect"))?;

        let config =
            persistence_config::load_configuration_files().context("Error loading configs")?;
        // Drop persisted data no longer covered by the current configuration.
        file_handler::forget_old_data(&config).await?;

        let scheduler = Scheduler::new(&config);
        scheduler
            .subscribe(scope.clone(), &config)
            .await
            .context("Failed to subscribe to fuchsia.diagnostics.Sample")?;

        let persisted = {
            let update_stage = if BUILD_CONFIG.skip_update_check {
                UpdateCheckStage::Skipped
            } else {
                UpdateCheckStage::Waiting
            };
            Arc::new(PersistedState { config, update_stage: Mutex::new(update_stage) })
        };

        if BUILD_CONFIG.skip_update_check {
            publish_inspect_data().await;
        } else {
            // Spawn the notifier server before handing its client end to the
            // Listener so no notification can be lost.
            let notifier_client = {
                let (notifier_client, notifier_request_stream) =
                    fidl::endpoints::create_request_stream::<fupdate::NotifierMarker>();
                let persisted = persisted.clone();
                scope.spawn(async move {
                    if let Err(e) = handle_update_done(notifier_request_stream, persisted).await {
                        error!("Failed to handle NotifierRequest: {e}");
                    }
                });
                notifier_client
            };

            match fuchsia_component::client::connect_to_protocol::<fupdate::ListenerMarker>() {
                Ok(proxy) => {
                    if let Err(e) = proxy.notify_on_first_update_check(
                        fupdate::ListenerNotifyOnFirstUpdateCheckRequest {
                            notifier: Some(notifier_client),
                            ..Default::default()
                        },
                    ) {
                        error!("Error subscribing to first update check; not publishing: {e:?}");
                        *persisted.update_stage.lock() = UpdateCheckStage::Error;
                    }
                }
                Err(e) => {
                    warn!(e:?; "Unable to connect to fuchsia.update.Listener; will publish immediately");
                    *persisted.update_stage.lock() = UpdateCheckStage::Done;
                }
            }
        }

        Ok(Self {
            persisted,
            scheduler,
            inspect_controller: Arc::new(Mutex::new(Some(inspect_controller))),
        })
    }

    /// Reconstructs state from the dictionary escrowed by a previous
    /// incarnation: deserializes `PersistedState` and, when the update check
    /// is still pending (or errored), re-publishes the frozen Inspect VMO.
    async fn from_escrow<'a>(
        scope: fasync::ScopeHandle,
        store: &'a sandbox::CapabilityStore,
        dictionary: zx::NullableHandle,
    ) -> Result<Self, Error> {
        let dict = store
            .import(fsandbox::DictionaryRef { token: dictionary.into() })
            .await
            .context("Error importing from component startup handle")?;

        let persisted_bytes = dict
            .get::<sandbox::Data<'a>>(INSTANCE_STATE_KEY)
            .await
            .context("Error getting instance state")?
            .export::<Vec<u8>>()
            .await
            .context("Error exporting as buffer")?;
        let persisted: PersistedState = ciborium::from_reader(&persisted_bytes[..])
            .context("Failed to deserialize InstanceState")?;
        let update_stage = persisted.update_stage.lock().clone();

        let inspect_controller = match update_stage {
            UpdateCheckStage::Waiting | UpdateCheckStage::Error => {
                // The previous incarnation escrowed a frozen Inspect VMO;
                // redeem the token and replace the frozen copy with a live tree.
                let escrow_token = dict
                    .get::<sandbox::Handle<'a>>(FROZEN_INSPECT_VMO_KEY)
                    .await
                    .context("Failed to get frozen Inspect VMO")?
                    .export::<zx::NullableHandle>()
                    .await
                    .context("Failed to export handle")?
                    .into();

                let token = finspect::EscrowToken { token: escrow_token };
                let inspect_runtime::FetchEscrowResult { vmo: _, server } =
                    inspect_runtime::fetch_escrow(
                        token,
                        inspect_runtime::FetchEscrowOptions::new().replace_with_tree(),
                    )
                    .await
                    .context("Failed to fetch escrowed Inspect data")?;

                let opts = inspect_runtime::PublishOptions::default()
                    .custom_scope(scope.clone())
                    .on_tree_server(server.context("FetchEscrow did not return a TreeHandle")?);

                let inspect_controller = inspect_runtime::publish(component::inspector(), opts)
                    .context("Failed to publish Inspect data")?;

                Some(inspect_controller)
            }
            UpdateCheckStage::Done | UpdateCheckStage::Skipped => {
                // Nothing was escrowed for these stages; no controller to restore.
                None
            }
        };

        Ok(Self {
            scheduler: Scheduler::new(&persisted.config),
            persisted: Arc::new(persisted),
            inspect_controller: Arc::new(Mutex::new(inspect_controller)),
        })
    }

    /// Serializes the component's state into a sandbox dictionary suitable for
    /// escrowing with component manager: the CBOR-encoded `PersistedState`
    /// plus, when a live controller exists, a frozen-Inspect escrow token.
    /// Note: the controller is `take()`n — a failed escrow only logs.
    async fn as_escrowed_dict(
        store: &sandbox::CapabilityStore,
        persisted: impl AsRef<PersistedState>,
        inspect_controller: Arc<Mutex<Option<inspect_runtime::PublishedInspectController>>>,
    ) -> Result<fsandbox::DictionaryRef, Error> {
        let dict = store.create_dictionary().await?;

        let mut persisted_bytes: Vec<u8> = Vec::new();
        ciborium::into_writer(persisted.as_ref(), &mut persisted_bytes)
            .context("Failed to serialize InstanceState")?;
        let data = store.import(persisted_bytes).await?;
        dict.insert(INSTANCE_STATE_KEY, data).await?;

        // Take the controller out before awaiting so the guard is not held
        // across the await below.
        let inspect_controller = inspect_controller.lock().take();
        if let Some(inspect_controller) = inspect_controller {
            match inspect_controller.escrow_frozen(inspect_runtime::EscrowOptions::default()).await
            {
                Ok(escrow_token) => {
                    let handle = escrow_token.token.into_handle();
                    let data = store.import(handle).await?;
                    dict.insert(FROZEN_INSPECT_VMO_KEY, data).await?;
                }
                Err(e) => {
                    // Best-effort: the instance state is still escrowed even if
                    // the Inspect VMO could not be.
                    error!("Failed to escrow frozen Inspect VMO: {e:?}");
                }
            }
        }

        dict.export().await.context("Failed to export escrowed dictionary")
    }
}
321
322async fn handle_update_done(
326 stream: fupdate::NotifierRequestStream,
327 persisted: Arc<PersistedState>,
328) -> Result<(), Error> {
329 let (stream, stalled) = detect_stall::until_stalled(stream, BUILD_CONFIG.stall_interval);
330 let mut stream = pin!(stream);
331 if let Ok(Some(request)) = stream.try_next().await {
332 debug!("Received fuchsia.update.NotifierRequest");
333 match request {
334 fupdate::NotifierRequest::Notify { control_handle } => {
335 debug!("Received notification that the update check has completed");
336 let stage = persisted.update_stage.lock().clone();
337 match stage {
338 UpdateCheckStage::Skipped | UpdateCheckStage::Error => {
339 unreachable!("Received impossible notification")
340 }
341 UpdateCheckStage::Waiting => {
342 *persisted.update_stage.lock() = UpdateCheckStage::Done;
343 info!("...Update check has completed; publishing previous boot data");
344 publish_inspect_data().await;
345 control_handle.shutdown();
346 return Ok(());
347 }
348 UpdateCheckStage::Done => {
349 debug!("Ignoring update check notification; already received one");
350 control_handle.shutdown();
351 return Ok(());
352 }
353 }
354 }
355 }
356 }
357 if let Ok(Some(server_end)) = stalled.await {
358 debug!("Escrowing fuchsia.update.Notifier");
360 fuchsia_component::client::connect_channel_to_protocol_at_path(
361 server_end,
362 "/escrow/fuchsia.update.Notifier",
363 )
364 .context("Failed to connect to fuchsia.update.Notifier")?;
365 }
366 Ok(())
367}
368
369async fn publish_inspect_data() {
370 component::health().set_ok();
372 if let Err(e) = inspect_server::record_persist_node(PERSIST_NODE_NAME).await {
373 error!("Failed to serve persisted Inspect data from previous boot: {e}");
374 }
375 component::inspector().root().record_int(PUBLISHED_TIME_KEY, BootInstant::get().into_nanos());
376}
377
/// Protocols served from this component's outgoing `svc` directory.
enum IncomingRequest {
    // fuchsia.update.Notifier — handled by `handle_update_done`.
    UpdateDone(fupdate::NotifierRequestStream),
    // fuchsia.diagnostics SampleSink — routed to `Scheduler::handle_sample_sink`.
    SampleSink(fdiagnostics::SampleSinkRequestStream),
}
382
/// Entry point for the persistence service: loads (possibly escrowed) state,
/// serves the outgoing directory until a lifecycle Stop request or an idle
/// stall, and on stall escrows state + outgoing directory so the component
/// can be restarted later without losing progress.
pub async fn main(_args: CommandLine) -> Result<(), Error> {
    info!("Starting Diagnostics Persistence service");
    // 5 MiB inspector: sized for re-publishing previous-boot data.
    component::init_inspector_with_size(1024 * 1024 * 5);
    let scope = fasync::Scope::new();
    let store = sandbox::CapabilityStore::connect()?;
    let state = ComponentState::load(scope.to_handle(), &store)
        .await
        .context("Error getting escrowed state")?;
    component::health().set_starting_up();

    let mut fs = ServiceFs::new();
    fs.dir("svc").add_fidl_service(IncomingRequest::UpdateDone);
    fs.dir("svc").add_fidl_service(IncomingRequest::SampleSink);
    fs.take_and_serve_directory_handle().expect("Failed to take service directory handle");

    // The Lifecycle startup handle is guaranteed by the runner; convert it
    // into a request stream plus a control handle used later for OnEscrow.
    let lifecycle =
        fuchsia_runtime::take_startup_handle(HandleInfo::new(HandleType::Lifecycle, 0)).unwrap();
    let lifecycle: zx::Channel = lifecycle.into();
    let lifecycle: endpoints::ServerEnd<flifecycle::LifecycleMarker> = lifecycle.into();
    let (mut lifecycle_request_stream, lifecycle_control_handle) =
        lifecycle.into_stream_and_control_handle();
    let mut lifecycle_task = pin!(
        async move {
            match lifecycle_request_stream.next().await {
                Some(Ok(flifecycle::LifecycleRequest::Stop { .. })) => {
                    debug!("Received stop request");
                }
                // On error or stream close, park forever so the select! below
                // is decided by the outgoing-directory task instead.
                Some(Err(e)) => {
                    error!("Received FIDL error from Lifecycle: {e:?}");
                    std::future::pending::<()>().await
                }
                None => {
                    debug!("Lifecycle request stream closed");
                    std::future::pending::<()>().await
                }
            }
        }
        .fuse()
    );

    // Serve incoming connections until the directory stalls (no activity for
    // `stall_interval`), at which point state is escrowed back to the runner.
    let mut outgoing_dir_task =
        fs.until_stalled(BUILD_CONFIG.stall_interval).for_each_concurrent(None, move |item| {
            let lifecycle_control_handle = lifecycle_control_handle.clone();
            let state = state.clone();
            let store = store.clone();
            async move {
                match item {
                    fuchsia_component::server::Item::Request(req, _active_guard) => match req {
                        IncomingRequest::UpdateDone(stream) => {
                            if let Err(e) = handle_update_done(stream, state.persisted).await {
                                error!("Failed to handle NotifierRequest: {e}");
                            }
                        },
                        IncomingRequest::SampleSink(stream) => {
                            if let Err(e) = state.scheduler.handle_sample_sink(stream).await {
                                error!("Failed to handle SampleSinkRequest: {e}");
                            }
                        },
                    },
                    fuchsia_component::server::Item::Stalled(outgoing_directory) => {
                        // Serialize state for escrow; on failure proceed with
                        // only the outgoing directory (state is lost).
                        let escrowed_dictionary = match ComponentState::as_escrowed_dict(
                            &store,
                            state.persisted,
                            state.inspect_controller
                        ).await {
                            Ok(dict) => Some(dict),
                            Err(e) => {
                                error!(
                                    "Failed to serialize PersistedState into component dictionary: {e}"
                                );
                                None
                            }
                        };
                        lifecycle_control_handle
                            .send_on_escrow(flifecycle::LifecycleOnEscrowRequest {
                                outgoing_dir: Some(outgoing_directory.into()),
                                escrowed_dictionary,
                                ..Default::default()
                            })
                            .unwrap();
                    }
                }
            }
        });

    select! {
        // Stop request: cancel in-flight work immediately.
        _ = lifecycle_task => {
            info!("Stopping due to lifecycle request");
            scope.cancel().await;
        },
        // Idle stall: let spawned tasks finish cleanly before exiting.
        _ = outgoing_dir_task => {
            info!("Stopping due to idle activity");
            scope.join().await;
        },
    }

    Ok(())
}