1#![allow(clippy::let_unit_value)]
6#![allow(clippy::too_many_arguments)]
7#![allow(clippy::enum_variant_names)]
8
9use anyhow::{Context as _, Error, anyhow};
10use async_lock::RwLock as AsyncRwLock;
11use delivery_blob::DeliveryBlobType;
12use fdio::Namespace;
13use fidl::endpoints::DiscoverableProtocolMarker as _;
14use fidl_contrib::ProtocolConnector;
15use fidl_contrib::protocol_connector::ProtocolSender;
16use fuchsia_cobalt_builders::MetricEventExt as _;
17use fuchsia_component::server::ServiceFs;
18use futures::prelude::*;
19use futures::stream::FuturesUnordered;
20use log::{error, info, warn};
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23use {
24 cobalt_sw_delivery_registry as metrics, fidl_fuchsia_io as fio,
25 fidl_fuchsia_metrics as fmetrics, fidl_fuchsia_pkg as fpkg, fidl_fuchsia_pkg_http as fpkg_http,
26 fuchsia_async as fasync, fuchsia_inspect as inspect, fuchsia_trace as ftrace,
27};
28
29mod cache;
30mod cache_package_index;
31mod clock;
32mod component_resolver;
33mod config;
34mod eager_package_manager;
35mod error;
36mod inspect_util;
37mod metrics_util;
38mod ota_downloader;
39mod repository;
40mod repository_manager;
41mod repository_service;
42mod resolver_service;
43mod rewrite_manager;
44mod rewrite_service;
45mod util;
46
47#[cfg(test)]
48mod test_util;
49
50use crate::cache::BasePackageIndex;
51use crate::config::Config;
52use crate::repository_manager::{RepositoryManager, RepositoryManagerBuilder};
53use crate::repository_service::RepositoryService;
54use crate::resolver_service::ResolverServiceInspectState;
55use crate::rewrite_manager::{LoadRulesError, RewriteManager, RewriteManagerBuilder};
56use crate::rewrite_service::RewriteService;
57
/// Concurrency limit handed to `resolver_service::QueuedResolver::new` for package fetches.
const MAX_CONCURRENT_PACKAGE_FETCHES: usize = 5;

/// Buffer size passed to `ProtocolConnector::new_with_buffer_size` for the Cobalt sender.
const COBALT_CONNECTOR_BUFFER_SIZE: usize = 1_000;

/// Directory given to `RepositoryManagerBuilder::load_static_configs_dir`.
const STATIC_REPO_DIR: &str = "/config/data/repositories";
/// Dynamic repository config file; passed to `RepositoryManagerBuilder::new` together with the
/// `/data` directory proxy when dynamic configuration is enabled.
const DYNAMIC_REPO_PATH: &str = "repositories.json";

/// Static rewrite rules file; passed to `static_rules_path` together with the `/config/data`
/// directory proxy.
const STATIC_RULES_PATH: &str = "rewrites.json";
/// Dynamic rewrite rules file; passed to `RewriteManagerBuilder::new` together with the `/data`
/// directory proxy when dynamic configuration is enabled.
const DYNAMIC_RULES_PATH: &str = "rewrites.json";

/// TCP keepalive timeout of 30 seconds.
// NOTE(review): not referenced in this file; presumably consumed by a sibling module - confirm.
const TCP_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(30);
84
85#[fuchsia::main(logging_tags = ["pkg-resolver"])]
86pub fn main() -> Result<(), Error> {
87 let startup_time = Instant::now();
88 fuchsia_trace_provider::trace_provider_create_with_fdio();
89 info!("starting package resolver");
90
91 let mut executor = fasync::LocalExecutorBuilder::new().build();
92 executor.run_singlethreaded(main_inner_async(startup_time)).map_err(|err| {
93 let err = anyhow!(err);
95 error!("error running pkg-resolver: {:#}", err);
96 err
97 })
98}
99
100async fn main_inner_async(startup_time: Instant) -> Result<(), Error> {
101 let config = Config::load_from_config_data_or_default();
102 let structured_config = pkg_resolver_config::Config::take_from_startup_handle();
103
104 let pkg_cache_proxy =
105 fuchsia_component::client::connect_to_protocol::<fpkg::PackageCacheMarker>()
106 .context("error connecting to package cache")?;
107 let pkg_cache = fidl_fuchsia_pkg_ext::cache::Client::from_proxy(pkg_cache_proxy);
108
109 let base_package_index = Arc::new(
110 BasePackageIndex::from_proxy(pkg_cache.proxy())
111 .await
112 .context("failed to load base package index")?,
113 );
114
115 let system_cache_list = Arc::new(cache_package_index::from_proxy(pkg_cache.proxy()).await);
117
118 let inspector = fuchsia_inspect::Inspector::default();
119 inspector
120 .root()
121 .record_child("structured_config", |node| structured_config.record_inspect(node));
122
123 let futures = FuturesUnordered::new();
124
125 let (mut cobalt_sender, cobalt_fut) = ProtocolConnector::new_with_buffer_size(
126 metrics_util::CobaltConnectedService,
127 COBALT_CONNECTOR_BUFFER_SIZE,
128 )
129 .serve_and_log_errors();
130 futures.push(cobalt_fut.boxed_local());
131
132 let data_proxy = match fuchsia_fs::directory::open_in_namespace(
133 "/data",
134 fio::PERM_READABLE | fio::PERM_WRITABLE,
135 ) {
136 Ok(proxy) => Some(proxy),
137 Err(e) => {
138 warn!("failed to open /data: {:#}", anyhow!(e));
139 None
140 }
141 };
142
143 if data_proxy.is_some() {
144 let namespace = Namespace::installed().context("failed to get installed namespace")?;
145 namespace.unbind("/data").context("failed to unbind /data from default namespace")?;
146 }
147
148 let config_proxy =
149 match fuchsia_fs::directory::open_in_namespace("/config/data", fio::PERM_READABLE) {
150 Ok(proxy) => Some(proxy),
151 Err(e) => {
152 warn!("failed to open /config/data: {:#}", anyhow!(e));
153 None
154 }
155 };
156
157 let repo_manager = Arc::new(AsyncRwLock::new(
158 load_repo_manager(
159 inspector.root().create_child("repository_manager"),
160 &config,
161 cobalt_sender.clone(),
162 std::time::Duration::from_secs(structured_config.tuf_metadata_timeout_seconds.into()),
163 data_proxy.clone(),
164 )
165 .await,
166 ));
167 let rewrite_manager = Arc::new(AsyncRwLock::new(
168 load_rewrite_manager(
169 inspector.root().create_child("rewrite_manager"),
170 &config,
171 data_proxy.clone(),
172 config_proxy,
173 )
174 .await,
175 ));
176
177 let delivery_blob_type: DeliveryBlobType =
178 structured_config.delivery_blob_type.try_into().with_context(|| {
179 format!("invalid delivery blob type {}", structured_config.delivery_blob_type)
180 })?;
181
182 let (blob_fetch_queue, blob_fetcher) = crate::cache::BlobFetcher::new(
183 fuchsia_component::client::connect_to_protocol::<fpkg_http::ClientMarker>()
184 .context("error connecting to fuchsia.pkg.http/Client")?,
185 inspector.root().create_child("blob_fetcher"),
186 structured_config.blob_download_concurrency_limit.into(),
187 repo_manager.read().await.stats(),
188 cache::BlobFetchParams::builder()
189 .header_network_timeout(zx::BootDuration::from_seconds(
190 structured_config.blob_network_header_timeout_seconds.into(),
191 ))
192 .body_network_timeout(zx::BootDuration::from_seconds(
193 structured_config.blob_network_body_timeout_seconds.into(),
194 ))
195 .download_resumption_attempts_limit(
196 structured_config.blob_download_resumption_attempts_limit,
197 )
198 .blob_type(delivery_blob_type)
199 .build(),
200 );
201 futures.push(blob_fetch_queue.boxed_local());
202
203 let resolver_service_inspect_state = Arc::new(ResolverServiceInspectState::from_node(
204 inspector.root().create_child("resolver_service"),
205 ));
206 let (package_fetch_queue, package_resolver) = resolver_service::QueuedResolver::new(
207 pkg_cache.clone(),
208 Arc::clone(&base_package_index),
209 Arc::clone(&system_cache_list),
210 Arc::clone(&repo_manager),
211 Arc::clone(&rewrite_manager),
212 blob_fetcher.clone(),
213 MAX_CONCURRENT_PACKAGE_FETCHES,
214 Arc::clone(&resolver_service_inspect_state),
215 );
216 futures.push(package_fetch_queue.boxed_local());
217
218 let eager_package_manager = Arc::new(
223 crate::eager_package_manager::EagerPackageManager::from_namespace(
224 package_resolver.clone(),
225 pkg_cache.clone(),
226 data_proxy,
227 &system_cache_list,
228 cobalt_sender.clone(),
229 )
230 .await
231 .map_err(|e| {
232 error!("failed to create EagerPackageManager: {:#}", &e);
233 })
234 .ok()
235 .map(AsyncRwLock::new),
236 );
237
238 let make_resolver_cb = {
239 let repo_manager = Arc::clone(&repo_manager);
240 let rewrite_manager = Arc::clone(&rewrite_manager);
241 let package_resolver = package_resolver.clone();
242 let pkg_cache = pkg_cache.clone();
243 let cobalt_sender = cobalt_sender.clone();
244 let eager_package_manager = Arc::clone(&eager_package_manager);
245 move |gc_protection| {
246 let repo_manager = Arc::clone(&repo_manager);
247 let rewrite_manager = Arc::clone(&rewrite_manager);
248 let package_resolver = package_resolver.clone();
249 let pkg_cache = pkg_cache.clone();
250 let base_package_index = Arc::clone(&base_package_index);
251 let system_cache_list = Arc::clone(&system_cache_list);
252 let cobalt_sender = cobalt_sender.clone();
253 let resolver_service_inspect_state = Arc::clone(&resolver_service_inspect_state);
254 let eager_package_manager = Arc::clone(&eager_package_manager);
255 move |stream| {
256 fasync::Task::local(
257 resolver_service::run_resolver_service(
258 Arc::clone(&repo_manager),
259 Arc::clone(&rewrite_manager),
260 package_resolver.clone(),
261 pkg_cache.clone(),
262 Arc::clone(&base_package_index),
263 Arc::clone(&system_cache_list),
264 stream,
265 gc_protection,
266 cobalt_sender.clone(),
267 Arc::clone(&resolver_service_inspect_state),
268 Arc::clone(&eager_package_manager),
269 )
270 .unwrap_or_else(|e| error!("run_resolver_service failed: {:#}", anyhow!(e))),
271 )
272 .detach()
273 }
274 }
275 };
276
277 let resolver_toolbox_cb = {
278 let package_resolver = package_resolver.clone();
279 let cobalt_sender = cobalt_sender.clone();
280 let eager_package_manager = Arc::clone(&eager_package_manager);
281 move |stream| {
282 fasync::Task::local(
283 resolver_service::run_resolver_toolbox_service(
284 package_resolver.clone(),
285 stream,
286 fpkg::GcProtection::OpenPackageTracking,
287 cobalt_sender.clone(),
288 Arc::clone(&eager_package_manager),
289 )
290 .unwrap_or_else(|e| {
291 error!("run_resolver_toolbox_service failed: {:#}", anyhow!(e))
292 }),
293 )
294 .detach()
295 }
296 };
297
298 let repo_cb = move |stream| {
299 let repo_manager = Arc::clone(&repo_manager);
300
301 fasync::Task::local(
302 async move {
303 let mut repo_service = RepositoryService::new(repo_manager);
304 repo_service.run(stream).await
305 }
306 .unwrap_or_else(|e| error!("error encountered: {:#}", anyhow!(e))),
307 )
308 .detach()
309 };
310
311 let rewrite_cb = move |stream| {
312 let mut rewrite_service = RewriteService::new(Arc::clone(&rewrite_manager));
313
314 fasync::Task::local(
315 async move { rewrite_service.handle_client(stream).await }
316 .unwrap_or_else(|e| error!("while handling rewrite client {:#}", anyhow!(e))),
317 )
318 .detach()
319 };
320
321 let cup_cb = {
322 let cobalt_sender = cobalt_sender.clone();
323 move |stream| {
324 fasync::Task::local(
325 eager_package_manager::run_cup_service(
326 Arc::clone(&eager_package_manager),
327 stream,
328 cobalt_sender.clone(),
329 )
330 .unwrap_or_else(|e| error!("run_cup_service failed: {:#}", anyhow!(e))),
331 )
332 .detach()
333 }
334 };
335
336 let component_resolver_cb = move |stream| {
337 fasync::Task::local(
338 component_resolver::serve(stream)
339 .unwrap_or_else(|e| error!("serve_component_resolver_failed: {:#}", e)),
340 )
341 .detach()
342 };
343
344 let ota_downloader_cb = move |stream| {
345 fasync::Task::local(
346 ota_downloader::serve(stream, blob_fetcher.clone(), pkg_cache.clone())
347 .unwrap_or_else(|e| error!("run_cup_service failed: {:#}", anyhow!(e))),
348 )
349 .detach()
350 };
351
352 let mut fs = ServiceFs::new();
353 fs.dir("svc")
354 .add_fidl_service(make_resolver_cb(fpkg::GcProtection::OpenPackageTracking))
355 .add_fidl_service_at(
356 format!("{}-ota", fpkg::PackageResolverMarker::PROTOCOL_NAME),
357 make_resolver_cb(fpkg::GcProtection::Retained),
358 )
359 .add_fidl_service(resolver_toolbox_cb)
360 .add_fidl_service(repo_cb)
361 .add_fidl_service(rewrite_cb)
362 .add_fidl_service(cup_cb)
363 .add_fidl_service(component_resolver_cb)
364 .add_fidl_service(ota_downloader_cb);
365
366 fs.take_and_serve_directory_handle().context("while serving directory handle")?;
367
368 futures.push(fs.collect().boxed_local());
369
370 cobalt_sender.send(
371 fmetrics::MetricEvent::builder(metrics::PKG_RESOLVER_STARTUP_DURATION_MIGRATED_METRIC_ID)
372 .as_integer(Instant::now().duration_since(startup_time).as_micros() as i64),
373 );
374
375 ftrace::instant!("app", "startup", ftrace::Scope::Process);
376
377 let _inspect_server_task =
378 inspect_runtime::publish(&inspector, inspect_runtime::PublishOptions::default());
379
380 futures.collect::<()>().await;
381
382 Ok(())
383}
384
385async fn load_repo_manager(
386 node: inspect::Node,
387 config: &Config,
388 mut cobalt_sender: ProtocolSender<fmetrics::MetricEvent>,
389 tuf_metadata_timeout: Duration,
390 data_proxy: Option<fio::DirectoryProxy>,
391) -> RepositoryManager {
392 let dynamic_repo_path =
395 if config.enable_dynamic_configuration() { Some(DYNAMIC_REPO_PATH) } else { None };
396 let builder = match RepositoryManagerBuilder::new(
397 data_proxy,
398 dynamic_repo_path,
399 tuf_metadata_timeout,
400 )
401 .await
402 .unwrap_or_else(|(builder, err)| {
403 error!("error loading dynamic repo config: {:#}", anyhow!(err));
404 builder
405 })
406 .inspect_node(node)
407 .load_static_configs_dir(STATIC_REPO_DIR)
408 {
409 Ok(builder) => {
410 cobalt_sender.send(
411 fmetrics::MetricEvent::builder(
412 metrics::REPOSITORY_MANAGER_LOAD_STATIC_CONFIGS_MIGRATED_METRIC_ID,
413 )
414 .with_event_codes(
415 metrics::RepositoryManagerLoadStaticConfigsMigratedMetricDimensionResult::Success,
416 )
417 .as_occurrence(1),
418 );
419 builder
420 }
421 Err((builder, errs)) => {
422 for err in errs {
423 let dimension_result: metrics::RepositoryManagerLoadStaticConfigsMigratedMetricDimensionResult
424 = (&err).into();
425 cobalt_sender.send(
426 fmetrics::MetricEvent::builder(
427 metrics::REPOSITORY_MANAGER_LOAD_STATIC_CONFIGS_MIGRATED_METRIC_ID,
428 )
429 .with_event_codes(dimension_result)
430 .as_occurrence(1),
431 );
432 match &err {
433 crate::repository_manager::LoadError::Io { path: _, error }
434 if error.kind() == std::io::ErrorKind::NotFound =>
435 {
436 info!("no statically configured repositories present");
437 }
438 _ => error!("error loading static repo config: {:#}", anyhow!(err)),
439 };
440 }
441 builder
442 }
443 };
444
445 match config.persisted_repos_dir() {
446 Some(repo) => builder.with_persisted_repos_dir(repo),
447 None => builder,
448 }
449 .cobalt_sender(cobalt_sender)
450 .build()
451}
452
453async fn load_rewrite_manager(
454 node: inspect::Node,
455 config: &Config,
456 data_proxy: Option<fio::DirectoryProxy>,
457 config_proxy: Option<fio::DirectoryProxy>,
458) -> RewriteManager {
459 let dynamic_rules_path =
460 if config.enable_dynamic_configuration() { Some(DYNAMIC_RULES_PATH) } else { None };
461 let builder = RewriteManagerBuilder::new(data_proxy, dynamic_rules_path)
462 .await
463 .unwrap_or_else(|(builder, err)| {
464 match err {
465 LoadRulesError::FileOpen(fuchsia_fs::node::OpenError::OpenError(
467 zx::Status::NOT_FOUND,
468 )) => {}
469 LoadRulesError::DirOpen(_) => {}
471 err => error!(
472 "unable to load dynamic rewrite rules from disk, using defaults: {:#}",
473 anyhow!(err)
474 ),
475 };
476 builder
477 })
478 .inspect_node(node)
479 .static_rules_path(config_proxy, STATIC_RULES_PATH)
480 .await
481 .unwrap_or_else(|(builder, err)| {
482 match err {
483 LoadRulesError::FileOpen(fuchsia_fs::node::OpenError::OpenError(
485 zx::Status::NOT_FOUND,
486 )) => {}
487 LoadRulesError::DirOpen(_) => {}
489 err => {
490 error!("unable to load static rewrite rules from disk: {:#}", anyhow!(err))
491 }
492 };
493 builder
494 });
495
496 builder.build()
497}