class AggregationProcedure
Defined at line 75 of file ../../third_party/cobalt/src/local_aggregation/aggregation_procedures/aggregation_procedure.h
AggregationProcedure is the abstract interface for aggregation logic.
For each local aggregation procedure, there should be an implementation of this class.
An AggregationProcedure should not have any of its own state. Its role is to update the state
within the instance of ReportAggregate it is passed in UpdateAggregate() and
MaybeGenerateObservation().
See Also:
* LocalAggregation calls UpdateAggregate() whenever AddEvent() is called.
* ObservationGenerator calls MaybeGenerateObservation() on a regular schedule (~once per hour).
Note:
* AggregationProcedures that are always hour-based should implement HourlyAggregationProcedure
instead.
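The stateless design described above can be pictured with a minimal sketch. Every type below (SketchEvent, SketchReportAggregate, SketchProcedure and its subclasses, SketchAddEvents) is a hypothetical stand-in invented for illustration, not part of the Cobalt API. The point is only that the procedure objects carry no data members and all accumulated state lives in the aggregate they are handed.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <vector>

// Hypothetical stand-in for logger::EventRecord.
struct SketchEvent {
  uint32_t event_code;
  int64_t value;
};

// Hypothetical stand-in for ReportAggregate: event code -> aggregated value.
struct SketchReportAggregate {
  std::map<uint32_t, int64_t> values;
};

// Abstract interface with no data members: all state lives in the aggregate.
class SketchProcedure {
 public:
  virtual ~SketchProcedure() = default;
  virtual void UpdateAggregate(const SketchEvent& event,
                               SketchReportAggregate& aggregate) const = 0;
};

// One implementation per procedure. This one sums values per event code.
class SketchSumProcedure : public SketchProcedure {
 public:
  void UpdateAggregate(const SketchEvent& event,
                       SketchReportAggregate& aggregate) const override {
    aggregate.values[event.event_code] += event.value;
  }
};

// This one keeps the largest value seen per event code.
class SketchMaxProcedure : public SketchProcedure {
 public:
  void UpdateAggregate(const SketchEvent& event,
                       SketchReportAggregate& aggregate) const override {
    auto inserted = aggregate.values.emplace(event.event_code, event.value);
    if (!inserted.second && event.value > inserted.first->second) {
      inserted.first->second = event.value;
    }
  }
};

// Plays the caller's role: one UpdateAggregate() call per logged event.
void SketchAddEvents(const SketchProcedure& procedure,
                     const std::vector<SketchEvent>& events,
                     SketchReportAggregate& aggregate) {
  for (const SketchEvent& event : events) {
    procedure.UpdateAggregate(event, aggregate);
  }
}
```

Because the procedures hold no state, the same procedure object could in principle be reused across any number of aggregates.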
Public Methods
AggregationProcedure (const MetricDefinition & metric, const ReportDefinition & report)
Constructs an AggregationProcedure.
|metric| Used to extract MetricType for validating EventRecords.
|report| Used for extracting report-specific parameters.
lib::statusor::StatusOr<util::NotNullUniquePtr<AggregationProcedure>> Get (const std::string & customer_name, const std::string & project_name, const MetricDefinition & metric, const ReportDefinition & report)
AggregationProcedure::Get returns the appropriate AggregationProcedure for the given
metric/report.
lib::statusor::StatusOr<ReportAggregate *> GetReportAggregate (MetricAggregate * metric_aggregate, uint32_t report_id, CivilTimeManager * civil_time_mgr)
Get a `ReportAggregate` from `metric_aggregate` for a `report_id`.
If the ReportAggregate doesn't exist yet, it will be created and initialized.
void UpdateAggregate (const logger::EventRecord & event_record, ReportAggregate & aggregate, uint64_t system_profile_hash, std::chrono::system_clock::time_point system_time)
UpdateAggregate takes an |event_record| and adds it to the given |aggregate| object according
to the aggregation procedure implemented by this instance of AggregationProcedure.
|event_record|: The event that needs to be added to the aggregate.
|aggregate|: A mutable ReportAggregate object.
|system_profile_hash|: The hash of the filtered system profile this event is for.
|system_time|: The system time at which the event occurred.
void MergeSystemProfileAggregates (SystemProfileAggregate & merged_aggregate, const SystemProfileAggregate & aggregate)
Merge two instances of SystemProfileAggregate according to this aggregation procedure.
The event codes and their data from |aggregate| are merged into the event codes and data in
|merged_aggregate|. Both system profile aggregates must be included in the same
AggregationPeriodBucket. Each procedure's implementation of MergeAggregateData does the work of
merging the AggregateData in each bucket.
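A rough sketch of the two-level merge described above follows. The types and the `bucket_id` field are hypothetical stand-ins, and adding counts is just one possible per-procedure merge rule chosen for illustration. The outer function walks the event codes, and the inner function plays the role of a procedure's MergeAggregateData.

```cpp
#include <cassert>
#include <cstdint>
#include <map>

// Hypothetical stand-in for AggregateData: a simple count.
struct SketchAggregateData {
  uint64_t count = 0;
};

// Hypothetical stand-in for SystemProfileAggregate.
struct SketchSystemProfileAggregate {
  // Hypothetical field identifying the owning AggregationPeriodBucket.
  uint64_t bucket_id = 0;
  // Event code -> aggregated data for that event code.
  std::map<uint32_t, SketchAggregateData> by_event_code;
};

// Per-procedure merge rule. Here (an assumption): counts are added together.
void SketchMergeAggregateData(SketchAggregateData& merged_aggregate_data,
                              const SketchAggregateData& aggregate_data) {
  merged_aggregate_data.count += aggregate_data.count;
}

// Merges every event code of |aggregate| into |merged_aggregate|.
void SketchMergeSystemProfileAggregates(
    SketchSystemProfileAggregate& merged_aggregate,
    const SketchSystemProfileAggregate& aggregate) {
  // Both aggregates must belong to the same aggregation period bucket.
  assert(merged_aggregate.bucket_id == aggregate.bucket_id);
  for (const auto& entry : aggregate.by_event_code) {
    // operator[] creates empty data for event codes not yet in the target.
    SketchMergeAggregateData(merged_aggregate.by_event_code[entry.first],
                             entry.second);
  }
}
```

Event codes present only in the source are created in the target, and the source aggregate is left unmodified.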
lib::statusor::StatusOr<std::vector<ObservationAndSystemProfile>> GenerateObservations (const util::TimeInfo & time_info, ReportAggregate & aggregate)
GenerateObservations is the public interface for generating observations. It handles
reading EventCodeAggregates out of the ReportAggregate based on the provided time_info, and
passing all that information down to GenerateSingleObservation.
|time_info|: Time period for which the observations should be generated.
|aggregate|: A mutable reference to a ReportAggregate object.
Returns all the generated observations (multiple can be generated for REPORT_ALL reports),
and the system profile hash to use when generating their ObservationMetadata.
std::string DebugString ()
bool IsDaily ()
IsDaily returns true if this particular report should be treated as daily data.
~AggregationProcedure ()
Defined at line 82 of file ../../third_party/cobalt/src/local_aggregation/aggregation_procedures/aggregation_procedure.h
bool IsExpedited ()
IsExpedited returns true if this particular report can generate expedited observations during
the current day.
Defined at line 137 of file ../../third_party/cobalt/src/local_aggregation/aggregation_procedures/aggregation_procedure.h
util::TimeInfo GetLastTimeInfo (const ReportAggregate & aggregate)
Defined at line 143 of file ../../third_party/cobalt/src/local_aggregation/aggregation_procedures/aggregation_procedure.h
bool IsValidEventType (Event::TypeCase type)
IsValidEventType returns true if the given event type can be handled by the
AggregationProcedure.
void ObservationsCommitted (ReportAggregate & aggregate, util::TimeInfo time_info, std::optional<uint64_t> system_profile_hash)
Record that the observation generation from a previous call to GenerateObservations is done.
This will clean up any aggregate data that is no longer needed, and update the last
time info data so that future aggregation runs don't try to regenerate it.
If |system_profile_hash| is set, then it identifies the System Profile of the observations that
have been stored. If it is not set, then GenerateObservations returned no observations for the
|time_info|.
Note: This function should maintain only the minimum amount of data required to generate future
observations. If the ReportAggregate contains data points that cannot be useful for any future
observations, they should be deleted. If an hour_id or a day_index is passed to this function,
an earlier hour_id or day_index will never be passed and the data associated with that can be
deleted.
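The cleanup rule in the note above might look roughly like the following. This sketch assumes a daily report whose observations only ever draw on the committed day or later, so every earlier day can be dropped. SketchReportAggregate and its fields are hypothetical stand-ins, not the real ReportAggregate layout.

```cpp
#include <cassert>
#include <cstdint>
#include <map>

// Hypothetical stand-in for ReportAggregate.
struct SketchReportAggregate {
  // day_index -> aggregated value for that day.
  std::map<uint32_t, uint64_t> by_day_index;
  // Hypothetical "last time info": the most recently committed day.
  uint32_t last_day_index = 0;
};

// Records that observations for |day_index| were committed and discards data
// that can no longer contribute to any future observation.
void SketchObservationsCommitted(SketchReportAggregate& aggregate,
                                 uint32_t day_index) {
  // Time values are never passed in decreasing order.
  assert(day_index >= aggregate.last_day_index);
  // Erase every day strictly earlier than the committed one. The committed
  // day itself is kept in this sketch, since an expedited report may revisit it.
  aggregate.by_day_index.erase(aggregate.by_day_index.begin(),
                               aggregate.by_day_index.lower_bound(day_index));
  aggregate.last_day_index = day_index;
}
```

A report that aggregates over a multi-day window would need to retain more history than this sketch does.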
Protected Methods
void UpdateAggregateData (const logger::EventRecord & event_record, AggregateData & aggregate_data, AggregationPeriodBucket & bucket)
UpdateAggregateData takes an |event_record| and adds it to the given |aggregate_data|
object according to the aggregation procedure implemented by this instance of
AggregationProcedure.
|event_record|: The event that needs to be added to the aggregate data.
|aggregate_data|: A mutable AggregateData object.
|bucket|: A mutable AggregationPeriodBucket object containing |aggregate_data|.
void MergeAggregateData (AggregateData & merged_aggregate_data, const AggregateData & aggregate_data)
Merge two instances of the aggregate data for this procedure.
The data from |aggregate_data| is merged into the data in |merged_aggregate_data|. Both
AggregateData objects must be part of the same AggregationPeriodBucket for the fields on the
bucket to be preserved accurately.
lib::statusor::StatusOr<std::unique_ptr<Observation>> GenerateSingleObservation (const std::vector<AggregateDataToGenerate> & buckets, const std::set<std::vector<uint32_t>> & event_vectors, const util::TimeInfo & time_info)
GenerateSingleObservation generates an observation for the given report at the given time.
A non-ok Status means there was an error at some point while generating observations. An ok
status means that the observation generation process completed successfully. The observation
may be null if there was nothing to send for this aggregate in this time window.
|buckets|: A list of AggregateDataToGenerate objects containing the AggregateData objects.
|event_vectors|: The event vectors which should be included in the observation.
|time_info|: The time for which the observation is being generated.
Note: This method will always be called with increasing time_info values. If the report is not
expedited, the values will always be strictly increasing. For expedited reports, the last
(current day's) time_info can be used to make multiple calls during the day, so the
implementation must be able to determine if an observation has already been generated for the
data and not regenerate it.
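One way an implementation could satisfy the expedited-report requirement in the note above is to remember, inside the aggregate data, the day for which an observation was last produced. The sketch below is an assumption for illustration only: the types, the `last_generated_day` marker, and the choice to mutate the data inside the function are all hypothetical and do not reflect how any real procedure stores this.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>

// Hypothetical stand-in for AggregateData of an "at least once" style report.
struct SketchAggregateData {
  bool at_least_once = false;
  // Hypothetical marker: day an observation was last generated for, or -1.
  int64_t last_generated_day = -1;
};

// Hypothetical stand-in for Observation.
struct SketchObservation {
  uint32_t day_index;
};

// Returns an observation, or null when there is nothing (new) to send.
std::unique_ptr<SketchObservation> SketchGenerateSingleObservation(
    SketchAggregateData& data, uint32_t day_index) {
  if (!data.at_least_once) {
    return nullptr;  // Nothing was logged, so nothing to send.
  }
  if (data.last_generated_day == static_cast<int64_t>(day_index)) {
    return nullptr;  // Already generated for this day: do not regenerate.
  }
  data.last_generated_day = day_index;
  return std::make_unique<SketchObservation>(SketchObservation{day_index});
}
```

A repeated call with the same day yields a null observation, which corresponds to the "ok status, null observation" outcome described above.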
std::map<uint64_t, std::vector<AggregateDataToGenerate>> GetAggregateDataToGenerate (const util::TimeInfo & time_info, ReportAggregate & aggregate)
Accumulates the data that needs to be checked for observation generation for time_info.
The return value is a map from the system_profile_hash to the data used to generate the
observation for that system profile.
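The shape of that return value can be illustrated with a small grouping sketch. SketchBucketData and the flat input vector are hypothetical simplifications, since the real method reads from a ReportAggregate and a util::TimeInfo. Only the grouping by system profile hash is meant to carry over.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <vector>

// Hypothetical stand-in for AggregateDataToGenerate.
struct SketchBucketData {
  uint64_t system_profile_hash;
  uint32_t day_index;
  uint64_t count;
};

// Collects the stored data for |day_index|, keyed by system profile hash.
std::map<uint64_t, std::vector<SketchBucketData>>
SketchGetAggregateDataToGenerate(const std::vector<SketchBucketData>& stored,
                                 uint32_t day_index) {
  std::map<uint64_t, std::vector<SketchBucketData>> result;
  for (const SketchBucketData& data : stored) {
    if (data.day_index == day_index) {
      result[data.system_profile_hash].push_back(data);
    }
  }
  return result;
}
```

Each map entry would then feed one observation-generation call for that system profile.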
std::set<std::vector<uint32_t>> SelectEventVectorsForObservation (const std::vector<AggregateDataToGenerate> & buckets)
Returns a set containing the first |event_vector_buffer_max_| event vectors which were logged
for |buckets|. The |buckets| should be ordered from earliest to latest aggregation period.
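A minimal sketch of this "first N distinct event vectors" selection follows. SketchBucket and its log-ordered vector are hypothetical stand-ins, and the limit is passed as a plain argument rather than read from a member. Any special meaning the real limit may give to particular values is not modeled here.

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <vector>

using EventVector = std::vector<uint32_t>;

// Hypothetical stand-in for AggregateDataToGenerate: the event vectors of one
// aggregation period, in the order they were first logged.
struct SketchBucket {
  std::vector<EventVector> event_vectors_in_log_order;
};

// Walks the buckets from earliest to latest and keeps the first
// |event_vector_buffer_max| distinct event vectors encountered.
std::set<EventVector> SketchSelectEventVectors(
    const std::vector<SketchBucket>& buckets,
    uint64_t event_vector_buffer_max) {
  std::set<EventVector> selected;
  for (const SketchBucket& bucket : buckets) {
    for (const EventVector& event_vector : bucket.event_vectors_in_log_order) {
      if (selected.size() >= event_vector_buffer_max) {
        return selected;  // Limit reached: later vectors are dropped.
      }
      selected.insert(event_vector);  // Duplicates do not use up the limit.
    }
  }
  return selected;
}
```

The result depends on the buckets being passed in chronological order, which is why the ordering requirement matters.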
ReportDefinition::ReportType report_type ()
Defined at line 205 of file ../../third_party/cobalt/src/local_aggregation/aggregation_procedures/aggregation_procedure.h
MetricDefinition::MetricType metric_type ()
Defined at line 206 of file ../../third_party/cobalt/src/local_aggregation/aggregation_procedures/aggregation_procedure.h
void SetEventVectorBufferMax (uint64_t event_vector_buffer_max)
Defined at line 219 of file ../../third_party/cobalt/src/local_aggregation/aggregation_procedures/aggregation_procedure.h