1#![allow(clippy::let_unit_value)]
6#![allow(clippy::too_many_arguments)]
7#![allow(clippy::enum_variant_names)]
8
9use anyhow::{Context as _, Error, anyhow};
10use async_lock::RwLock as AsyncRwLock;
11use cobalt_sw_delivery_registry as metrics;
12use delivery_blob::DeliveryBlobType;
13use fdio::Namespace;
14use fidl::endpoints::DiscoverableProtocolMarker as _;
15use fidl_contrib::ProtocolConnector;
16use fidl_contrib::protocol_connector::ProtocolSender;
17use fidl_fuchsia_io as fio;
18use fidl_fuchsia_metrics as fmetrics;
19use fidl_fuchsia_pkg as fpkg;
20use fidl_fuchsia_pkg_http as fpkg_http;
21use fuchsia_async as fasync;
22use fuchsia_cobalt_builders::MetricEventExt as _;
23use fuchsia_component::server::ServiceFs;
24use fuchsia_inspect as inspect;
25use fuchsia_trace as ftrace;
26use futures::prelude::*;
27use futures::stream::FuturesUnordered;
28use log::{error, info, warn};
29use std::sync::Arc;
30use std::time::{Duration, Instant};
31
32mod cache;
33mod cache_package_index;
34mod clock;
35mod component_resolver;
36mod config;
37mod eager_package_manager;
38mod error;
39mod inspect_util;
40mod metrics_util;
41mod ota_downloader;
42mod repository;
43mod repository_manager;
44mod repository_service;
45mod resolver_service;
46mod rewrite_manager;
47mod rewrite_service;
48mod util;
49
50#[cfg(test)]
51mod test_util;
52
53use crate::cache::BasePackageIndex;
54use crate::config::Config;
55use crate::repository_manager::{RepositoryManager, RepositoryManagerBuilder};
56use crate::repository_service::RepositoryService;
57use crate::resolver_service::ResolverServiceInspectState;
58use crate::rewrite_manager::{LoadRulesError, RewriteManager, RewriteManagerBuilder};
59use crate::rewrite_service::RewriteService;
60
61const MAX_CONCURRENT_PACKAGE_FETCHES: usize = 5;
68
69const COBALT_CONNECTOR_BUFFER_SIZE: usize = 1000;
72
73const STATIC_REPO_DIR: &str = "/config/data/repositories";
74const DYNAMIC_REPO_PATH: &str = "repositories.json";
76
77const STATIC_RULES_PATH: &str = "rewrites.json";
79const DYNAMIC_RULES_PATH: &str = "rewrites.json";
81
82#[fuchsia::main(logging_tags = ["pkg-resolver"])]
83pub fn main() -> Result<(), Error> {
84 let startup_time = Instant::now();
85 fuchsia_trace_provider::trace_provider_create_with_fdio();
86 info!("starting package resolver");
87
88 let mut executor = fasync::LocalExecutorBuilder::new().build();
89 executor.run_singlethreaded(main_inner_async(startup_time)).map_err(|err| {
90 let err = anyhow!(err);
92 error!("error running pkg-resolver: {:#}", err);
93 err
94 })
95}
96
97async fn main_inner_async(startup_time: Instant) -> Result<(), Error> {
98 let config = Config::load_from_config_data_or_default();
99 let structured_config = pkg_resolver_config::Config::take_from_startup_handle();
100
101 let pkg_cache_proxy =
102 fuchsia_component::client::connect_to_protocol::<fpkg::PackageCacheMarker>()
103 .context("error connecting to package cache")?;
104 let pkg_cache = fidl_fuchsia_pkg_ext::cache::Client::from_proxy(pkg_cache_proxy);
105
106 let base_package_index = Arc::new(
107 BasePackageIndex::from_proxy(pkg_cache.proxy())
108 .await
109 .context("failed to load base package index")?,
110 );
111
112 let system_cache_list = Arc::new(cache_package_index::from_proxy(pkg_cache.proxy()).await);
114
115 let inspector = fuchsia_inspect::Inspector::default();
116 inspector
117 .root()
118 .record_child("structured_config", |node| structured_config.record_inspect(node));
119
120 let futures = FuturesUnordered::new();
121
122 let (mut cobalt_sender, cobalt_fut) = ProtocolConnector::new_with_buffer_size(
123 metrics_util::CobaltConnectedService,
124 COBALT_CONNECTOR_BUFFER_SIZE,
125 )
126 .serve_and_log_errors();
127 futures.push(cobalt_fut.boxed_local());
128
129 let data_proxy = match fuchsia_fs::directory::open_in_namespace(
130 "/data",
131 fio::PERM_READABLE | fio::PERM_WRITABLE,
132 ) {
133 Ok(proxy) => Some(proxy),
134 Err(e) => {
135 warn!("failed to open /data: {:#}", anyhow!(e));
136 None
137 }
138 };
139
140 if data_proxy.is_some() {
141 let namespace = Namespace::installed().context("failed to get installed namespace")?;
142 namespace.unbind("/data").context("failed to unbind /data from default namespace")?;
143 }
144
145 let config_proxy =
146 match fuchsia_fs::directory::open_in_namespace("/config/data", fio::PERM_READABLE) {
147 Ok(proxy) => Some(proxy),
148 Err(e) => {
149 warn!("failed to open /config/data: {:#}", anyhow!(e));
150 None
151 }
152 };
153
154 let delivery_blob_type: DeliveryBlobType =
155 structured_config.delivery_blob_type.try_into().with_context(|| {
156 format!("invalid delivery blob type {}", structured_config.delivery_blob_type)
157 })?;
158
159 let repo_manager = Arc::new(AsyncRwLock::new(
160 load_repo_manager(
161 inspector.root().create_child("repository_manager"),
162 &config,
163 cobalt_sender.clone(),
164 TufTimeouts {
165 metadata: std::time::Duration::from_secs(
166 structured_config.tuf_metadata_timeout_seconds.into(),
167 ),
168 network_header: zx::BootDuration::from_seconds(
169 structured_config.tuf_network_header_timeout_seconds.into(),
170 ),
171 },
172 data_proxy.clone(),
173 delivery_blob_type,
174 )
175 .await,
176 ));
177 let rewrite_manager = Arc::new(AsyncRwLock::new(
178 load_rewrite_manager(
179 inspector.root().create_child("rewrite_manager"),
180 &config,
181 data_proxy.clone(),
182 config_proxy,
183 )
184 .await,
185 ));
186
187 let (blob_fetch_queue, blob_fetcher) = crate::cache::BlobFetcher::new(
188 fuchsia_component::client::connect_to_protocol::<fpkg_http::ClientMarker>()
189 .context("error connecting to fuchsia.pkg.http/Client")?,
190 inspector.root().create_child("blob_fetcher"),
191 structured_config.blob_download_concurrency_limit.into(),
192 repo_manager.read().await.stats(),
193 cache::BlobFetchParams::builder()
194 .header_network_timeout(zx::BootDuration::from_seconds(
195 structured_config.blob_network_header_timeout_seconds.into(),
196 ))
197 .body_network_timeout(zx::BootDuration::from_seconds(
198 structured_config.blob_network_body_timeout_seconds.into(),
199 ))
200 .download_resumption_attempts_limit(
201 structured_config.blob_download_resumption_attempts_limit,
202 )
203 .build(),
204 );
205 futures.push(blob_fetch_queue.boxed_local());
206
207 let resolver_service_inspect_state = Arc::new(ResolverServiceInspectState::from_node(
208 inspector.root().create_child("resolver_service"),
209 ));
210 let (package_fetch_queue, package_resolver) = resolver_service::QueuedResolver::new(
211 pkg_cache.clone(),
212 Arc::clone(&base_package_index),
213 Arc::clone(&system_cache_list),
214 Arc::clone(&repo_manager),
215 Arc::clone(&rewrite_manager),
216 blob_fetcher.clone(),
217 MAX_CONCURRENT_PACKAGE_FETCHES,
218 Arc::clone(&resolver_service_inspect_state),
219 );
220 futures.push(package_fetch_queue.boxed_local());
221
222 let eager_package_manager = Arc::new(
227 crate::eager_package_manager::EagerPackageManager::from_namespace(
228 package_resolver.clone(),
229 pkg_cache.clone(),
230 data_proxy,
231 &system_cache_list,
232 cobalt_sender.clone(),
233 )
234 .await
235 .map_err(|e| {
236 error!("failed to create EagerPackageManager: {:#}", &e);
237 })
238 .ok()
239 .map(AsyncRwLock::new),
240 );
241
242 let make_resolver_cb = {
243 let repo_manager = Arc::clone(&repo_manager);
244 let rewrite_manager = Arc::clone(&rewrite_manager);
245 let package_resolver = package_resolver.clone();
246 let pkg_cache = pkg_cache.clone();
247 let cobalt_sender = cobalt_sender.clone();
248 let eager_package_manager = Arc::clone(&eager_package_manager);
249 move |gc_protection| {
250 let repo_manager = Arc::clone(&repo_manager);
251 let rewrite_manager = Arc::clone(&rewrite_manager);
252 let package_resolver = package_resolver.clone();
253 let pkg_cache = pkg_cache.clone();
254 let base_package_index = Arc::clone(&base_package_index);
255 let system_cache_list = Arc::clone(&system_cache_list);
256 let cobalt_sender = cobalt_sender.clone();
257 let resolver_service_inspect_state = Arc::clone(&resolver_service_inspect_state);
258 let eager_package_manager = Arc::clone(&eager_package_manager);
259 move |stream| {
260 fasync::Task::local(
261 resolver_service::run_resolver_service(
262 Arc::clone(&repo_manager),
263 Arc::clone(&rewrite_manager),
264 package_resolver.clone(),
265 pkg_cache.clone(),
266 Arc::clone(&base_package_index),
267 Arc::clone(&system_cache_list),
268 stream,
269 gc_protection,
270 cobalt_sender.clone(),
271 Arc::clone(&resolver_service_inspect_state),
272 Arc::clone(&eager_package_manager),
273 )
274 .unwrap_or_else(|e| error!("run_resolver_service failed: {:#}", anyhow!(e))),
275 )
276 .detach()
277 }
278 }
279 };
280
281 let resolver_toolbox_cb = {
282 let package_resolver = package_resolver.clone();
283 let cobalt_sender = cobalt_sender.clone();
284 let eager_package_manager = Arc::clone(&eager_package_manager);
285 move |stream| {
286 fasync::Task::local(
287 resolver_service::run_resolver_toolbox_service(
288 package_resolver.clone(),
289 stream,
290 fpkg::GcProtection::OpenPackageTracking,
291 cobalt_sender.clone(),
292 Arc::clone(&eager_package_manager),
293 )
294 .unwrap_or_else(|e| {
295 error!("run_resolver_toolbox_service failed: {:#}", anyhow!(e))
296 }),
297 )
298 .detach()
299 }
300 };
301
302 let repo_cb = move |stream| {
303 let repo_manager = Arc::clone(&repo_manager);
304
305 fasync::Task::local(
306 async move {
307 let mut repo_service = RepositoryService::new(repo_manager);
308 repo_service.run(stream).await
309 }
310 .unwrap_or_else(|e| error!("error encountered: {:#}", anyhow!(e))),
311 )
312 .detach()
313 };
314
315 let rewrite_cb = move |stream| {
316 let mut rewrite_service = RewriteService::new(Arc::clone(&rewrite_manager));
317
318 fasync::Task::local(
319 async move { rewrite_service.handle_client(stream).await }
320 .unwrap_or_else(|e| error!("while handling rewrite client {:#}", anyhow!(e))),
321 )
322 .detach()
323 };
324
325 let cup_cb = {
326 let cobalt_sender = cobalt_sender.clone();
327 move |stream| {
328 fasync::Task::local(
329 eager_package_manager::run_cup_service(
330 Arc::clone(&eager_package_manager),
331 stream,
332 cobalt_sender.clone(),
333 )
334 .unwrap_or_else(|e| error!("run_cup_service failed: {:#}", anyhow!(e))),
335 )
336 .detach()
337 }
338 };
339
340 let component_resolver_cb = move |stream| {
341 fasync::Task::local(
342 component_resolver::serve(stream)
343 .unwrap_or_else(|e| error!("serve_component_resolver_failed: {:#}", e)),
344 )
345 .detach()
346 };
347
348 let ota_downloader_cb = move |stream| {
349 fasync::Task::local(
350 ota_downloader::serve(stream, blob_fetcher.clone(), pkg_cache.clone())
351 .unwrap_or_else(|e| error!("run_cup_service failed: {:#}", anyhow!(e))),
352 )
353 .detach()
354 };
355
356 let mut fs = ServiceFs::new();
357 fs.dir("svc")
358 .add_fidl_service(make_resolver_cb(fpkg::GcProtection::OpenPackageTracking))
359 .add_fidl_service_at(
360 format!("{}-ota", fpkg::PackageResolverMarker::PROTOCOL_NAME),
361 make_resolver_cb(fpkg::GcProtection::Retained),
362 )
363 .add_fidl_service(resolver_toolbox_cb)
364 .add_fidl_service(repo_cb)
365 .add_fidl_service(rewrite_cb)
366 .add_fidl_service(cup_cb)
367 .add_fidl_service(component_resolver_cb)
368 .add_fidl_service(ota_downloader_cb);
369
370 fs.take_and_serve_directory_handle().context("while serving directory handle")?;
371
372 futures.push(fs.collect().boxed_local());
373
374 cobalt_sender.send(
375 fmetrics::MetricEvent::builder(metrics::PKG_RESOLVER_STARTUP_DURATION_MIGRATED_METRIC_ID)
376 .as_integer(Instant::now().duration_since(startup_time).as_micros() as i64),
377 );
378
379 ftrace::instant!("app", "startup", ftrace::Scope::Process);
380
381 let _inspect_server_task =
382 inspect_runtime::publish(&inspector, inspect_runtime::PublishOptions::default());
383
384 futures.collect::<()>().await;
385
386 Ok(())
387}
388
389async fn load_repo_manager(
390 node: inspect::Node,
391 config: &Config,
392 mut cobalt_sender: ProtocolSender<fmetrics::MetricEvent>,
393 tuf_timeouts: TufTimeouts,
394 data_proxy: Option<fio::DirectoryProxy>,
395 delivery_blob_type: DeliveryBlobType,
396) -> RepositoryManager {
397 let dynamic_repo_path =
400 if config.enable_dynamic_configuration() { Some(DYNAMIC_REPO_PATH) } else { None };
401 let builder = match RepositoryManagerBuilder::new(data_proxy, dynamic_repo_path, tuf_timeouts)
402 .await
403 .unwrap_or_else(|(builder, err)| {
404 error!("error loading dynamic repo config: {:#}", anyhow!(err));
405 builder
406 })
407 .delivery_blob_type(delivery_blob_type)
408 .inspect_node(node)
409 .load_static_configs_dir(STATIC_REPO_DIR)
410 {
411 Ok(builder) => {
412 cobalt_sender.send(
413 fmetrics::MetricEvent::builder(
414 metrics::REPOSITORY_MANAGER_LOAD_STATIC_CONFIGS_MIGRATED_METRIC_ID,
415 )
416 .with_event_codes(
417 metrics::RepositoryManagerLoadStaticConfigsMigratedMetricDimensionResult::Success,
418 )
419 .as_occurrence(1),
420 );
421 builder
422 }
423 Err((builder, errs)) => {
424 for err in errs {
425 let dimension_result: metrics::RepositoryManagerLoadStaticConfigsMigratedMetricDimensionResult
426 = (&err).into();
427 cobalt_sender.send(
428 fmetrics::MetricEvent::builder(
429 metrics::REPOSITORY_MANAGER_LOAD_STATIC_CONFIGS_MIGRATED_METRIC_ID,
430 )
431 .with_event_codes(dimension_result)
432 .as_occurrence(1),
433 );
434 match &err {
435 crate::repository_manager::LoadError::Io { path: _, error }
436 if error.kind() == std::io::ErrorKind::NotFound =>
437 {
438 info!("no statically configured repositories present");
439 }
440 _ => error!("error loading static repo config: {:#}", anyhow!(err)),
441 };
442 }
443 builder
444 }
445 };
446
447 match config.persisted_repos_dir() {
448 Some(repo) => builder.with_persisted_repos_dir(repo),
449 None => builder,
450 }
451 .cobalt_sender(cobalt_sender)
452 .build()
453}
454
455async fn load_rewrite_manager(
456 node: inspect::Node,
457 config: &Config,
458 data_proxy: Option<fio::DirectoryProxy>,
459 config_proxy: Option<fio::DirectoryProxy>,
460) -> RewriteManager {
461 let dynamic_rules_path =
462 if config.enable_dynamic_configuration() { Some(DYNAMIC_RULES_PATH) } else { None };
463 let builder = RewriteManagerBuilder::new(data_proxy, dynamic_rules_path)
464 .await
465 .unwrap_or_else(|(builder, err)| {
466 match err {
467 LoadRulesError::FileOpen(fuchsia_fs::node::OpenError::OpenError(
469 zx::Status::NOT_FOUND,
470 )) => {}
471 LoadRulesError::DirOpen(_) => {}
473 err => error!(
474 "unable to load dynamic rewrite rules from disk, using defaults: {:#}",
475 anyhow!(err)
476 ),
477 };
478 builder
479 })
480 .inspect_node(node)
481 .static_rules_path(config_proxy, STATIC_RULES_PATH)
482 .await
483 .unwrap_or_else(|(builder, err)| {
484 match err {
485 LoadRulesError::FileOpen(fuchsia_fs::node::OpenError::OpenError(
487 zx::Status::NOT_FOUND,
488 )) => {}
489 LoadRulesError::DirOpen(_) => {}
491 err => {
492 error!("unable to load static rewrite rules from disk: {:#}", anyhow!(err))
493 }
494 };
495 builder
496 });
497
498 builder.build()
499}
500
501#[derive(Clone, Copy, Debug)]
502struct TufTimeouts {
503 metadata: Duration,
504 network_header: zx::BootDuration,
505}