Skip to main content

pkg_resolver/
main.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![allow(clippy::let_unit_value)]
6#![allow(clippy::too_many_arguments)]
7#![allow(clippy::enum_variant_names)]
8
9use anyhow::{Context as _, Error, anyhow};
10use async_lock::RwLock as AsyncRwLock;
11use cobalt_sw_delivery_registry as metrics;
12use delivery_blob::DeliveryBlobType;
13use fdio::Namespace;
14use fidl::endpoints::DiscoverableProtocolMarker as _;
15use fidl_contrib::ProtocolConnector;
16use fidl_contrib::protocol_connector::ProtocolSender;
17use fidl_fuchsia_io as fio;
18use fidl_fuchsia_metrics as fmetrics;
19use fidl_fuchsia_pkg as fpkg;
20use fidl_fuchsia_pkg_http as fpkg_http;
21use fuchsia_async as fasync;
22use fuchsia_cobalt_builders::MetricEventExt as _;
23use fuchsia_component::server::ServiceFs;
24use fuchsia_inspect as inspect;
25use fuchsia_trace as ftrace;
26use futures::prelude::*;
27use futures::stream::FuturesUnordered;
28use log::{error, info, warn};
29use std::sync::Arc;
30use std::time::{Duration, Instant};
31
32mod cache;
33mod cache_package_index;
34mod clock;
35mod component_resolver;
36mod config;
37mod eager_package_manager;
38mod error;
39mod inspect_util;
40mod metrics_util;
41mod ota_downloader;
42mod repository;
43mod repository_manager;
44mod repository_service;
45mod resolver_service;
46mod rewrite_manager;
47mod rewrite_service;
48mod util;
49
50#[cfg(test)]
51mod test_util;
52
53use crate::cache::BasePackageIndex;
54use crate::config::Config;
55use crate::repository_manager::{RepositoryManager, RepositoryManagerBuilder};
56use crate::repository_service::RepositoryService;
57use crate::resolver_service::ResolverServiceInspectState;
58use crate::rewrite_manager::{LoadRulesError, RewriteManager, RewriteManagerBuilder};
59use crate::rewrite_service::RewriteService;
60
61// FIXME: allow for multiple threads and sendable futures once repo updates support it.
62// FIXME(43342): trace durations assume they start and end on the same thread, but since the
63// package resolver's executor is multi-threaded, a trace duration that includes an 'await' may not
64// end on the same thread it starts on, resulting in invalid trace events.
65// const SERVER_THREADS: usize = 2;
66
/// Concurrency limit handed to `resolver_service::QueuedResolver::new`.
const MAX_CONCURRENT_PACKAGE_FETCHES: usize = 5;

/// Buffer size for the Cobalt protocol connector.
///
/// Each fetch_blob call emits an event, and a system update fetches about 1,000 blobs in about a
/// minute.
const COBALT_CONNECTOR_BUFFER_SIZE: usize = 1000;

/// Directory from which static repository configs are loaded.
const STATIC_REPO_DIR: &str = "/config/data/repositories";
/// Dynamic repository config file, relative to /data.
const DYNAMIC_REPO_PATH: &str = "repositories.json";

/// Static rewrite rules file, relative to /config/data.
const STATIC_RULES_PATH: &str = "rewrites.json";
/// Dynamic rewrite rules file, relative to /data.
const DYNAMIC_RULES_PATH: &str = "rewrites.json";
81
82#[fuchsia::main(logging_tags = ["pkg-resolver"])]
83pub fn main() -> Result<(), Error> {
84    let startup_time = Instant::now();
85    fuchsia_trace_provider::trace_provider_create_with_fdio();
86    info!("starting package resolver");
87
88    let mut executor = fasync::LocalExecutorBuilder::new().build();
89    executor.run_singlethreaded(main_inner_async(startup_time)).map_err(|err| {
90        // Use anyhow to print the error chain.
91        let err = anyhow!(err);
92        error!("error running pkg-resolver: {:#}", err);
93        err
94    })
95}
96
97async fn main_inner_async(startup_time: Instant) -> Result<(), Error> {
98    let config = Config::load_from_config_data_or_default();
99    let structured_config = pkg_resolver_config::Config::take_from_startup_handle();
100
101    let pkg_cache_proxy =
102        fuchsia_component::client::connect_to_protocol::<fpkg::PackageCacheMarker>()
103            .context("error connecting to package cache")?;
104    let pkg_cache = fidl_fuchsia_pkg_ext::cache::Client::from_proxy(pkg_cache_proxy);
105
106    let base_package_index = Arc::new(
107        BasePackageIndex::from_proxy(pkg_cache.proxy())
108            .await
109            .context("failed to load base package index")?,
110    );
111
112    // The list of cache packages from the system image, not to be confused with the PackageCache.
113    let system_cache_list = Arc::new(cache_package_index::from_proxy(pkg_cache.proxy()).await);
114
115    let inspector = fuchsia_inspect::Inspector::default();
116    inspector
117        .root()
118        .record_child("structured_config", |node| structured_config.record_inspect(node));
119
120    let futures = FuturesUnordered::new();
121
122    let (mut cobalt_sender, cobalt_fut) = ProtocolConnector::new_with_buffer_size(
123        metrics_util::CobaltConnectedService,
124        COBALT_CONNECTOR_BUFFER_SIZE,
125    )
126    .serve_and_log_errors();
127    futures.push(cobalt_fut.boxed_local());
128
129    let data_proxy = match fuchsia_fs::directory::open_in_namespace(
130        "/data",
131        fio::PERM_READABLE | fio::PERM_WRITABLE,
132    ) {
133        Ok(proxy) => Some(proxy),
134        Err(e) => {
135            warn!("failed to open /data: {:#}", anyhow!(e));
136            None
137        }
138    };
139
140    if data_proxy.is_some() {
141        let namespace = Namespace::installed().context("failed to get installed namespace")?;
142        namespace.unbind("/data").context("failed to unbind /data from default namespace")?;
143    }
144
145    let config_proxy =
146        match fuchsia_fs::directory::open_in_namespace("/config/data", fio::PERM_READABLE) {
147            Ok(proxy) => Some(proxy),
148            Err(e) => {
149                warn!("failed to open /config/data: {:#}", anyhow!(e));
150                None
151            }
152        };
153
154    let delivery_blob_type: DeliveryBlobType =
155        structured_config.delivery_blob_type.try_into().with_context(|| {
156            format!("invalid delivery blob type {}", structured_config.delivery_blob_type)
157        })?;
158
159    let repo_manager = Arc::new(AsyncRwLock::new(
160        load_repo_manager(
161            inspector.root().create_child("repository_manager"),
162            &config,
163            cobalt_sender.clone(),
164            TufTimeouts {
165                metadata: std::time::Duration::from_secs(
166                    structured_config.tuf_metadata_timeout_seconds.into(),
167                ),
168                network_header: zx::BootDuration::from_seconds(
169                    structured_config.tuf_network_header_timeout_seconds.into(),
170                ),
171            },
172            data_proxy.clone(),
173            delivery_blob_type,
174        )
175        .await,
176    ));
177    let rewrite_manager = Arc::new(AsyncRwLock::new(
178        load_rewrite_manager(
179            inspector.root().create_child("rewrite_manager"),
180            &config,
181            data_proxy.clone(),
182            config_proxy,
183        )
184        .await,
185    ));
186
187    let (blob_fetch_queue, blob_fetcher) = crate::cache::BlobFetcher::new(
188        fuchsia_component::client::connect_to_protocol::<fpkg_http::ClientMarker>()
189            .context("error connecting to fuchsia.pkg.http/Client")?,
190        inspector.root().create_child("blob_fetcher"),
191        structured_config.blob_download_concurrency_limit.into(),
192        repo_manager.read().await.stats(),
193        cache::BlobFetchParams::builder()
194            .header_network_timeout(zx::BootDuration::from_seconds(
195                structured_config.blob_network_header_timeout_seconds.into(),
196            ))
197            .body_network_timeout(zx::BootDuration::from_seconds(
198                structured_config.blob_network_body_timeout_seconds.into(),
199            ))
200            .download_resumption_attempts_limit(
201                structured_config.blob_download_resumption_attempts_limit,
202            )
203            .build(),
204    );
205    futures.push(blob_fetch_queue.boxed_local());
206
207    let resolver_service_inspect_state = Arc::new(ResolverServiceInspectState::from_node(
208        inspector.root().create_child("resolver_service"),
209    ));
210    let (package_fetch_queue, package_resolver) = resolver_service::QueuedResolver::new(
211        pkg_cache.clone(),
212        Arc::clone(&base_package_index),
213        Arc::clone(&system_cache_list),
214        Arc::clone(&repo_manager),
215        Arc::clone(&rewrite_manager),
216        blob_fetcher.clone(),
217        MAX_CONCURRENT_PACKAGE_FETCHES,
218        Arc::clone(&resolver_service_inspect_state),
219    );
220    futures.push(package_fetch_queue.boxed_local());
221
222    // `pkg-resolver` is required for an OTA and EagerPackageManager isn't.
223    // Also `EagerPackageManager` depends on /data, which may or may not be available, especially in
224    // tests. Wrapping `EagerPackageManager` in Arc<Option<_>> allows it to be used if available
225    // during package resolve process.
226    let eager_package_manager = Arc::new(
227        crate::eager_package_manager::EagerPackageManager::from_namespace(
228            package_resolver.clone(),
229            pkg_cache.clone(),
230            data_proxy,
231            &system_cache_list,
232            cobalt_sender.clone(),
233        )
234        .await
235        .map_err(|e| {
236            error!("failed to create EagerPackageManager: {:#}", &e);
237        })
238        .ok()
239        .map(AsyncRwLock::new),
240    );
241
242    let make_resolver_cb = {
243        let repo_manager = Arc::clone(&repo_manager);
244        let rewrite_manager = Arc::clone(&rewrite_manager);
245        let package_resolver = package_resolver.clone();
246        let pkg_cache = pkg_cache.clone();
247        let cobalt_sender = cobalt_sender.clone();
248        let eager_package_manager = Arc::clone(&eager_package_manager);
249        move |gc_protection| {
250            let repo_manager = Arc::clone(&repo_manager);
251            let rewrite_manager = Arc::clone(&rewrite_manager);
252            let package_resolver = package_resolver.clone();
253            let pkg_cache = pkg_cache.clone();
254            let base_package_index = Arc::clone(&base_package_index);
255            let system_cache_list = Arc::clone(&system_cache_list);
256            let cobalt_sender = cobalt_sender.clone();
257            let resolver_service_inspect_state = Arc::clone(&resolver_service_inspect_state);
258            let eager_package_manager = Arc::clone(&eager_package_manager);
259            move |stream| {
260                fasync::Task::local(
261                    resolver_service::run_resolver_service(
262                        Arc::clone(&repo_manager),
263                        Arc::clone(&rewrite_manager),
264                        package_resolver.clone(),
265                        pkg_cache.clone(),
266                        Arc::clone(&base_package_index),
267                        Arc::clone(&system_cache_list),
268                        stream,
269                        gc_protection,
270                        cobalt_sender.clone(),
271                        Arc::clone(&resolver_service_inspect_state),
272                        Arc::clone(&eager_package_manager),
273                    )
274                    .unwrap_or_else(|e| error!("run_resolver_service failed: {:#}", anyhow!(e))),
275                )
276                .detach()
277            }
278        }
279    };
280
281    let resolver_toolbox_cb = {
282        let package_resolver = package_resolver.clone();
283        let cobalt_sender = cobalt_sender.clone();
284        let eager_package_manager = Arc::clone(&eager_package_manager);
285        move |stream| {
286            fasync::Task::local(
287                resolver_service::run_resolver_toolbox_service(
288                    package_resolver.clone(),
289                    stream,
290                    fpkg::GcProtection::OpenPackageTracking,
291                    cobalt_sender.clone(),
292                    Arc::clone(&eager_package_manager),
293                )
294                .unwrap_or_else(|e| {
295                    error!("run_resolver_toolbox_service failed: {:#}", anyhow!(e))
296                }),
297            )
298            .detach()
299        }
300    };
301
302    let repo_cb = move |stream| {
303        let repo_manager = Arc::clone(&repo_manager);
304
305        fasync::Task::local(
306            async move {
307                let mut repo_service = RepositoryService::new(repo_manager);
308                repo_service.run(stream).await
309            }
310            .unwrap_or_else(|e| error!("error encountered: {:#}", anyhow!(e))),
311        )
312        .detach()
313    };
314
315    let rewrite_cb = move |stream| {
316        let mut rewrite_service = RewriteService::new(Arc::clone(&rewrite_manager));
317
318        fasync::Task::local(
319            async move { rewrite_service.handle_client(stream).await }
320                .unwrap_or_else(|e| error!("while handling rewrite client {:#}", anyhow!(e))),
321        )
322        .detach()
323    };
324
325    let cup_cb = {
326        let cobalt_sender = cobalt_sender.clone();
327        move |stream| {
328            fasync::Task::local(
329                eager_package_manager::run_cup_service(
330                    Arc::clone(&eager_package_manager),
331                    stream,
332                    cobalt_sender.clone(),
333                )
334                .unwrap_or_else(|e| error!("run_cup_service failed: {:#}", anyhow!(e))),
335            )
336            .detach()
337        }
338    };
339
340    let component_resolver_cb = move |stream| {
341        fasync::Task::local(
342            component_resolver::serve(stream)
343                .unwrap_or_else(|e| error!("serve_component_resolver_failed: {:#}", e)),
344        )
345        .detach()
346    };
347
348    let ota_downloader_cb = move |stream| {
349        fasync::Task::local(
350            ota_downloader::serve(stream, blob_fetcher.clone(), pkg_cache.clone())
351                .unwrap_or_else(|e| error!("run_cup_service failed: {:#}", anyhow!(e))),
352        )
353        .detach()
354    };
355
356    let mut fs = ServiceFs::new();
357    fs.dir("svc")
358        .add_fidl_service(make_resolver_cb(fpkg::GcProtection::OpenPackageTracking))
359        .add_fidl_service_at(
360            format!("{}-ota", fpkg::PackageResolverMarker::PROTOCOL_NAME),
361            make_resolver_cb(fpkg::GcProtection::Retained),
362        )
363        .add_fidl_service(resolver_toolbox_cb)
364        .add_fidl_service(repo_cb)
365        .add_fidl_service(rewrite_cb)
366        .add_fidl_service(cup_cb)
367        .add_fidl_service(component_resolver_cb)
368        .add_fidl_service(ota_downloader_cb);
369
370    fs.take_and_serve_directory_handle().context("while serving directory handle")?;
371
372    futures.push(fs.collect().boxed_local());
373
374    cobalt_sender.send(
375        fmetrics::MetricEvent::builder(metrics::PKG_RESOLVER_STARTUP_DURATION_MIGRATED_METRIC_ID)
376            .as_integer(Instant::now().duration_since(startup_time).as_micros() as i64),
377    );
378
379    ftrace::instant!("app", "startup", ftrace::Scope::Process);
380
381    let _inspect_server_task =
382        inspect_runtime::publish(&inspector, inspect_runtime::PublishOptions::default());
383
384    futures.collect::<()>().await;
385
386    Ok(())
387}
388
389async fn load_repo_manager(
390    node: inspect::Node,
391    config: &Config,
392    mut cobalt_sender: ProtocolSender<fmetrics::MetricEvent>,
393    tuf_timeouts: TufTimeouts,
394    data_proxy: Option<fio::DirectoryProxy>,
395    delivery_blob_type: DeliveryBlobType,
396) -> RepositoryManager {
397    // report any errors we saw, but don't error out because otherwise we won't be able
398    // to update the system.
399    let dynamic_repo_path =
400        if config.enable_dynamic_configuration() { Some(DYNAMIC_REPO_PATH) } else { None };
401    let builder = match RepositoryManagerBuilder::new(data_proxy, dynamic_repo_path, tuf_timeouts)
402        .await
403        .unwrap_or_else(|(builder, err)| {
404            error!("error loading dynamic repo config: {:#}", anyhow!(err));
405            builder
406        })
407        .delivery_blob_type(delivery_blob_type)
408        .inspect_node(node)
409        .load_static_configs_dir(STATIC_REPO_DIR)
410    {
411        Ok(builder) => {
412            cobalt_sender.send(
413                fmetrics::MetricEvent::builder(
414                    metrics::REPOSITORY_MANAGER_LOAD_STATIC_CONFIGS_MIGRATED_METRIC_ID,
415                )
416                .with_event_codes(
417                    metrics::RepositoryManagerLoadStaticConfigsMigratedMetricDimensionResult::Success,
418                )
419                .as_occurrence(1),
420            );
421            builder
422        }
423        Err((builder, errs)) => {
424            for err in errs {
425                let dimension_result: metrics::RepositoryManagerLoadStaticConfigsMigratedMetricDimensionResult
426                    = (&err).into();
427                cobalt_sender.send(
428                    fmetrics::MetricEvent::builder(
429                        metrics::REPOSITORY_MANAGER_LOAD_STATIC_CONFIGS_MIGRATED_METRIC_ID,
430                    )
431                    .with_event_codes(dimension_result)
432                    .as_occurrence(1),
433                );
434                match &err {
435                    crate::repository_manager::LoadError::Io { path: _, error }
436                        if error.kind() == std::io::ErrorKind::NotFound =>
437                    {
438                        info!("no statically configured repositories present");
439                    }
440                    _ => error!("error loading static repo config: {:#}", anyhow!(err)),
441                };
442            }
443            builder
444        }
445    };
446
447    match config.persisted_repos_dir() {
448        Some(repo) => builder.with_persisted_repos_dir(repo),
449        None => builder,
450    }
451    .cobalt_sender(cobalt_sender)
452    .build()
453}
454
455async fn load_rewrite_manager(
456    node: inspect::Node,
457    config: &Config,
458    data_proxy: Option<fio::DirectoryProxy>,
459    config_proxy: Option<fio::DirectoryProxy>,
460) -> RewriteManager {
461    let dynamic_rules_path =
462        if config.enable_dynamic_configuration() { Some(DYNAMIC_RULES_PATH) } else { None };
463    let builder = RewriteManagerBuilder::new(data_proxy, dynamic_rules_path)
464        .await
465        .unwrap_or_else(|(builder, err)| {
466            match err {
467                // Given a fresh /data, it's expected the file doesn't exist.
468                LoadRulesError::FileOpen(fuchsia_fs::node::OpenError::OpenError(
469                    zx::Status::NOT_FOUND,
470                )) => {}
471                // Unable to open /data dir proxy.
472                LoadRulesError::DirOpen(_) => {}
473                err => error!(
474                    "unable to load dynamic rewrite rules from disk, using defaults: {:#}",
475                    anyhow!(err)
476                ),
477            };
478            builder
479        })
480        .inspect_node(node)
481        .static_rules_path(config_proxy, STATIC_RULES_PATH)
482        .await
483        .unwrap_or_else(|(builder, err)| {
484            match err {
485                // No static rules are configured for this system version.
486                LoadRulesError::FileOpen(fuchsia_fs::node::OpenError::OpenError(
487                    zx::Status::NOT_FOUND,
488                )) => {}
489                // Unable to open /config/data dir proxy.
490                LoadRulesError::DirOpen(_) => {}
491                err => {
492                    error!("unable to load static rewrite rules from disk: {:#}", anyhow!(err))
493                }
494            };
495            builder
496        });
497
498    builder.build()
499}
500
501#[derive(Clone, Copy, Debug)]
502struct TufTimeouts {
503    metadata: Duration,
504    network_header: zx::BootDuration,
505}