1#![allow(clippy::let_unit_value)]
6#![allow(clippy::too_many_arguments)]
7#![allow(clippy::enum_variant_names)]
8
9use anyhow::{Context as _, Error, anyhow};
10use async_lock::RwLock as AsyncRwLock;
11use delivery_blob::DeliveryBlobType;
12use fdio::Namespace;
13use fidl::endpoints::DiscoverableProtocolMarker as _;
14use fidl_contrib::ProtocolConnector;
15use fidl_contrib::protocol_connector::ProtocolSender;
16use fuchsia_cobalt_builders::MetricEventExt as _;
17use fuchsia_component::server::ServiceFs;
18use futures::prelude::*;
19use futures::stream::FuturesUnordered;
20use log::{error, info, warn};
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23use {
24 cobalt_sw_delivery_registry as metrics, fidl_fuchsia_io as fio,
25 fidl_fuchsia_metrics as fmetrics, fidl_fuchsia_pkg as fpkg, fidl_fuchsia_pkg_http as fpkg_http,
26 fuchsia_async as fasync, fuchsia_inspect as inspect, fuchsia_trace as ftrace,
27};
28
29mod cache;
30mod cache_package_index;
31mod clock;
32mod component_resolver;
33mod config;
34mod eager_package_manager;
35mod error;
36mod inspect_util;
37mod metrics_util;
38mod ota_downloader;
39mod repository;
40mod repository_manager;
41mod repository_service;
42mod resolver_service;
43mod rewrite_manager;
44mod rewrite_service;
45mod util;
46
47#[cfg(test)]
48mod test_util;
49
50use crate::cache::BasePackageIndex;
51use crate::config::Config;
52use crate::repository_manager::{RepositoryManager, RepositoryManagerBuilder};
53use crate::repository_service::RepositoryService;
54use crate::resolver_service::ResolverServiceInspectState;
55use crate::rewrite_manager::{LoadRulesError, RewriteManager, RewriteManagerBuilder};
56use crate::rewrite_service::RewriteService;
57
58const MAX_CONCURRENT_PACKAGE_FETCHES: usize = 5;
65
66const COBALT_CONNECTOR_BUFFER_SIZE: usize = 1000;
69
70const STATIC_REPO_DIR: &str = "/config/data/repositories";
71const DYNAMIC_REPO_PATH: &str = "repositories.json";
73
74const STATIC_RULES_PATH: &str = "rewrites.json";
76const DYNAMIC_RULES_PATH: &str = "rewrites.json";
78
79const TCP_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(30);
84
85#[fuchsia::main(logging_tags = ["pkg-resolver"])]
86pub fn main() -> Result<(), Error> {
87 let startup_time = Instant::now();
88 fuchsia_trace_provider::trace_provider_create_with_fdio();
89 info!("starting package resolver");
90
91 let mut executor = fasync::LocalExecutorBuilder::new().build();
92 executor.run_singlethreaded(main_inner_async(startup_time)).map_err(|err| {
93 let err = anyhow!(err);
95 error!("error running pkg-resolver: {:#}", err);
96 err
97 })
98}
99
100async fn main_inner_async(startup_time: Instant) -> Result<(), Error> {
101 let config = Config::load_from_config_data_or_default();
102 let structured_config = pkg_resolver_config::Config::take_from_startup_handle();
103
104 let pkg_cache_proxy =
105 fuchsia_component::client::connect_to_protocol::<fpkg::PackageCacheMarker>()
106 .context("error connecting to package cache")?;
107 let pkg_cache = fidl_fuchsia_pkg_ext::cache::Client::from_proxy(pkg_cache_proxy);
108
109 let base_package_index = Arc::new(
110 BasePackageIndex::from_proxy(pkg_cache.proxy())
111 .await
112 .context("failed to load base package index")?,
113 );
114
115 let system_cache_list = Arc::new(cache_package_index::from_proxy(pkg_cache.proxy()).await);
117
118 let inspector = fuchsia_inspect::Inspector::default();
119 inspector
120 .root()
121 .record_child("structured_config", |node| structured_config.record_inspect(node));
122
123 let futures = FuturesUnordered::new();
124
125 let (mut cobalt_sender, cobalt_fut) = ProtocolConnector::new_with_buffer_size(
126 metrics_util::CobaltConnectedService,
127 COBALT_CONNECTOR_BUFFER_SIZE,
128 )
129 .serve_and_log_errors();
130 futures.push(cobalt_fut.boxed_local());
131
132 let data_proxy = match fuchsia_fs::directory::open_in_namespace(
133 "/data",
134 fio::PERM_READABLE | fio::PERM_WRITABLE,
135 ) {
136 Ok(proxy) => Some(proxy),
137 Err(e) => {
138 warn!("failed to open /data: {:#}", anyhow!(e));
139 None
140 }
141 };
142
143 if data_proxy.is_some() {
144 let namespace = Namespace::installed().context("failed to get installed namespace")?;
145 namespace.unbind("/data").context("failed to unbind /data from default namespace")?;
146 }
147
148 let config_proxy =
149 match fuchsia_fs::directory::open_in_namespace("/config/data", fio::PERM_READABLE) {
150 Ok(proxy) => Some(proxy),
151 Err(e) => {
152 warn!("failed to open /config/data: {:#}", anyhow!(e));
153 None
154 }
155 };
156
157 let delivery_blob_type: DeliveryBlobType =
158 structured_config.delivery_blob_type.try_into().with_context(|| {
159 format!("invalid delivery blob type {}", structured_config.delivery_blob_type)
160 })?;
161
162 let repo_manager = Arc::new(AsyncRwLock::new(
163 load_repo_manager(
164 inspector.root().create_child("repository_manager"),
165 &config,
166 cobalt_sender.clone(),
167 std::time::Duration::from_secs(structured_config.tuf_metadata_timeout_seconds.into()),
168 data_proxy.clone(),
169 delivery_blob_type,
170 )
171 .await,
172 ));
173 let rewrite_manager = Arc::new(AsyncRwLock::new(
174 load_rewrite_manager(
175 inspector.root().create_child("rewrite_manager"),
176 &config,
177 data_proxy.clone(),
178 config_proxy,
179 )
180 .await,
181 ));
182
183 let (blob_fetch_queue, blob_fetcher) = crate::cache::BlobFetcher::new(
184 fuchsia_component::client::connect_to_protocol::<fpkg_http::ClientMarker>()
185 .context("error connecting to fuchsia.pkg.http/Client")?,
186 inspector.root().create_child("blob_fetcher"),
187 structured_config.blob_download_concurrency_limit.into(),
188 repo_manager.read().await.stats(),
189 cache::BlobFetchParams::builder()
190 .header_network_timeout(zx::BootDuration::from_seconds(
191 structured_config.blob_network_header_timeout_seconds.into(),
192 ))
193 .body_network_timeout(zx::BootDuration::from_seconds(
194 structured_config.blob_network_body_timeout_seconds.into(),
195 ))
196 .download_resumption_attempts_limit(
197 structured_config.blob_download_resumption_attempts_limit,
198 )
199 .build(),
200 );
201 futures.push(blob_fetch_queue.boxed_local());
202
203 let resolver_service_inspect_state = Arc::new(ResolverServiceInspectState::from_node(
204 inspector.root().create_child("resolver_service"),
205 ));
206 let (package_fetch_queue, package_resolver) = resolver_service::QueuedResolver::new(
207 pkg_cache.clone(),
208 Arc::clone(&base_package_index),
209 Arc::clone(&system_cache_list),
210 Arc::clone(&repo_manager),
211 Arc::clone(&rewrite_manager),
212 blob_fetcher.clone(),
213 MAX_CONCURRENT_PACKAGE_FETCHES,
214 Arc::clone(&resolver_service_inspect_state),
215 );
216 futures.push(package_fetch_queue.boxed_local());
217
218 let eager_package_manager = Arc::new(
223 crate::eager_package_manager::EagerPackageManager::from_namespace(
224 package_resolver.clone(),
225 pkg_cache.clone(),
226 data_proxy,
227 &system_cache_list,
228 cobalt_sender.clone(),
229 )
230 .await
231 .map_err(|e| {
232 error!("failed to create EagerPackageManager: {:#}", &e);
233 })
234 .ok()
235 .map(AsyncRwLock::new),
236 );
237
238 let make_resolver_cb = {
239 let repo_manager = Arc::clone(&repo_manager);
240 let rewrite_manager = Arc::clone(&rewrite_manager);
241 let package_resolver = package_resolver.clone();
242 let pkg_cache = pkg_cache.clone();
243 let cobalt_sender = cobalt_sender.clone();
244 let eager_package_manager = Arc::clone(&eager_package_manager);
245 move |gc_protection| {
246 let repo_manager = Arc::clone(&repo_manager);
247 let rewrite_manager = Arc::clone(&rewrite_manager);
248 let package_resolver = package_resolver.clone();
249 let pkg_cache = pkg_cache.clone();
250 let base_package_index = Arc::clone(&base_package_index);
251 let system_cache_list = Arc::clone(&system_cache_list);
252 let cobalt_sender = cobalt_sender.clone();
253 let resolver_service_inspect_state = Arc::clone(&resolver_service_inspect_state);
254 let eager_package_manager = Arc::clone(&eager_package_manager);
255 move |stream| {
256 fasync::Task::local(
257 resolver_service::run_resolver_service(
258 Arc::clone(&repo_manager),
259 Arc::clone(&rewrite_manager),
260 package_resolver.clone(),
261 pkg_cache.clone(),
262 Arc::clone(&base_package_index),
263 Arc::clone(&system_cache_list),
264 stream,
265 gc_protection,
266 cobalt_sender.clone(),
267 Arc::clone(&resolver_service_inspect_state),
268 Arc::clone(&eager_package_manager),
269 )
270 .unwrap_or_else(|e| error!("run_resolver_service failed: {:#}", anyhow!(e))),
271 )
272 .detach()
273 }
274 }
275 };
276
277 let resolver_toolbox_cb = {
278 let package_resolver = package_resolver.clone();
279 let cobalt_sender = cobalt_sender.clone();
280 let eager_package_manager = Arc::clone(&eager_package_manager);
281 move |stream| {
282 fasync::Task::local(
283 resolver_service::run_resolver_toolbox_service(
284 package_resolver.clone(),
285 stream,
286 fpkg::GcProtection::OpenPackageTracking,
287 cobalt_sender.clone(),
288 Arc::clone(&eager_package_manager),
289 )
290 .unwrap_or_else(|e| {
291 error!("run_resolver_toolbox_service failed: {:#}", anyhow!(e))
292 }),
293 )
294 .detach()
295 }
296 };
297
298 let repo_cb = move |stream| {
299 let repo_manager = Arc::clone(&repo_manager);
300
301 fasync::Task::local(
302 async move {
303 let mut repo_service = RepositoryService::new(repo_manager);
304 repo_service.run(stream).await
305 }
306 .unwrap_or_else(|e| error!("error encountered: {:#}", anyhow!(e))),
307 )
308 .detach()
309 };
310
311 let rewrite_cb = move |stream| {
312 let mut rewrite_service = RewriteService::new(Arc::clone(&rewrite_manager));
313
314 fasync::Task::local(
315 async move { rewrite_service.handle_client(stream).await }
316 .unwrap_or_else(|e| error!("while handling rewrite client {:#}", anyhow!(e))),
317 )
318 .detach()
319 };
320
321 let cup_cb = {
322 let cobalt_sender = cobalt_sender.clone();
323 move |stream| {
324 fasync::Task::local(
325 eager_package_manager::run_cup_service(
326 Arc::clone(&eager_package_manager),
327 stream,
328 cobalt_sender.clone(),
329 )
330 .unwrap_or_else(|e| error!("run_cup_service failed: {:#}", anyhow!(e))),
331 )
332 .detach()
333 }
334 };
335
336 let component_resolver_cb = move |stream| {
337 fasync::Task::local(
338 component_resolver::serve(stream)
339 .unwrap_or_else(|e| error!("serve_component_resolver_failed: {:#}", e)),
340 )
341 .detach()
342 };
343
344 let ota_downloader_cb = move |stream| {
345 fasync::Task::local(
346 ota_downloader::serve(stream, blob_fetcher.clone(), pkg_cache.clone())
347 .unwrap_or_else(|e| error!("run_cup_service failed: {:#}", anyhow!(e))),
348 )
349 .detach()
350 };
351
352 let mut fs = ServiceFs::new();
353 fs.dir("svc")
354 .add_fidl_service(make_resolver_cb(fpkg::GcProtection::OpenPackageTracking))
355 .add_fidl_service_at(
356 format!("{}-ota", fpkg::PackageResolverMarker::PROTOCOL_NAME),
357 make_resolver_cb(fpkg::GcProtection::Retained),
358 )
359 .add_fidl_service(resolver_toolbox_cb)
360 .add_fidl_service(repo_cb)
361 .add_fidl_service(rewrite_cb)
362 .add_fidl_service(cup_cb)
363 .add_fidl_service(component_resolver_cb)
364 .add_fidl_service(ota_downloader_cb);
365
366 fs.take_and_serve_directory_handle().context("while serving directory handle")?;
367
368 futures.push(fs.collect().boxed_local());
369
370 cobalt_sender.send(
371 fmetrics::MetricEvent::builder(metrics::PKG_RESOLVER_STARTUP_DURATION_MIGRATED_METRIC_ID)
372 .as_integer(Instant::now().duration_since(startup_time).as_micros() as i64),
373 );
374
375 ftrace::instant!("app", "startup", ftrace::Scope::Process);
376
377 let _inspect_server_task =
378 inspect_runtime::publish(&inspector, inspect_runtime::PublishOptions::default());
379
380 futures.collect::<()>().await;
381
382 Ok(())
383}
384
385async fn load_repo_manager(
386 node: inspect::Node,
387 config: &Config,
388 mut cobalt_sender: ProtocolSender<fmetrics::MetricEvent>,
389 tuf_metadata_timeout: Duration,
390 data_proxy: Option<fio::DirectoryProxy>,
391 delivery_blob_type: DeliveryBlobType,
392) -> RepositoryManager {
393 let dynamic_repo_path =
396 if config.enable_dynamic_configuration() { Some(DYNAMIC_REPO_PATH) } else { None };
397 let builder = match RepositoryManagerBuilder::new(
398 data_proxy,
399 dynamic_repo_path,
400 tuf_metadata_timeout,
401 )
402 .await
403 .unwrap_or_else(|(builder, err)| {
404 error!("error loading dynamic repo config: {:#}", anyhow!(err));
405 builder
406 })
407 .delivery_blob_type(delivery_blob_type)
408 .inspect_node(node)
409 .load_static_configs_dir(STATIC_REPO_DIR)
410 {
411 Ok(builder) => {
412 cobalt_sender.send(
413 fmetrics::MetricEvent::builder(
414 metrics::REPOSITORY_MANAGER_LOAD_STATIC_CONFIGS_MIGRATED_METRIC_ID,
415 )
416 .with_event_codes(
417 metrics::RepositoryManagerLoadStaticConfigsMigratedMetricDimensionResult::Success,
418 )
419 .as_occurrence(1),
420 );
421 builder
422 }
423 Err((builder, errs)) => {
424 for err in errs {
425 let dimension_result: metrics::RepositoryManagerLoadStaticConfigsMigratedMetricDimensionResult
426 = (&err).into();
427 cobalt_sender.send(
428 fmetrics::MetricEvent::builder(
429 metrics::REPOSITORY_MANAGER_LOAD_STATIC_CONFIGS_MIGRATED_METRIC_ID,
430 )
431 .with_event_codes(dimension_result)
432 .as_occurrence(1),
433 );
434 match &err {
435 crate::repository_manager::LoadError::Io { path: _, error }
436 if error.kind() == std::io::ErrorKind::NotFound =>
437 {
438 info!("no statically configured repositories present");
439 }
440 _ => error!("error loading static repo config: {:#}", anyhow!(err)),
441 };
442 }
443 builder
444 }
445 };
446
447 match config.persisted_repos_dir() {
448 Some(repo) => builder.with_persisted_repos_dir(repo),
449 None => builder,
450 }
451 .cobalt_sender(cobalt_sender)
452 .build()
453}
454
455async fn load_rewrite_manager(
456 node: inspect::Node,
457 config: &Config,
458 data_proxy: Option<fio::DirectoryProxy>,
459 config_proxy: Option<fio::DirectoryProxy>,
460) -> RewriteManager {
461 let dynamic_rules_path =
462 if config.enable_dynamic_configuration() { Some(DYNAMIC_RULES_PATH) } else { None };
463 let builder = RewriteManagerBuilder::new(data_proxy, dynamic_rules_path)
464 .await
465 .unwrap_or_else(|(builder, err)| {
466 match err {
467 LoadRulesError::FileOpen(fuchsia_fs::node::OpenError::OpenError(
469 zx::Status::NOT_FOUND,
470 )) => {}
471 LoadRulesError::DirOpen(_) => {}
473 err => error!(
474 "unable to load dynamic rewrite rules from disk, using defaults: {:#}",
475 anyhow!(err)
476 ),
477 };
478 builder
479 })
480 .inspect_node(node)
481 .static_rules_path(config_proxy, STATIC_RULES_PATH)
482 .await
483 .unwrap_or_else(|(builder, err)| {
484 match err {
485 LoadRulesError::FileOpen(fuchsia_fs::node::OpenError::OpenError(
487 zx::Status::NOT_FOUND,
488 )) => {}
489 LoadRulesError::DirOpen(_) => {}
491 err => {
492 error!("unable to load static rewrite rules from disk: {:#}", anyhow!(err))
493 }
494 };
495 builder
496 });
497
498 builder.build()
499}