// persistence/scheduler.rs
// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::fetcher::{FetchCommand, Fetcher};
use fuchsia_async::{self as fasync, TaskGroup};
use fuchsia_sync::Mutex;
use persistence_config::{Config, ServiceName, Tag, TagConfig};
use std::collections::HashMap;
use std::sync::Arc;
/// Decides which tags to fetch and when.
///
/// This contains the logic to decide which tags to fetch at what times. It contains the state of
/// each tag (when last fetched, whether currently queued). When a request arrives via FIDL, it's
/// sent here and results in requests queued to the Fetcher.
///
/// Cloning a `Scheduler` produces another handle to the same shared state, since the state lives
/// behind an `Arc`.
#[derive(Clone)]
pub(crate) struct Scheduler {
/// This is a global lock. Scheduler only does schedule() which is synchronous and quick.
/// The delayed tasks spawned by enqueue() also take it briefly when their timer fires.
state: Arc<Mutex<State>>,
}
/// Everything the Scheduler keeps behind its lock.
struct State {
/// Destination for fetch commands. schedule() sends immediate fetches through it, and each
/// delayed task spawned by enqueue() sends through its own clone.
fetcher: Fetcher,
/// Scheduling state for every tag, keyed first by service and then by tag. Filled in from the
/// Config by Scheduler::new().
services: HashMap<ServiceName, HashMap<Tag, TagState>>,
/// Holds the timer tasks that enqueue() spawns for fetches that must wait.
tasks: TaskGroup,
}
/// Scheduling state for a single tag.
struct TagState {
/// Minimum time between fetches of this tag, built from the tag's
/// `min_seconds_between_fetch` config value.
backoff: zx::MonotonicDuration,
/// Whether a delayed fetch of this tag is currently queued.
state: FetchState,
/// When this tag was last fetched or, while a delayed fetch is queued, the time at which that
/// fetch is due. Starts at INFINITE_PAST.
last_fetched: zx::MonotonicInstant,
}
impl Scheduler {
    /// Builds a scheduler whose per-tag state is seeded from `config`.
    ///
    /// Every configured tag starts out `Idle` with a last-fetched time of `INFINITE_PAST`. Its
    /// backoff is taken from the tag's `min_seconds_between_fetch`. Commands are delivered
    /// through `fetcher`.
    pub(crate) fn new(fetcher: Fetcher, config: &Config) -> Self {
        let mut services = HashMap::new();
        for (service, tags) in config {
            let mut tag_states = HashMap::new();
            for (tag, tag_config) in tags {
                let TagConfig { min_seconds_between_fetch, .. } = tag_config;
                let backoff = zx::MonotonicDuration::from_seconds(*min_seconds_between_fetch);
                let tag_state = TagState {
                    backoff,
                    state: FetchState::Idle,
                    last_fetched: zx::MonotonicInstant::INFINITE_PAST,
                };
                tag_states.insert(tag.clone(), tag_state);
            }
            services.insert(service.clone(), tag_states);
        }
        let state = State { fetcher, services, tasks: TaskGroup::new() };
        Scheduler { state: Arc::new(Mutex::new(state)) }
    }

    /// Gets a service name and a list of valid tags, and queues any fetches that are not already
    /// pending. Updates the last-fetched time on any tag it queues, setting it equal to the later
    /// of the current time and the time the fetch becomes possible.
    ///
    /// Tags whose backoff has already elapsed are sent to the fetcher immediately as a single
    /// command. The remaining tags are marked `Pending` and grouped by due time, with one delayed
    /// command enqueued per distinct due time.
    ///
    /// An unknown `service` makes the call a no-op. An unknown tag is skipped without affecting
    /// the other tags in the request.
    pub(crate) fn schedule(&self, service: &ServiceName, tags: Vec<Tag>) {
        // Every tag we process should use the same Now
        let now = zx::MonotonicInstant::get();
        let mut state = self.state.lock();
        let Some(service_info) = state.services.get_mut(service) else {
            return;
        };
        let mut now_tags = vec![];
        let mut later_tags: Vec<(zx::MonotonicInstant, Tag)> = vec![];
        for tag in tags {
            // Skip (rather than return on) an unknown tag: earlier iterations may already have
            // marked tags Pending or stamped them as fetched, and bailing out here would drop
            // their commands and leave those tags stuck.
            let Some(tag_state) = service_info.get_mut(&tag) else {
                continue;
            };
            if matches!(tag_state.state, FetchState::Pending) {
                continue;
            }
            let next_fetch = tag_state.last_fetched + tag_state.backoff;
            if next_fetch <= now {
                tag_state.last_fetched = now;
                now_tags.push(tag);
            } else {
                tag_state.last_fetched = next_fetch;
                tag_state.state = FetchState::Pending;
                later_tags.push((next_fetch, tag));
            }
        }
        if !now_tags.is_empty() {
            state.fetcher.send(FetchCommand { service: service.clone(), tags: now_tags });
        }

        // later_tags may not all be fetchable at the same time. Batch the ones that are. The sort
        // is stable, so tags sharing a due time end up adjacent and keep their request order; one
        // pass that appends to the most recent batch is then enough.
        later_tags.sort_by_key(|(next_fetch, _)| *next_fetch);
        let mut batches: Vec<(zx::MonotonicInstant, Vec<Tag>)> = vec![];
        for (next_fetch, tag) in later_tags {
            match batches.last_mut() {
                Some((batch_time, batch_tags)) if *batch_time == next_fetch => {
                    batch_tags.push(tag);
                }
                _ => batches.push((next_fetch, vec![tag])),
            }
        }
        for (batch_time, batch_tags) in batches {
            self.enqueue(
                &mut state,
                batch_time,
                FetchCommand { service: service.clone(), tags: batch_tags },
            );
        }
    }

    /// Spawns a task that waits until `time`, flips the command's tags back to `Idle`, and then
    /// sends `command` to the fetcher.
    ///
    /// The caller passes in the already-locked `state`; the spawned task takes the lock itself
    /// once its timer fires and releases it before sending. If the command's service is no
    /// longer present when the timer fires, the task ends without sending.
    fn enqueue(&self, state: &mut State, time: zx::MonotonicInstant, command: FetchCommand) {
        let this = self.clone();
        let mut fetcher = state.fetcher.clone();
        state.tasks.spawn(async move {
            fasync::Timer::new(time).await;
            {
                let mut state = this.state.lock();
                let Some(tag_states) = state.services.get_mut(&command.service) else {
                    return;
                };
                for tag in &command.tags {
                    // schedule() only queues tags it found in this map, so the lookup is
                    // expected to succeed; a miss is tolerated instead of panicking in the task.
                    if let Some(tag_state) = tag_states.get_mut(tag) {
                        tag_state.state = FetchState::Idle;
                    }
                }
            }
            fetcher.send(command);
        });
    }
}
/// FetchState tells whether a tag is currently waiting to be dispatched or not. If it is, then
/// another request to fetch that tag should cause no change. If it's not waiting, then it can
/// either be fetched immediately (in which case its state stays Idle, but the last-fetched time
/// will be updated to Now) or it will be queued (in which case its state is Pending and its
/// last-fetched time will be set forward to the time it's going to be fetched).
enum FetchState {
/// A delayed fetch of the tag has been queued and its timer has not fired yet. schedule()
/// skips tags in this state.
Pending,
/// No delayed fetch is queued. This is the initial state, and the state a tag returns to when
/// its delayed fetch is dispatched.
Idle,
}