persistence/
lib.rs
1mod constants;
8mod fetcher;
9mod file_handler;
10mod inspect_server;
11mod persist_server;
12mod scheduler;
13
14use anyhow::{bail, format_err, Context, Error};
15use argh::FromArgs;
16use fetcher::Fetcher;
17use fidl::endpoints;
18use fuchsia_component::client;
19use fuchsia_component::server::ServiceFs;
20use fuchsia_inspect::component;
21use fuchsia_inspect::health::Reporter;
22use futures::{StreamExt, TryStreamExt};
23use log::*;
24use persist_server::PersistServer;
25use persistence_config::Config;
26use scheduler::Scheduler;
27use zx::BootInstant;
28use {fidl_fuchsia_component_sandbox as fsandbox, fuchsia_async as fasync};
29
30pub const PROGRAM_NAME: &str = "persistence";
32pub const PERSIST_NODE_NAME: &str = "persist";
33pub const PUBLISHED_TIME_KEY: &str = "published";
35
36#[derive(FromArgs, Debug, PartialEq)]
38#[argh(subcommand, name = "persistence")]
39pub struct CommandLine {}
40
41macro_rules! on_error {
44 ($value:expr, $error_message:expr) => {
45 $value.or_else(|e| {
46 let message = format!($error_message, e);
47 warn!("{}", message);
48 bail!("{}", message)
49 })
50 };
51}
52
53pub async fn main(_args: CommandLine) -> Result<(), Error> {
54 info!("Starting Diagnostics Persistence Service service");
55 let mut health = component::health();
56 let config =
57 on_error!(persistence_config::load_configuration_files(), "Error loading configs: {}")?;
58 let inspector = component::inspector();
59 let _inspect_server_task =
60 inspect_runtime::publish(inspector, inspect_runtime::PublishOptions::default());
61
62 file_handler::forget_old_data(&config);
63
64 let (fetch_requester, _fetcher_task) =
66 on_error!(Fetcher::new(&config), "Error initializing fetcher: {}")?;
67
68 let scheduler = Scheduler::new(fetch_requester, &config);
69
70 let scope = fasync::Scope::new();
72 let services_scope = scope.new_child_with_name("services");
73
74 let _service_scopes = spawn_persist_services(&config, scheduler, &services_scope)
75 .await
76 .expect("Error spawning persist services");
77
78 scope.spawn(async move {
83 info!("Waiting for post-boot update check...");
84 match fuchsia_component::client::connect_to_protocol::<fidl_fuchsia_update::ListenerMarker>(
85 ) {
86 Ok(proxy) => match proxy.wait_for_first_update_check_to_complete().await {
87 Ok(()) => {}
88 Err(e) => {
89 warn!(e:?; "Error waiting for first update check; not publishing.");
90 return;
91 }
92 },
93 Err(e) => {
94 warn!(
95 e:?;
96 "Unable to connect to fuchsia.update.Listener; will publish immediately."
97 );
98 }
99 }
100 info!("...Update check has completed; publishing previous boot data");
102 inspector.root().record_child(PERSIST_NODE_NAME, |node| {
103 inspect_server::serve_persisted_data(node);
104 health.set_ok();
105 info!("Diagnostics Persistence Service ready");
106 });
107 inspector.root().record_int(PUBLISHED_TIME_KEY, BootInstant::get().into_nanos());
108 });
109
110 scope.await;
111
112 Ok(())
113}
114
115enum IncomingRequest {
116 Router(fsandbox::DictionaryRouterRequestStream),
117}
118
119async fn spawn_persist_services(
122 config: &Config,
123 scheduler: Scheduler,
124 scope: &fasync::Scope,
125) -> Result<Vec<fasync::Scope>, Error> {
126 let store = client::connect_to_protocol::<fsandbox::CapabilityStoreMarker>().unwrap();
127 let id_gen = sandbox::CapabilityIdGenerator::new();
128
129 let services_dict = id_gen.next();
130 store
131 .dictionary_create(services_dict)
132 .await
133 .context("Failed to send FIDL to create dictionary")?
134 .map_err(|e| format_err!("Failed to create dictionary: {e:?}"))?;
135
136 let mut service_scopes = Vec::with_capacity(config.len());
138
139 for (service_name, tags) in config {
140 let connector_id = id_gen.next();
141 let (receiver, receiver_stream) =
142 endpoints::create_request_stream::<fsandbox::ReceiverMarker>();
143
144 store
145 .connector_create(connector_id, receiver)
146 .await
147 .context("Failed to send FIDL to create connector")?
148 .map_err(|e| format_err!("Failed to create connector: {e:?}"))?;
149
150 store
151 .dictionary_insert(
152 services_dict,
153 &fsandbox::DictionaryItem {
154 key: format!("{}-{}", constants::PERSIST_SERVICE_NAME_PREFIX, service_name),
155 value: connector_id,
156 },
157 )
158 .await
159 .context(
160 "Failed to send FIDL to insert into diagnostics-persist-capabilities dictionary",
161 )?
162 .map_err(|e| {
163 format_err!(
164 "Failed to insert into diagnostics-persist-capabilities dictionary: {e:?}"
165 )
166 })?;
167
168 let service_scope = scope.new_child_with_name(service_name);
169 PersistServer::spawn(
170 service_name.clone(),
171 tags.keys().cloned().collect(),
172 scheduler.clone(),
173 &service_scope,
174 receiver_stream,
175 );
176 service_scopes.push(service_scope);
177 }
178
179 let mut fs = ServiceFs::new();
181 fs.dir("svc").add_fidl_service(IncomingRequest::Router);
182 fs.take_and_serve_directory_handle().expect("Failed to take service directory handle");
183 scope.spawn(fs.for_each_concurrent(None, move |IncomingRequest::Router(mut stream)| {
184 let store = store.clone();
185 let id_gen = id_gen.clone();
186 async move {
187 while let Ok(Some(request)) = stream.try_next().await {
188 match request {
189 fsandbox::DictionaryRouterRequest::Route { payload: _, responder } => {
190 let dup_dict_id = id_gen.next();
191 store.duplicate(services_dict, dup_dict_id).await.unwrap().unwrap();
192 let capability = store.export(dup_dict_id).await.unwrap().unwrap();
193 let fsandbox::Capability::Dictionary(dict) = capability else {
194 panic!("capability was not a dictionary? {capability:?}");
195 };
196 let _ = responder
197 .send(Ok(fsandbox::DictionaryRouterRouteResponse::Dictionary(dict)));
198 }
199 fsandbox::DictionaryRouterRequest::_UnknownMethod { ordinal, .. } => {
200 warn!(ordinal:%; "Unknown DictionaryRouter request");
201 }
202 }
203 }
204 }
205 }));
206
207 Ok(service_scopes)
208}