mod selector_list;
mod string_list;
use {
anyhow::{bail, Context as _, Error},
fuchsia_inspect as inspect,
futures::FutureExt,
glob::{GlobError, Paths},
serde::{de::DeserializeOwned, Deserialize},
std::fs,
std::path::{Path, PathBuf},
std::sync::{Arc, Mutex},
string_list::StringList,
};
pub use selector_list::{ParsedSelector, SelectorList};
const MONIKER_INTERPOLATION: &str = "{MONIKER}";
const METRICS_INSPECT_SIZE_BYTES: usize = 1024 * 1024; const DEFAULT_MIN_SAMPLE_RATE_SEC: i64 = 10;
#[derive(Deserialize, Debug, PartialEq)]
pub struct ProjectConfig {
pub project_id: u32,
customer_id: Option<u32>,
pub poll_rate_sec: i64,
pub metrics: Vec<MetricConfig>,
#[serde(skip, default = "default_source_name")]
source_name: String,
}
impl ProjectConfig {
pub fn customer_id(&self) -> u32 {
self.customer_id.unwrap_or(1)
}
}
#[derive(Deserialize, Debug, PartialEq)]
struct ProjectTemplate {
project_id: u32,
customer_id: Option<u32>,
poll_rate_sec: i64,
metrics: Vec<MetricTemplate>,
#[serde(skip, default = "default_source_name")]
source_name: String,
}
fn default_source_name() -> String {
"<unknown>".to_string()
}
#[derive(Clone, Deserialize, Debug, PartialEq)]
pub struct MetricConfig {
#[serde(rename = "selector")]
pub selectors: SelectorList,
pub metric_id: u32,
pub metric_type: DataType,
pub event_codes: Vec<u32>,
pub upload_once: Option<bool>,
pub project_id: Option<u32>,
}
#[derive(Clone, Deserialize, Debug, PartialEq)]
struct MetricTemplate {
#[serde(rename = "selector")]
selectors: StringList,
metric_id: u32,
metric_type: DataType,
event_codes: Option<Vec<u32>>,
upload_once: Option<bool>,
project_id: Option<u32>,
}
#[derive(Deserialize, Debug, PartialEq, Eq, Copy, Clone)]
pub enum DataType {
Occurrence,
Integer,
IntHistogram,
String,
}
pub fn parse_selector_for_test(selector_str: &str) -> Option<ParsedSelector> {
Some(selector_list::parse_selector::<serde::de::value::Error>(selector_str).unwrap())
}
#[derive(Deserialize, Debug)]
pub struct ComponentIdInfoList(Vec<ComponentIdInfo>);
#[derive(Deserialize, Debug)]
pub struct ComponentIdInfo {
moniker: String,
id: u32,
#[allow(unused)]
label: String,
}
impl std::ops::Deref for ComponentIdInfoList {
type Target = Vec<ComponentIdInfo>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl std::ops::DerefMut for ComponentIdInfoList {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl IntoIterator for ComponentIdInfoList {
type Item = ComponentIdInfo;
type IntoIter = std::vec::IntoIter<Self::Item>;
fn into_iter(self) -> Self::IntoIter {
self.0.into_iter()
}
}
trait RemembersSource {
fn remember_source(&mut self, _source: String);
}
impl RemembersSource for ProjectConfig {
fn remember_source(&mut self, source: String) {
self.source_name = source;
}
}
impl RemembersSource for ProjectTemplate {
fn remember_source(&mut self, source: String) {
self.source_name = source;
}
}
impl RemembersSource for ComponentIdInfoList {
fn remember_source(&mut self, _source: String) {}
}
fn paths_matching_name(path: impl AsRef<Path>, name: &str) -> Result<Paths, Error> {
let path = path.as_ref();
let pattern = path.join(name);
Ok(glob::glob(&pattern.to_string_lossy())?)
}
fn load_many<T: DeserializeOwned + RemembersSource>(paths: Paths) -> Result<Vec<T>, Error> {
paths
.map(|path: Result<PathBuf, GlobError>| {
let path = path?;
let json_string: String =
fs::read_to_string(&path).with_context(|| format!("parsing {}", path.display()))?;
let mut config: T = serde_json5::from_str(&json_string)?;
let file_name = path
.file_name()
.map(|name| name.to_string_lossy().to_string())
.unwrap_or_else(default_source_name);
config.remember_source(file_name);
Ok(config)
})
.collect::<Result<Vec<_>, _>>()
}
#[derive(Debug)]
pub struct SamplerConfig {
pub project_configs: Vec<Arc<ProjectConfig>>,
pub configure_reader_for_tests: bool,
pub minimum_sample_rate_sec: i64,
inspect_node: Mutex<Option<inspect::LazyNode>>,
}
pub struct SamplerConfigBuilder {
minimum_sample_rate_sec: i64,
configure_reader_for_tests: bool,
sampler_dir: Option<PathBuf>, fire_dir: Option<PathBuf>,
}
impl SamplerConfigBuilder {
pub fn default() -> Self {
SamplerConfigBuilder {
minimum_sample_rate_sec: DEFAULT_MIN_SAMPLE_RATE_SEC,
configure_reader_for_tests: false,
sampler_dir: None,
fire_dir: None,
}
}
pub fn minimum_sample_rate_sec(mut self, minimum_sample_rate_sec: i64) -> Self {
self.minimum_sample_rate_sec = minimum_sample_rate_sec;
self
}
pub fn configure_reader_for_tests(mut self, configure_reader_for_tests: bool) -> Self {
self.configure_reader_for_tests = configure_reader_for_tests;
self
}
pub fn sampler_dir(mut self, path: impl Into<PathBuf>) -> Self {
self.sampler_dir = Some(path.into());
self
}
pub fn fire_dir(mut self, path: impl Into<PathBuf>) -> Self {
self.fire_dir = Some(path.into());
self
}
pub fn load(self) -> Result<Arc<SamplerConfig>, Error> {
if let Some(sampler_dir) = self.sampler_dir.as_ref() {
SamplerConfig::from_directories_internal(
self.minimum_sample_rate_sec,
self.configure_reader_for_tests,
sampler_dir,
self.fire_dir,
)
.map(Arc::new)
} else {
bail!("sampler_dir must be configured before loading SamplerConfig")
}
}
}
impl MetricConfig {
fn from_template(template: MetricTemplate, component: &ComponentIdInfo) -> Result<Self, Error> {
let MetricTemplate {
mut selectors,
event_codes,
metric_id,
metric_type,
upload_once,
project_id,
} = template;
let selectors = SelectorList(
selectors
.iter_mut()
.map::<Result<_, anyhow::Error>, _>(|s| {
let filled_template = Self::insert_moniker(s, &component.moniker)?;
Ok(match selector_list::parse_selector::<serde::de::value::Error>(
&filled_template,
) {
Ok(selector) => Ok(Some(selector)),
Err(err) => Err(err),
}?)
})
.collect::<Result<Vec<Option<_>>, _>>()?,
);
let event_codes = match event_codes {
None => vec![component.id],
Some(mut codes) => {
codes.insert(0, component.id);
codes
}
};
Ok(MetricConfig { event_codes, selectors, metric_id, metric_type, upload_once, project_id })
}
fn insert_moniker(template: &str, moniker: &str) -> Result<String, Error> {
let interpolate_position = template.find(MONIKER_INTERPOLATION);
let separator_position = template.find(":");
match (interpolate_position, separator_position) {
(Some(i), Some(s)) if i < s => Ok(template.replace(MONIKER_INTERPOLATION, moniker)),
(Some(_), Some(_)) => Ok(template.replace(
MONIKER_INTERPOLATION,
&selectors::sanitize_string_for_selectors(moniker),
)),
(None, _) => {
bail!("{} not found in selector template {}", MONIKER_INTERPOLATION, template)
}
_ => bail!("Separator ':' not found in selector template {}", template),
}
}
}
impl ProjectConfig {
fn from_template(
template: ProjectTemplate,
components: &Vec<ComponentIdInfo>,
) -> Result<Self, Error> {
let ProjectTemplate { metrics, customer_id, project_id, poll_rate_sec, source_name } =
template;
let mut expanded_metrics = vec![];
for component in components.iter() {
for metric in &metrics {
expanded_metrics.push(MetricConfig::from_template(metric.to_owned(), &component)?);
}
}
Ok(ProjectConfig {
metrics: expanded_metrics,
customer_id,
project_id,
poll_rate_sec,
source_name,
})
}
}
fn expand_fire_projects(
projects: Vec<ProjectTemplate>,
components: Vec<ComponentIdInfo>,
) -> Result<Vec<ProjectConfig>, Error> {
projects
.into_iter()
.map(|project| ProjectConfig::from_template(project, &components))
.collect::<Result<Vec<_>, _>>()
}
impl SamplerConfig {
fn from_directories_internal(
minimum_sample_rate_sec: i64,
configure_reader_for_tests: bool,
sampler_dir: impl AsRef<Path>,
fire_dir: Option<impl AsRef<Path>>,
) -> Result<Self, Error> {
let legacy_sampler_config_paths = paths_matching_name(&sampler_dir, "*.json")?;
let sampler_config_paths = paths_matching_name(&sampler_dir, "*/*.json")?;
let mut project_configs = load_many(sampler_config_paths)?;
project_configs.append(&mut load_many(legacy_sampler_config_paths)?);
if let Some(fire_dir) = fire_dir {
let fire_project_paths = paths_matching_name(&fire_dir, "*/projects/*.json5")?;
let fire_component_paths = paths_matching_name(&fire_dir, "*/components.json5")?;
let fire_project_templates = load_many(fire_project_paths)?;
let fire_components = load_many::<ComponentIdInfoList>(fire_component_paths)?;
let fire_components =
fire_components.into_iter().flatten().collect::<Vec<ComponentIdInfo>>();
project_configs
.append(&mut expand_fire_projects(fire_project_templates, fire_components)?);
}
let project_configs = project_configs.into_iter().map(Arc::new).collect();
Ok(Self {
minimum_sample_rate_sec,
project_configs,
configure_reader_for_tests,
inspect_node: Mutex::new(None),
})
}
pub fn publish_inspect(self: &Arc<Self>, parent_node: &inspect::Node) {
let weak_self = Arc::downgrade(self);
lazy_static::lazy_static! {
static ref SELECTOR_STRING : inspect::StringReference = "selector".into();
static ref UPLOAD_COUNT_STRING : inspect::StringReference = "upload_count".into();
}
let mut locked_node = self.inspect_node.lock().unwrap();
*locked_node = Some(parent_node.create_lazy_child("metrics_sent", move || {
let local_self = weak_self.upgrade();
if local_self.is_none() {
return async move { Ok(inspect::Inspector::default()) }.boxed();
}
let local_self = local_self.unwrap();
let inspector = inspect::Inspector::new(
inspect::InspectorConfig::default().size(METRICS_INSPECT_SIZE_BYTES),
);
let top_node = inspector.root();
for config in local_self.project_configs.iter() {
let mut next_selector_index = 0;
let source_name = config.source_name.clone();
top_node.record_child(source_name, |file_node| {
for metric in config.metrics.iter() {
for selector in metric.selectors.iter() {
if let Some(ref selector) = selector {
file_node.record_child(
format!("{}", next_selector_index),
|selector_node| {
next_selector_index += 1;
selector_node.record_string(
&*SELECTOR_STRING,
selector.selector_string.clone(),
);
selector_node.record_uint(
&*UPLOAD_COUNT_STRING,
selector.get_upload_count(),
);
},
);
}
}
}
});
}
async move { Ok(inspector) }.boxed()
}));
}
}
#[cfg(test)]
mod tests {
use super::SamplerConfigBuilder;
use std::fs;
#[fuchsia::test]
fn parse_valid_sampler_configs() {
let dir = tempfile::tempdir().unwrap();
let load_path = dir.path();
let config_path = load_path.join("config");
fs::create_dir(&config_path).unwrap();
fs::write(config_path.join("ok.json"), r#"{
"project_id": 5,
"poll_rate_sec": 60,
"metrics": [
{
// Test comment for json5 portability.
"selector": "bootstrap/archivist:root/all_archive_accessor:inspect_batch_iterator_get_next_requests",
"metric_id": 1,
"metric_type": "Occurrence",
"event_codes": [0, 0]
}
]
}
"#).unwrap();
fs::write(config_path.join("ignored.txt"), "This file is ignored").unwrap();
fs::write(
config_path.join("also_ok.json"),
r#"{
"project_id": 5,
"poll_rate_sec": 3,
"metrics": [
{
"selector": "single_counter_test_component:root:counter",
"metric_id": 1,
"metric_type": "Occurrence",
"event_codes": [0, 0]
}
]
}
"#,
)
.unwrap();
let config = SamplerConfigBuilder::default()
.minimum_sample_rate_sec(10)
.sampler_dir(&load_path)
.load();
assert!(config.is_ok());
assert_eq!(config.unwrap().project_configs.len(), 2);
}
#[fuchsia::test]
fn parse_one_valid_one_invalid_config() {
let dir = tempfile::tempdir().unwrap();
let load_path = dir.path();
let config_path = load_path.join("config");
fs::create_dir(&config_path).unwrap();
fs::write(config_path.join("ok.json"), r#"{
"project_id": 5,
"poll_rate_sec": 60,
"metrics": [
{
// Test comment for json5 portability.
"selector": "bootstrap/archivist:root/all_archive_accessor:inspect_batch_iterator_get_next_requests",
"metric_id": 1,
"metric_type": "Occurrence",
"event_codes": [0, 0]
}
]
}
"#).unwrap();
fs::write(config_path.join("ignored.txt"), "This file is ignored").unwrap();
fs::write(
config_path.join("invalid.json"),
r#"{
"project_id": 5,
"poll_rate_sec": 3,
"invalid_field": "bad bad bad"
}
"#,
)
.unwrap();
let config = SamplerConfigBuilder::default()
.minimum_sample_rate_sec(10)
.sampler_dir(&load_path)
.load();
assert!(config.is_err());
}
#[fuchsia::test]
fn parse_optional_args() {
let dir = tempfile::tempdir().unwrap();
let load_path = dir.path();
let config_path = load_path.join("config");
fs::create_dir(&config_path).unwrap();
fs::write(config_path.join("true.json"), r#"{
"project_id": 5,
"poll_rate_sec": 60,
"metrics": [
{
// Test comment for json5 portability.
"selector": "bootstrap/archivist:root/all_archive_accessor:inspect_batch_iterator_get_next_requests",
"metric_id": 1,
"metric_type": "Occurrence",
"event_codes": [0, 0],
"upload_once": true,
}
]
}
"#).unwrap();
fs::write(
config_path.join("false.json"), r#"{
"project_id": 5,
"poll_rate_sec": 60,
"metrics": [
{
// Test comment for json5 portability.
"selector": "bootstrap/archivist:root/all_archive_accessor:inspect_batch_iterator_get_next_requests",
"metric_id": 1,
"metric_type": "Occurrence",
"event_codes": [0, 0],
"upload_once": false,
}
]
}
"#).unwrap();
let config = SamplerConfigBuilder::default()
.minimum_sample_rate_sec(10)
.sampler_dir(&load_path)
.load();
assert!(config.is_ok());
assert_eq!(config.unwrap().project_configs.len(), 2);
}
#[fuchsia::test]
fn default_customer_id() {
let dir = tempfile::tempdir().unwrap();
let load_path = dir.path();
let config_path = load_path.join("config");
fs::create_dir(&config_path).unwrap();
fs::write(config_path.join("1default.json"), r#"{
"project_id": 5,
"poll_rate_sec": 60,
"metrics": [
{
"selector": "bootstrap/archivist:root/all_archive_accessor:inspect_batch_iterator_get_next_requests",
"metric_id": 1,
"metric_type": "Occurrence",
"event_codes": [0, 0]
}
]
}
"#).unwrap();
fs::write(
config_path.join("2with_customer_id.json"),
r#"{
"customer_id": 6,
"project_id": 5,
"poll_rate_sec": 3,
"metrics": [
{
"selector": "single_counter_test_component:root:counter",
"metric_id": 1,
"metric_type": "Occurrence",
"event_codes": [0, 0]
}
]
}
"#,
)
.unwrap();
let config = SamplerConfigBuilder::default()
.minimum_sample_rate_sec(10)
.sampler_dir(&load_path)
.load();
assert!(config.is_ok());
assert_eq!(config.as_ref().unwrap().project_configs.len(), 2);
assert_eq!(config.as_ref().unwrap().project_configs[0].customer_id(), 1);
assert_eq!(config.as_ref().unwrap().project_configs[1].customer_id(), 6);
}
#[fuchsia::test]
fn fire_config_loading() {
let sampler_dir = tempfile::tempdir().unwrap();
let sampler_load_path = sampler_dir.path();
let fire_dir = tempfile::tempdir().unwrap();
let fire_load_path = fire_dir.path();
let sampler_config_path = sampler_load_path.join("config");
let fire_config_path_1 = fire_load_path.join("config1");
let fire_config_path_2 = fire_load_path.join("config2");
fs::create_dir(&sampler_config_path).unwrap();
fs::create_dir(&fire_config_path_1).unwrap();
fs::create_dir(&fire_config_path_2).unwrap();
fs::write(
sampler_config_path.join("some_name.json"),
r#"{
"project_id": 5,
"customer_id": 6,
"poll_rate_sec": 60,
"metrics": [
{
"selector": "bootstrap/archivist:root/all_archive_accessor:requests",
"metric_id": 1,
"metric_type": "Occurrence",
"event_codes": [0, 0],
"project_id": 4
}
]
}
"#,
)
.unwrap();
fs::write(
fire_config_path_1.join("components.json5"),
r#"[
{
"id": 42,
"label": "Foo_42",
"moniker": "core/foo42"
}
]"#,
)
.unwrap();
fs::write(
fire_config_path_2.join("components.json5"),
r#"[
{
id: 43,
label: "Bar_43",
moniker: "bar43",
},
]"#,
)
.unwrap();
fs::create_dir(fire_config_path_1.join("projects")).unwrap();
fs::write(
fire_config_path_1.join("projects/some_name.json5"),
r#"{
"project_id": 13,
"customer_id": 7,
"poll_rate_sec": 60,
"metrics": [
{
"selector": "{MONIKER}:root/path:leaf",
"metric_id": 1,
"metric_type": "Occurrence",
"event_codes": [1, 2]
}
]
}
"#,
)
.unwrap();
fs::write(
fire_config_path_1.join("projects/another_name.json5"),
r#"{
"project_id": 13,
"poll_rate_sec": 60,
"customer_id": 8,
"metrics": [
{
"selector": [
"{MONIKER}:root/path2:leaf2",
"foo/bar:root/{MONIKER}:leaf3",
"asdf/qwer:root/path4:pre-{MONIKER}-post",
],
"metric_id": 2,
"metric_type": "Occurrence",
}
]
}
"#,
)
.unwrap();
let config = SamplerConfigBuilder::default()
.minimum_sample_rate_sec(10)
.sampler_dir(&sampler_load_path)
.fire_dir(&fire_load_path)
.load();
assert!(config.is_ok());
let configs = &config.as_ref().unwrap().project_configs;
let config_6 = configs.iter().filter(|c| c.customer_id() == 6).next().unwrap();
let config_7 = configs.iter().filter(|c| c.customer_id() == 7).next().unwrap();
let config_8 = configs.iter().filter(|c| c.customer_id() == 8).next().unwrap();
let metric_6 =
config_6.metrics.iter().filter(|m| m.event_codes == vec![0, 0]).next().unwrap();
let metric_7_42 =
config_7.metrics.iter().filter(|m| m.event_codes == vec![42, 1, 2]).next().unwrap();
let metric_7_43 =
config_7.metrics.iter().filter(|m| m.event_codes == vec![43, 1, 2]).next().unwrap();
let metric_8_42 =
config_8.metrics.iter().filter(|m| m.event_codes == vec![42]).next().unwrap();
let metric_8_43 =
config_8.metrics.iter().filter(|m| m.event_codes == vec![43]).next().unwrap();
assert_eq!(configs.len(), 3);
assert_eq!(config_6.metrics.len(), 1);
assert_eq!(config_7.metrics.len(), 2);
assert_eq!(config_8.metrics.len(), 2);
assert_eq!(
metric_6
.selectors
.iter()
.map(|s| s.as_ref().unwrap().selector_string.to_owned())
.collect::<Vec<_>>(),
vec!["bootstrap/archivist:root/all_archive_accessor:requests"]
);
assert_eq!(metric_6.project_id, Some(4));
assert_eq!(
metric_7_42
.selectors
.iter()
.map(|s| s.as_ref().unwrap().selector_string.to_owned())
.collect::<Vec<_>>(),
vec!["core/foo42:root/path:leaf"]
);
assert_eq!(
metric_7_43
.selectors
.iter()
.map(|s| s.as_ref().unwrap().selector_string.to_owned())
.collect::<Vec<_>>(),
vec!["bar43:root/path:leaf"]
);
assert_eq!(
metric_8_42
.selectors
.iter()
.map(|s| s.as_ref().unwrap().selector_string.to_owned())
.collect::<Vec<_>>(),
vec![
"core/foo42:root/path2:leaf2",
"foo/bar:root/core\\/foo42:leaf3",
"asdf/qwer:root/path4:pre-core\\/foo42-post",
]
);
assert_eq!(
metric_8_43
.selectors
.iter()
.map(|s| s.as_ref().unwrap().selector_string.to_owned())
.collect::<Vec<_>>(),
vec![
"bar43:root/path2:leaf2",
"foo/bar:root/bar43:leaf3",
"asdf/qwer:root/path4:pre-bar43-post",
]
);
}
}