pkg_resolver/
main.rs

1// Copyright 2018 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#![allow(clippy::let_unit_value)]
6#![allow(clippy::too_many_arguments)]
7#![allow(clippy::enum_variant_names)]
8
9use anyhow::{Context as _, Error, anyhow};
10use async_lock::RwLock as AsyncRwLock;
11use delivery_blob::DeliveryBlobType;
12use fdio::Namespace;
13use fidl::endpoints::DiscoverableProtocolMarker as _;
14use fidl_contrib::ProtocolConnector;
15use fidl_contrib::protocol_connector::ProtocolSender;
16use fuchsia_cobalt_builders::MetricEventExt as _;
17use fuchsia_component::server::ServiceFs;
18use futures::prelude::*;
19use futures::stream::FuturesUnordered;
20use log::{error, info, warn};
21use std::sync::Arc;
22use std::time::{Duration, Instant};
23use {
24    cobalt_sw_delivery_registry as metrics, fidl_fuchsia_io as fio,
25    fidl_fuchsia_metrics as fmetrics, fidl_fuchsia_pkg as fpkg, fidl_fuchsia_pkg_http as fpkg_http,
26    fuchsia_async as fasync, fuchsia_inspect as inspect, fuchsia_trace as ftrace,
27};
28
29mod cache;
30mod cache_package_index;
31mod clock;
32mod component_resolver;
33mod config;
34mod eager_package_manager;
35mod error;
36mod inspect_util;
37mod metrics_util;
38mod ota_downloader;
39mod repository;
40mod repository_manager;
41mod repository_service;
42mod resolver_service;
43mod rewrite_manager;
44mod rewrite_service;
45mod util;
46
47#[cfg(test)]
48mod test_util;
49
50use crate::cache::BasePackageIndex;
51use crate::config::Config;
52use crate::repository_manager::{RepositoryManager, RepositoryManagerBuilder};
53use crate::repository_service::RepositoryService;
54use crate::resolver_service::ResolverServiceInspectState;
55use crate::rewrite_manager::{LoadRulesError, RewriteManager, RewriteManagerBuilder};
56use crate::rewrite_service::RewriteService;
57
58// FIXME: allow for multiple threads and sendable futures once repo updates support it.
59// FIXME(43342): trace durations assume they start and end on the same thread, but since the
60// package resolver's executor is multi-threaded, a trace duration that includes an 'await' may not
61// end on the same thread it starts on, resulting in invalid trace events.
62// const SERVER_THREADS: usize = 2;
63
64const MAX_CONCURRENT_PACKAGE_FETCHES: usize = 5;
65
66// Each fetch_blob call emits an event, and a system update fetches about 1,000 blobs in about a
67// minute.
68const COBALT_CONNECTOR_BUFFER_SIZE: usize = 1000;
69
70const STATIC_REPO_DIR: &str = "/config/data/repositories";
71// Relative to /data.
72const DYNAMIC_REPO_PATH: &str = "repositories.json";
73
74// Relative to /config/data.
75const STATIC_RULES_PATH: &str = "rewrites.json";
76// Relative to /data.
77const DYNAMIC_RULES_PATH: &str = "rewrites.json";
78
79// The TCP keepalive timeout here in effect acts as a sort of between bytes timeout for connections
80// that are no longer established. Explicit timeouts are used around request futures to guard
81// against cases where both sides agree the connection is established, but the client expects more
82// data and the server doesn't intend to send any.
83const TCP_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(30);
84
85#[fuchsia::main(logging_tags = ["pkg-resolver"])]
86pub fn main() -> Result<(), Error> {
87    let startup_time = Instant::now();
88    fuchsia_trace_provider::trace_provider_create_with_fdio();
89    info!("starting package resolver");
90
91    let mut executor = fasync::LocalExecutorBuilder::new().build();
92    executor.run_singlethreaded(main_inner_async(startup_time)).map_err(|err| {
93        // Use anyhow to print the error chain.
94        let err = anyhow!(err);
95        error!("error running pkg-resolver: {:#}", err);
96        err
97    })
98}
99
100async fn main_inner_async(startup_time: Instant) -> Result<(), Error> {
101    let config = Config::load_from_config_data_or_default();
102    let structured_config = pkg_resolver_config::Config::take_from_startup_handle();
103
104    let pkg_cache_proxy =
105        fuchsia_component::client::connect_to_protocol::<fpkg::PackageCacheMarker>()
106            .context("error connecting to package cache")?;
107    let pkg_cache = fidl_fuchsia_pkg_ext::cache::Client::from_proxy(pkg_cache_proxy);
108
109    let base_package_index = Arc::new(
110        BasePackageIndex::from_proxy(pkg_cache.proxy())
111            .await
112            .context("failed to load base package index")?,
113    );
114
115    // The list of cache packages from the system image, not to be confused with the PackageCache.
116    let system_cache_list = Arc::new(cache_package_index::from_proxy(pkg_cache.proxy()).await);
117
118    let inspector = fuchsia_inspect::Inspector::default();
119    inspector
120        .root()
121        .record_child("structured_config", |node| structured_config.record_inspect(node));
122
123    let futures = FuturesUnordered::new();
124
125    let (mut cobalt_sender, cobalt_fut) = ProtocolConnector::new_with_buffer_size(
126        metrics_util::CobaltConnectedService,
127        COBALT_CONNECTOR_BUFFER_SIZE,
128    )
129    .serve_and_log_errors();
130    futures.push(cobalt_fut.boxed_local());
131
132    let data_proxy = match fuchsia_fs::directory::open_in_namespace(
133        "/data",
134        fio::PERM_READABLE | fio::PERM_WRITABLE,
135    ) {
136        Ok(proxy) => Some(proxy),
137        Err(e) => {
138            warn!("failed to open /data: {:#}", anyhow!(e));
139            None
140        }
141    };
142
143    if data_proxy.is_some() {
144        let namespace = Namespace::installed().context("failed to get installed namespace")?;
145        namespace.unbind("/data").context("failed to unbind /data from default namespace")?;
146    }
147
148    let config_proxy =
149        match fuchsia_fs::directory::open_in_namespace("/config/data", fio::PERM_READABLE) {
150            Ok(proxy) => Some(proxy),
151            Err(e) => {
152                warn!("failed to open /config/data: {:#}", anyhow!(e));
153                None
154            }
155        };
156
157    let repo_manager = Arc::new(AsyncRwLock::new(
158        load_repo_manager(
159            inspector.root().create_child("repository_manager"),
160            &config,
161            cobalt_sender.clone(),
162            std::time::Duration::from_secs(structured_config.tuf_metadata_timeout_seconds.into()),
163            data_proxy.clone(),
164        )
165        .await,
166    ));
167    let rewrite_manager = Arc::new(AsyncRwLock::new(
168        load_rewrite_manager(
169            inspector.root().create_child("rewrite_manager"),
170            &config,
171            data_proxy.clone(),
172            config_proxy,
173        )
174        .await,
175    ));
176
177    let delivery_blob_type: DeliveryBlobType =
178        structured_config.delivery_blob_type.try_into().with_context(|| {
179            format!("invalid delivery blob type {}", structured_config.delivery_blob_type)
180        })?;
181
182    let (blob_fetch_queue, blob_fetcher) = crate::cache::BlobFetcher::new(
183        fuchsia_component::client::connect_to_protocol::<fpkg_http::ClientMarker>()
184            .context("error connecting to fuchsia.pkg.http/Client")?,
185        inspector.root().create_child("blob_fetcher"),
186        structured_config.blob_download_concurrency_limit.into(),
187        repo_manager.read().await.stats(),
188        cache::BlobFetchParams::builder()
189            .header_network_timeout(zx::BootDuration::from_seconds(
190                structured_config.blob_network_header_timeout_seconds.into(),
191            ))
192            .body_network_timeout(zx::BootDuration::from_seconds(
193                structured_config.blob_network_body_timeout_seconds.into(),
194            ))
195            .download_resumption_attempts_limit(
196                structured_config.blob_download_resumption_attempts_limit,
197            )
198            .blob_type(delivery_blob_type)
199            .build(),
200    );
201    futures.push(blob_fetch_queue.boxed_local());
202
203    let resolver_service_inspect_state = Arc::new(ResolverServiceInspectState::from_node(
204        inspector.root().create_child("resolver_service"),
205    ));
206    let (package_fetch_queue, package_resolver) = resolver_service::QueuedResolver::new(
207        pkg_cache.clone(),
208        Arc::clone(&base_package_index),
209        Arc::clone(&system_cache_list),
210        Arc::clone(&repo_manager),
211        Arc::clone(&rewrite_manager),
212        blob_fetcher.clone(),
213        MAX_CONCURRENT_PACKAGE_FETCHES,
214        Arc::clone(&resolver_service_inspect_state),
215    );
216    futures.push(package_fetch_queue.boxed_local());
217
218    // `pkg-resolver` is required for an OTA and EagerPackageManager isn't.
219    // Also `EagerPackageManager` depends on /data, which may or may not be available, especially in
220    // tests. Wrapping `EagerPackageManager` in Arc<Option<_>> allows it to be used if available
221    // during package resolve process.
222    let eager_package_manager = Arc::new(
223        crate::eager_package_manager::EagerPackageManager::from_namespace(
224            package_resolver.clone(),
225            pkg_cache.clone(),
226            data_proxy,
227            &system_cache_list,
228            cobalt_sender.clone(),
229        )
230        .await
231        .map_err(|e| {
232            error!("failed to create EagerPackageManager: {:#}", &e);
233        })
234        .ok()
235        .map(AsyncRwLock::new),
236    );
237
238    let make_resolver_cb = {
239        let repo_manager = Arc::clone(&repo_manager);
240        let rewrite_manager = Arc::clone(&rewrite_manager);
241        let package_resolver = package_resolver.clone();
242        let pkg_cache = pkg_cache.clone();
243        let cobalt_sender = cobalt_sender.clone();
244        let eager_package_manager = Arc::clone(&eager_package_manager);
245        move |gc_protection| {
246            let repo_manager = Arc::clone(&repo_manager);
247            let rewrite_manager = Arc::clone(&rewrite_manager);
248            let package_resolver = package_resolver.clone();
249            let pkg_cache = pkg_cache.clone();
250            let base_package_index = Arc::clone(&base_package_index);
251            let system_cache_list = Arc::clone(&system_cache_list);
252            let cobalt_sender = cobalt_sender.clone();
253            let resolver_service_inspect_state = Arc::clone(&resolver_service_inspect_state);
254            let eager_package_manager = Arc::clone(&eager_package_manager);
255            move |stream| {
256                fasync::Task::local(
257                    resolver_service::run_resolver_service(
258                        Arc::clone(&repo_manager),
259                        Arc::clone(&rewrite_manager),
260                        package_resolver.clone(),
261                        pkg_cache.clone(),
262                        Arc::clone(&base_package_index),
263                        Arc::clone(&system_cache_list),
264                        stream,
265                        gc_protection,
266                        cobalt_sender.clone(),
267                        Arc::clone(&resolver_service_inspect_state),
268                        Arc::clone(&eager_package_manager),
269                    )
270                    .unwrap_or_else(|e| error!("run_resolver_service failed: {:#}", anyhow!(e))),
271                )
272                .detach()
273            }
274        }
275    };
276
277    let resolver_toolbox_cb = {
278        let package_resolver = package_resolver.clone();
279        let cobalt_sender = cobalt_sender.clone();
280        let eager_package_manager = Arc::clone(&eager_package_manager);
281        move |stream| {
282            fasync::Task::local(
283                resolver_service::run_resolver_toolbox_service(
284                    package_resolver.clone(),
285                    stream,
286                    fpkg::GcProtection::OpenPackageTracking,
287                    cobalt_sender.clone(),
288                    Arc::clone(&eager_package_manager),
289                )
290                .unwrap_or_else(|e| {
291                    error!("run_resolver_toolbox_service failed: {:#}", anyhow!(e))
292                }),
293            )
294            .detach()
295        }
296    };
297
298    let repo_cb = move |stream| {
299        let repo_manager = Arc::clone(&repo_manager);
300
301        fasync::Task::local(
302            async move {
303                let mut repo_service = RepositoryService::new(repo_manager);
304                repo_service.run(stream).await
305            }
306            .unwrap_or_else(|e| error!("error encountered: {:#}", anyhow!(e))),
307        )
308        .detach()
309    };
310
311    let rewrite_cb = move |stream| {
312        let mut rewrite_service = RewriteService::new(Arc::clone(&rewrite_manager));
313
314        fasync::Task::local(
315            async move { rewrite_service.handle_client(stream).await }
316                .unwrap_or_else(|e| error!("while handling rewrite client {:#}", anyhow!(e))),
317        )
318        .detach()
319    };
320
321    let cup_cb = {
322        let cobalt_sender = cobalt_sender.clone();
323        move |stream| {
324            fasync::Task::local(
325                eager_package_manager::run_cup_service(
326                    Arc::clone(&eager_package_manager),
327                    stream,
328                    cobalt_sender.clone(),
329                )
330                .unwrap_or_else(|e| error!("run_cup_service failed: {:#}", anyhow!(e))),
331            )
332            .detach()
333        }
334    };
335
336    let component_resolver_cb = move |stream| {
337        fasync::Task::local(
338            component_resolver::serve(stream)
339                .unwrap_or_else(|e| error!("serve_component_resolver_failed: {:#}", e)),
340        )
341        .detach()
342    };
343
344    let ota_downloader_cb = move |stream| {
345        fasync::Task::local(
346            ota_downloader::serve(stream, blob_fetcher.clone(), pkg_cache.clone())
347                .unwrap_or_else(|e| error!("run_cup_service failed: {:#}", anyhow!(e))),
348        )
349        .detach()
350    };
351
352    let mut fs = ServiceFs::new();
353    fs.dir("svc")
354        .add_fidl_service(make_resolver_cb(fpkg::GcProtection::OpenPackageTracking))
355        .add_fidl_service_at(
356            format!("{}-ota", fpkg::PackageResolverMarker::PROTOCOL_NAME),
357            make_resolver_cb(fpkg::GcProtection::Retained),
358        )
359        .add_fidl_service(resolver_toolbox_cb)
360        .add_fidl_service(repo_cb)
361        .add_fidl_service(rewrite_cb)
362        .add_fidl_service(cup_cb)
363        .add_fidl_service(component_resolver_cb)
364        .add_fidl_service(ota_downloader_cb);
365
366    fs.take_and_serve_directory_handle().context("while serving directory handle")?;
367
368    futures.push(fs.collect().boxed_local());
369
370    cobalt_sender.send(
371        fmetrics::MetricEvent::builder(metrics::PKG_RESOLVER_STARTUP_DURATION_MIGRATED_METRIC_ID)
372            .as_integer(Instant::now().duration_since(startup_time).as_micros() as i64),
373    );
374
375    ftrace::instant!("app", "startup", ftrace::Scope::Process);
376
377    let _inspect_server_task =
378        inspect_runtime::publish(&inspector, inspect_runtime::PublishOptions::default());
379
380    futures.collect::<()>().await;
381
382    Ok(())
383}
384
385async fn load_repo_manager(
386    node: inspect::Node,
387    config: &Config,
388    mut cobalt_sender: ProtocolSender<fmetrics::MetricEvent>,
389    tuf_metadata_timeout: Duration,
390    data_proxy: Option<fio::DirectoryProxy>,
391) -> RepositoryManager {
392    // report any errors we saw, but don't error out because otherwise we won't be able
393    // to update the system.
394    let dynamic_repo_path =
395        if config.enable_dynamic_configuration() { Some(DYNAMIC_REPO_PATH) } else { None };
396    let builder = match RepositoryManagerBuilder::new(
397        data_proxy,
398        dynamic_repo_path,
399        tuf_metadata_timeout,
400    )
401    .await
402    .unwrap_or_else(|(builder, err)| {
403        error!("error loading dynamic repo config: {:#}", anyhow!(err));
404        builder
405    })
406    .inspect_node(node)
407    .load_static_configs_dir(STATIC_REPO_DIR)
408    {
409        Ok(builder) => {
410            cobalt_sender.send(
411                fmetrics::MetricEvent::builder(
412                    metrics::REPOSITORY_MANAGER_LOAD_STATIC_CONFIGS_MIGRATED_METRIC_ID,
413                )
414                .with_event_codes(
415                    metrics::RepositoryManagerLoadStaticConfigsMigratedMetricDimensionResult::Success,
416                )
417                .as_occurrence(1),
418            );
419            builder
420        }
421        Err((builder, errs)) => {
422            for err in errs {
423                let dimension_result: metrics::RepositoryManagerLoadStaticConfigsMigratedMetricDimensionResult
424                    = (&err).into();
425                cobalt_sender.send(
426                    fmetrics::MetricEvent::builder(
427                        metrics::REPOSITORY_MANAGER_LOAD_STATIC_CONFIGS_MIGRATED_METRIC_ID,
428                    )
429                    .with_event_codes(dimension_result)
430                    .as_occurrence(1),
431                );
432                match &err {
433                    crate::repository_manager::LoadError::Io { path: _, error }
434                        if error.kind() == std::io::ErrorKind::NotFound =>
435                    {
436                        info!("no statically configured repositories present");
437                    }
438                    _ => error!("error loading static repo config: {:#}", anyhow!(err)),
439                };
440            }
441            builder
442        }
443    };
444
445    match config.persisted_repos_dir() {
446        Some(repo) => builder.with_persisted_repos_dir(repo),
447        None => builder,
448    }
449    .cobalt_sender(cobalt_sender)
450    .build()
451}
452
453async fn load_rewrite_manager(
454    node: inspect::Node,
455    config: &Config,
456    data_proxy: Option<fio::DirectoryProxy>,
457    config_proxy: Option<fio::DirectoryProxy>,
458) -> RewriteManager {
459    let dynamic_rules_path =
460        if config.enable_dynamic_configuration() { Some(DYNAMIC_RULES_PATH) } else { None };
461    let builder = RewriteManagerBuilder::new(data_proxy, dynamic_rules_path)
462        .await
463        .unwrap_or_else(|(builder, err)| {
464            match err {
465                // Given a fresh /data, it's expected the file doesn't exist.
466                LoadRulesError::FileOpen(fuchsia_fs::node::OpenError::OpenError(
467                    zx::Status::NOT_FOUND,
468                )) => {}
469                // Unable to open /data dir proxy.
470                LoadRulesError::DirOpen(_) => {}
471                err => error!(
472                    "unable to load dynamic rewrite rules from disk, using defaults: {:#}",
473                    anyhow!(err)
474                ),
475            };
476            builder
477        })
478        .inspect_node(node)
479        .static_rules_path(config_proxy, STATIC_RULES_PATH)
480        .await
481        .unwrap_or_else(|(builder, err)| {
482            match err {
483                // No static rules are configured for this system version.
484                LoadRulesError::FileOpen(fuchsia_fs::node::OpenError::OpenError(
485                    zx::Status::NOT_FOUND,
486                )) => {}
487                // Unable to open /config/data dir proxy.
488                LoadRulesError::DirOpen(_) => {}
489                err => {
490                    error!("unable to load static rewrite rules from disk: {:#}", anyhow!(err))
491                }
492            };
493            builder
494        });
495
496    builder.build()
497}