persistence/
scheduler.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::fetcher::{FetchCommand, Fetcher};
use fuchsia_async::{self as fasync, TaskGroup};
use fuchsia_sync::Mutex;

use persistence_config::{Config, ServiceName, Tag, TagConfig};
use std::collections::HashMap;
use std::sync::Arc;

// This contains the logic to decide which tags to fetch at what times. It contains the state of
// each tag (when last fetched, whether currently queued). When a request arrives via FIDL, it's
// sent here and results in requests queued to the Fetcher.

#[derive(Clone)]
pub(crate) struct Scheduler {
    // This is a global lock. Scheduler only does schedule() which is synchronous and quick.
    state: Arc<Mutex<State>>,
}

struct State {
    fetcher: Fetcher,
    services: HashMap<ServiceName, HashMap<Tag, TagState>>,
    tasks: TaskGroup,
}

struct TagState {
    backoff: zx::MonotonicDuration,
    state: FetchState,
    last_fetched: zx::MonotonicInstant,
}

impl Scheduler {
    pub(crate) fn new(fetcher: Fetcher, config: &Config) -> Self {
        let mut services = HashMap::new();
        for (service, tags) in config {
            let mut tag_states = HashMap::new();
            for (tag, tag_config) in tags {
                let TagConfig { min_seconds_between_fetch, .. } = tag_config;
                let backoff = zx::MonotonicDuration::from_seconds(*min_seconds_between_fetch);
                let tag_state = TagState {
                    backoff,
                    state: FetchState::Idle,
                    last_fetched: zx::MonotonicInstant::INFINITE_PAST,
                };
                tag_states.insert(tag.clone(), tag_state);
            }
            services.insert(service.clone(), tag_states);
        }
        let state = State { fetcher, services, tasks: TaskGroup::new() };
        Scheduler { state: Arc::new(Mutex::new(state)) }
    }

    /// Gets a service name and a list of valid tags, and queues any fetches that are not already
    /// pending. Updates the last-fetched time on any tag it queues, setting it equal to the later
    /// of the current time and the time the fetch becomes possible.
    pub(crate) fn schedule(&self, service: &ServiceName, tags: Vec<Tag>) {
        // Every tag we process should use the same Now
        let now = zx::MonotonicInstant::get();
        let mut state = self.state.lock();
        let Some(service_info) = state.services.get_mut(service) else {
            return;
        };

        let mut now_tags = vec![];
        let mut later_tags: Vec<(zx::MonotonicInstant, Tag)> = vec![];
        for tag in tags {
            let Some(tag_state) = service_info.get_mut(&tag) else {
                return;
            };
            if matches!(tag_state.state, FetchState::Pending) {
                continue;
            }
            if tag_state.last_fetched + tag_state.backoff <= now {
                now_tags.push(tag);
                tag_state.last_fetched = now;
            } else {
                let next_fetch = tag_state.last_fetched + tag_state.backoff;
                tag_state.last_fetched = next_fetch;
                tag_state.state = FetchState::Pending;
                later_tags.push((next_fetch, tag));
            }
        }
        if !now_tags.is_empty() {
            state.fetcher.send(FetchCommand { service: service.clone(), tags: now_tags });
        }
        // later_tags may not all be fetchable at the same time. Batch the ones that are.
        later_tags.sort_by(|a, b| a.0.cmp(&b.0));
        while !later_tags.is_empty() {
            // This is N^2 but N will be too small to matter.
            let first_time = later_tags[0].0;
            let mut first_tags = vec![];
            let mut remaining_tags = vec![];
            for (next_fetch, tag) in later_tags {
                if next_fetch == first_time {
                    first_tags.push(tag);
                } else {
                    remaining_tags.push((next_fetch, tag));
                }
            }
            later_tags = remaining_tags;
            self.enqueue(
                &mut state,
                first_time,
                FetchCommand { service: service.clone(), tags: first_tags },
            );
        }
    }

    fn enqueue(&self, state: &mut State, time: zx::MonotonicInstant, command: FetchCommand) {
        let this = self.clone();
        let mut fetcher = state.fetcher.clone();
        state.tasks.spawn(async move {
            fasync::Timer::new(time).await;
            {
                let mut state = this.state.lock();
                let Some(tag_states) = state.services.get_mut(&command.service) else {
                    return;
                };
                for tag in command.tags.iter() {
                    tag_states.get_mut(tag).unwrap().state = FetchState::Idle;
                }
            }
            fetcher.send(command);
        });
    }
}

/// FetchState tells whether a tag is currently waiting to be dispatched or not. If it is, then
/// another request to fetch that tag should cause no change. If it's not waiting, then it can
/// either be fetched immediately (in which case its state stays Idle, but the last-fetched time
/// will be updated to Now) or it will be queued (in which case its state is Pending and its
/// last-fetched time will be set forward to the time it's going to be fetched).
enum FetchState {
    Pending,
    Idle,
}