// persistence/lib.rs
1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! `diagnostics-persistence` component persists Inspect VMOs and serves them at the next boot.
6
7mod fetcher;
8mod file_handler;
9mod inspect_server;
10mod scheduler;
11
12use anyhow::{Context, Error, anyhow};
13use argh::FromArgs;
14use fidl::endpoints;
15use fidl::endpoints::ControlHandle;
16use fuchsia_component::server::ServiceFs;
17use fuchsia_inspect::component;
18use fuchsia_inspect::health::Reporter;
19use fuchsia_runtime::{HandleInfo, HandleType};
20use fuchsia_sync::Mutex;
21use futures::{FutureExt, StreamExt, TryStreamExt, select};
22use log::*;
23use persistence_build_config::Config;
24use sandbox::CapabilityRef;
25use scheduler::Scheduler;
26use serde::{Deserialize, Serialize};
27use std::pin::pin;
28use std::sync::{Arc, LazyLock};
29use zx::{BootInstant, HandleBased};
30use {
31    fidl_fuchsia_component_sandbox as fsandbox, fidl_fuchsia_diagnostics as fdiagnostics,
32    fidl_fuchsia_inspect as finspect, fidl_fuchsia_process_lifecycle as flifecycle,
33    fidl_fuchsia_update as fupdate, fuchsia_async as fasync,
34};
35
/// The name of the subcommand and the logs-tag.
pub const PROGRAM_NAME: &str = "persistence";
/// Name of the Inspect node under which persisted data from the previous
/// boot is republished (see `publish_inspect_data`).
pub const PERSIST_NODE_NAME: &str = "persist";
/// Added after persisted data is fully published
pub const PUBLISHED_TIME_KEY: &str = "published";

/// Key in escrowed dictionary to immutable state persisted across instances of
/// this component across the same boot.
const INSTANCE_STATE_KEY: &str = "InstanceState";
/// Key in escrowed dictionary to frozen Inspect VMO.
const FROZEN_INSPECT_VMO_KEY: &str = "FrozenInspectVMO";
47
/// Parsed CML structured configuration.
///
/// Populated once at startup via the `BUILD_CONFIG` lazy static.
#[derive(Clone, Debug)]
pub(crate) struct BuildConfig {
    /// If true, don't wait for a successful update check before publishing
    /// previous boot's persisted Inspect data.
    skip_update_check: bool,
    /// Duration to wait for FIDL requests before stalling the connection.
    /// `INFINITE` when idling is disabled (negative configured timeout).
    stall_interval: zx::MonotonicDuration,
}
57
58/// Build config, as defined by the CML structured configuration.
59pub(crate) static BUILD_CONFIG: LazyLock<BuildConfig> = LazyLock::new(|| {
60    let config = Config::take_from_startup_handle();
61    component::inspector().root().record_child("config", |node| config.record_inspect(node));
62
63    let Config { skip_update_check, stop_on_idle_timeout_millis } = config;
64
65    if skip_update_check {
66        info!("Configured to skip update check");
67    }
68
69    let stall_interval = if stop_on_idle_timeout_millis >= 0 {
70        info!("Configured to idle after {stop_on_idle_timeout_millis}ms of inactivity");
71        zx::MonotonicDuration::from_millis(stop_on_idle_timeout_millis)
72    } else {
73        info!("Not configured to idle after inactivity");
74        zx::MonotonicDuration::INFINITE
75    };
76
77    BuildConfig { skip_update_check, stall_interval }
78});
79
/// Command line args
///
/// The "persistence" subcommand takes no arguments of its own.
#[derive(FromArgs, Debug, PartialEq)]
#[argh(subcommand, name = "persistence")]
pub struct CommandLine {}
84
/// Progress of the one-shot "first update check" gate that decides when
/// previous-boot Inspect data may be published. Serialized into the
/// escrowed `PersistedState`.
#[derive(Debug, Clone, Serialize, Deserialize)]
enum UpdateCheckStage {
    /// Waiting for the first update check before publishing previous boot
    /// inspect data.
    Waiting,
    /// First update check was skipped, previous boot inspect data has been published.
    Skipped,
    /// First update check has completed, previous boot inspect data has been
    /// published.
    Done,
    /// Unable to subscribe to the first update check.
    Error,
}
/// State to be persisted between instances of this component across
/// the same boot.
///
/// Serialized with CBOR (`ciborium`) into the escrowed dictionary under
/// `INSTANCE_STATE_KEY`.
#[derive(Debug, Serialize, Deserialize)]
struct PersistedState {
    /// Persistence config loaded from disk.
    config: persistence_config::Config,
    /// Stage of the update check.
    update_stage: Mutex<UpdateCheckStage>,
}
107
/// All component-specific state.
#[derive(Clone)]
struct ComponentState {
    /// State persisted across instances of this component.
    persisted: Arc<PersistedState>,
    /// Listener for Sample
    // Also serves fuchsia.diagnostics SampleSink request streams (see
    // `handle_sample_sink` use in `main`).
    scheduler: Scheduler,
    /// Controller to republish escrowed Inspect data, if available.
    /// `None` when a previous instance already published last boot's data,
    /// in which case the frozen Inspect VMO remains served by the framework.
    inspect_controller: Arc<Mutex<Option<inspect_runtime::PublishedInspectController>>>,
}
118
119impl ComponentState {
120    /// Load state from a previous instance if possible, otherwise initialize
121    /// new state.
122    async fn load(
123        scope: fasync::ScopeHandle,
124        store: &sandbox::CapabilityStore,
125    ) -> Result<Self, Error> {
126        if let Some(dictionary) =
127            fuchsia_runtime::take_startup_handle(HandleInfo::new(HandleType::EscrowedDictionary, 0))
128        {
129            debug!("Loading component state from escrowed dictionary");
130            return ComponentState::from_escrow(scope.clone(), store, dictionary)
131                .await
132                .context("Failed to load component state from escrowed dictionary");
133        }
134
135        debug!("No escrowed dictionary available; generating one");
136        Self::new(scope.clone()).await.context("Failed to create component state")
137    }
138
139    async fn new(scope: fasync::ScopeHandle) -> Result<Self, Error> {
140        let inspect_controller = inspect_runtime::publish(
141            component::inspector(),
142            inspect_runtime::PublishOptions::default().custom_scope(scope.clone()),
143        )
144        .ok_or_else(|| anyhow!("failed to publish inspect"))?;
145
146        let config =
147            persistence_config::load_configuration_files().context("Error loading configs")?;
148        file_handler::forget_old_data(&config).await?;
149
150        let scheduler = Scheduler::new(&config);
151        scheduler
152            .subscribe(scope.clone(), &config)
153            .await
154            .context("Failed to subscribe to fuchsia.diagnostics.Sample")?;
155
156        let persisted = {
157            let update_stage = if BUILD_CONFIG.skip_update_check {
158                UpdateCheckStage::Skipped
159            } else {
160                UpdateCheckStage::Waiting
161            };
162            Arc::new(PersistedState { config, update_stage: Mutex::new(update_stage) })
163        };
164
165        if BUILD_CONFIG.skip_update_check {
166            publish_inspect_data().await;
167        } else {
168            // Listen for the first update check.
169            let notifier_client = {
170                let (notifier_client, notifier_request_stream) =
171                    fidl::endpoints::create_request_stream::<fupdate::NotifierMarker>();
172                let persisted = persisted.clone();
173                scope.spawn(async move {
174                    if let Err(e) = handle_update_done(notifier_request_stream, persisted).await {
175                        error!("Failed to handle NotifierRequest: {e}");
176                    }
177                });
178                notifier_client
179            };
180
181            match fuchsia_component::client::connect_to_protocol::<fupdate::ListenerMarker>() {
182                Ok(proxy) => {
183                    if let Err(e) = proxy.notify_on_first_update_check(
184                        fupdate::ListenerNotifyOnFirstUpdateCheckRequest {
185                            notifier: Some(notifier_client),
186                            ..Default::default()
187                        },
188                    ) {
189                        error!("Error subscribing to first update check; not publishing: {e:?}");
190                        *persisted.update_stage.lock() = UpdateCheckStage::Error;
191                    }
192                }
193                Err(e) => {
194                    // TODO(https://fxbug.dev/444526593): Consider bailing
195                    // if the update checker is not available.
196                    warn!(e:?; "Unable to connect to fuchsia.update.Listener; will publish immediately");
197                    *persisted.update_stage.lock() = UpdateCheckStage::Done;
198                }
199            }
200        }
201
202        Ok(Self {
203            persisted,
204            scheduler,
205            inspect_controller: Arc::new(Mutex::new(Some(inspect_controller))),
206        })
207    }
208
209    async fn from_escrow<'a>(
210        scope: fasync::ScopeHandle,
211        store: &'a sandbox::CapabilityStore,
212        dictionary: zx::NullableHandle,
213    ) -> Result<Self, Error> {
214        let dict = store
215            .import(fsandbox::DictionaryRef { token: dictionary.into() })
216            .await
217            .context("Error importing from component startup handle")?;
218
219        let persisted_bytes = dict
220            .get::<sandbox::Data<'a>>(INSTANCE_STATE_KEY)
221            .await
222            .context("Error getting instance state")?
223            .export::<Vec<u8>>()
224            .await
225            .context("Error exporting as buffer")?;
226        let persisted: PersistedState = ciborium::from_reader(&persisted_bytes[..])
227            .context("Failed to deserialize InstanceState")?;
228        let update_stage = persisted.update_stage.lock().clone();
229
230        let inspect_controller = match update_stage {
231            UpdateCheckStage::Waiting | UpdateCheckStage::Error => {
232                // Create a new, writable Inspect tree. The previous instance of
233                // Persistence did not receive the signal to persist data from
234                // the last boot, but this instance might.
235                let escrow_token = dict
236                    .get::<sandbox::Handle<'a>>(FROZEN_INSPECT_VMO_KEY)
237                    .await
238                    .context("Failed to get frozen Inspect VMO")?
239                    .export::<zx::NullableHandle>()
240                    .await
241                    .context("Failed to export handle")?
242                    .into();
243
244                // Swap escrowed Inspect data with a new Tree server.
245                let token = finspect::EscrowToken { token: escrow_token };
246                let inspect_runtime::FetchEscrowResult { vmo: _, server } =
247                    inspect_runtime::fetch_escrow(
248                        token,
249                        inspect_runtime::FetchEscrowOptions::new().replace_with_tree(),
250                    )
251                    .await
252                    .context("Failed to fetch escrowed Inspect data")?;
253
254                let opts = inspect_runtime::PublishOptions::default()
255                    .custom_scope(scope.clone())
256                    .on_tree_server(server.context("FetchEscrow did not return a TreeHandle")?);
257
258                let inspect_controller = inspect_runtime::publish(component::inspector(), opts)
259                    .context("Failed to publish Inspect data")?;
260
261                Some(inspect_controller)
262            }
263            UpdateCheckStage::Done | UpdateCheckStage::Skipped => {
264                // Persistence has already published persisted data from last
265                // boot. By not republishing, the existing frozen Inspect data
266                // remains published.
267                //
268                // Persistence needs to continue running to record data to
269                // persist for the next boot.
270                None
271            }
272        };
273
274        // Do not spawn FIDL request handlers when returning from escrow. The
275        // previous component instance escrowed its request streams, sending
276        // them to the Component Framework. When an incoming request is received
277        // on escrowed channels held by the Component Framework, it will be
278        // routed to this instance's incoming namespace (via IncomingRequest)
279        // then this instance will spawn new request handlers.
280
281        Ok(Self {
282            scheduler: Scheduler::new(&persisted.config),
283            persisted: Arc::new(persisted),
284            inspect_controller: Arc::new(Mutex::new(inspect_controller)),
285        })
286    }
287
288    async fn as_escrowed_dict(
289        store: &sandbox::CapabilityStore,
290        persisted: impl AsRef<PersistedState>,
291        inspect_controller: Arc<Mutex<Option<inspect_runtime::PublishedInspectController>>>,
292    ) -> Result<fsandbox::DictionaryRef, Error> {
293        let dict = store.create_dictionary().await?;
294
295        // Save PersistedState
296        let mut persisted_bytes: Vec<u8> = Vec::new();
297        ciborium::into_writer(persisted.as_ref(), &mut persisted_bytes)
298            .context("Failed to serialize InstanceState")?;
299        let data = store.import(persisted_bytes).await?;
300        dict.insert(INSTANCE_STATE_KEY, data).await?;
301
302        // Save frozen Inspect VMO.
303        let inspect_controller = inspect_controller.lock().take();
304        if let Some(inspect_controller) = inspect_controller {
305            match inspect_controller.escrow_frozen(inspect_runtime::EscrowOptions::default()).await
306            {
307                Ok(escrow_token) => {
308                    let handle = escrow_token.token.into_handle();
309                    let data = store.import(handle).await?;
310                    dict.insert(FROZEN_INSPECT_VMO_KEY, data).await?;
311                }
312                Err(e) => {
313                    error!("Failed to escrow frozen Inspect VMO: {e:?}");
314                }
315            }
316        }
317
318        dict.export().await.context("Failed to export escrowed dictionary")
319    }
320}
321
/// Handle fuchsia.update/Notifier requests. Notifies of when an update check
/// has been completed, signaling this component to publish persisted data to
/// Inspect.
///
/// Processes at most one `Notify` request. If no request arrives within
/// `BUILD_CONFIG.stall_interval`, the unread server endpoint is handed back
/// to the framework through the `/escrow` namespace entry so this component
/// can idle.
async fn handle_update_done(
    stream: fupdate::NotifierRequestStream,
    persisted: Arc<PersistedState>,
) -> Result<(), Error> {
    // Wrap the stream so `stalled` resolves with the server endpoint once
    // the stream has been idle for the configured interval.
    let (stream, stalled) = detect_stall::until_stalled(stream, BUILD_CONFIG.stall_interval);
    let mut stream = pin!(stream);
    if let Ok(Some(request)) = stream.try_next().await {
        debug!("Received fuchsia.update.NotifierRequest");
        match request {
            fupdate::NotifierRequest::Notify { control_handle } => {
                debug!("Received notification that the update check has completed");
                // Clone the stage out so the lock is not held across the
                // `publish_inspect_data().await` below.
                let stage = persisted.update_stage.lock().clone();
                match stage {
                    UpdateCheckStage::Skipped | UpdateCheckStage::Error => {
                        // NOTE(review): assumes no Notify can arrive in these
                        // stages (no notifier was registered with the update
                        // Listener) — confirm external clients cannot reach
                        // the served Notifier protocol while Skipped/Error.
                        unreachable!("Received impossible notification")
                    }
                    UpdateCheckStage::Waiting => {
                        *persisted.update_stage.lock() = UpdateCheckStage::Done;
                        info!("...Update check has completed; publishing previous boot data");
                        publish_inspect_data().await;
                        control_handle.shutdown();
                        return Ok(());
                    }
                    UpdateCheckStage::Done => {
                        debug!("Ignoring update check notification; already received one");
                        control_handle.shutdown();
                        return Ok(());
                    }
                }
            }
        }
    }
    if let Ok(Some(server_end)) = stalled.await {
        // Send the server endpoint back to the framework.
        debug!("Escrowing fuchsia.update.Notifier");
        fuchsia_component::client::connect_channel_to_protocol_at_path(
            server_end,
            "/escrow/fuchsia.update.Notifier",
        )
        .context("Failed to connect to fuchsia.update.Notifier")?;
    }
    Ok(())
}
368
/// Publish the previous boot's persisted data into this component's Inspect
/// tree and mark the component healthy.
async fn publish_inspect_data() {
    // TODO(https://fxbug.dev/444525059): Set health properly.
    component::health().set_ok();
    if let Err(e) = inspect_server::record_persist_node(PERSIST_NODE_NAME).await {
        error!("Failed to serve persisted Inspect data from previous boot: {e}");
    }
    // Recorded last so readers can tell that publication is fully complete
    // (see PUBLISHED_TIME_KEY's doc).
    component::inspector().root().record_int(PUBLISHED_TIME_KEY, BootInstant::get().into_nanos());
}
377
/// Protocols served from this component's outgoing `svc` directory.
enum IncomingRequest {
    /// fuchsia.update.Notifier: signals completion of the first update check.
    UpdateDone(fupdate::NotifierRequestStream),
    /// fuchsia.diagnostics SampleSink: handled by the `Scheduler`.
    SampleSink(fdiagnostics::SampleSinkRequestStream),
}
382
/// Component entry point: restores (or freshly creates) component state,
/// serves the outgoing directory, and runs until either a lifecycle Stop
/// request arrives or the outgoing directory stalls from inactivity — in the
/// latter case state is escrowed back to the Component Framework.
pub async fn main(_args: CommandLine) -> Result<(), Error> {
    info!("Starting Diagnostics Persistence service");
    // initialize to 5MiB (1024 * 1024 * 5 bytes) — sized up front because
    // previous-boot data is republished into this inspector.
    component::init_inspector_with_size(1024 * 1024 * 5);
    let scope = fasync::Scope::new();
    let store = sandbox::CapabilityStore::connect()?;
    // Prefer state escrowed by a previous instance within this boot;
    // otherwise initialize from scratch.
    let state = ComponentState::load(scope.to_handle(), &store)
        .await
        .context("Error getting escrowed state")?;
    component::health().set_starting_up();

    let mut fs = ServiceFs::new();
    fs.dir("svc").add_fidl_service(IncomingRequest::UpdateDone);
    fs.dir("svc").add_fidl_service(IncomingRequest::SampleSink);
    fs.take_and_serve_directory_handle().expect("Failed to take service directory handle");

    // Adapt the raw Lifecycle startup handle into a typed request stream.
    let lifecycle =
        fuchsia_runtime::take_startup_handle(HandleInfo::new(HandleType::Lifecycle, 0)).unwrap();
    let lifecycle: zx::Channel = lifecycle.into();
    let lifecycle: endpoints::ServerEnd<flifecycle::LifecycleMarker> = lifecycle.into();
    let (mut lifecycle_request_stream, lifecycle_control_handle) =
        lifecycle.into_stream_and_control_handle();
    // Completes only on a Stop request; FIDL errors and stream closure park
    // forever so the idle/stall path below stays in control.
    let mut lifecycle_task = pin!(
        async move {
            match lifecycle_request_stream.next().await {
                Some(Ok(flifecycle::LifecycleRequest::Stop { .. })) => {
                    debug!("Received stop request");
                    // TODO(https://fxbug.dev/444529707): Teach `ServiceFs` and
                    // others to skip the `until_stalled` timeout when this happens
                    // so we can cleanly stop the component.
                }
                Some(Err(e)) => {
                    error!("Received FIDL error from Lifecycle: {e:?}");
                    std::future::pending::<()>().await
                }
                None => {
                    debug!("Lifecycle request stream closed");
                    std::future::pending::<()>().await
                }
            }
        }
        .fuse()
    );

    // Serve incoming connections until the outgoing directory stalls for
    // `stall_interval`; on stall, escrow the directory plus serialized state
    // back to the framework via OnEscrow.
    let mut outgoing_dir_task =
        fs.until_stalled(BUILD_CONFIG.stall_interval).for_each_concurrent(None, move |item| {
            let lifecycle_control_handle = lifecycle_control_handle.clone();
            let state = state.clone();
            let store = store.clone();
            async move {
                match item {
                    fuchsia_component::server::Item::Request(req, _active_guard) => match req {
                        IncomingRequest::UpdateDone(stream) => {
                            if let Err(e) = handle_update_done(stream, state.persisted).await {
                                error!("Failed to handle NotifierRequest: {e}");
                            }
                        },
                        IncomingRequest::SampleSink(stream) => {
                            if let Err(e) = state.scheduler.handle_sample_sink(stream).await {
                                error!("Failed to handle SampleSinkRequest: {e}");
                            }
                        },
                    },
                    fuchsia_component::server::Item::Stalled(outgoing_directory) => {
                        // Best effort: if state serialization fails, still
                        // escrow the outgoing directory so requests aren't lost.
                        let escrowed_dictionary = match ComponentState::as_escrowed_dict(
                            &store,
                            state.persisted,
                            state.inspect_controller
                        ).await {
                            Ok(dict) => Some(dict),
                            Err(e) => {
                                error!(
                                    "Failed to serialize PersistedState into component dictionary: {e}"
                                );
                                None
                            }
                        };
                        lifecycle_control_handle
                            .send_on_escrow(flifecycle::LifecycleOnEscrowRequest {
                                outgoing_dir: Some(outgoing_directory.into()),
                                escrowed_dictionary,
                                ..Default::default()
                            })
                            .unwrap();
                    }
                }
            }
        });

    // First future to finish wins: Stop cancels outstanding work in the
    // scope; the idle path drains it gracefully with `join`.
    select! {
        _ = lifecycle_task => {
            info!("Stopping due to lifecycle request");
            scope.cancel().await;
        },
        _ = outgoing_dir_task => {
            info!("Stopping due to idle activity");
            scope.join().await;
        },
    }

    Ok(())
}