settings/
job.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! Job Handling Support
//!
//! # Summary
//!
//! [Jobs] are basic units of work that interfaces in the setting service can specify. In addition
//! to the workload, [Job] definitions also capture information about how the work should be
//! handled. For example, a Job can specify that it would like to run in sequence within a set of
//! similar [Jobs]. This behavior is captured by the [Job]'s execution [Type](execution::Type).
//!
//! Sources are streams that provide jobs from a given source. The lifetime of [Jobs] produced
//! by a source are bound to the source's lifetime. The end of a source stream will lead to any
//! in-flight and pending jobs being cancelled.
//!
//! Job Manager is responsible for managing sources and the jobs they produce. The manager
//! associates and maintains any supporting data for jobs, such as caches.
//!
//! [Jobs]: Job
use crate::service::message;
use crate::{payload_convert, trace};

use core::fmt::{Debug, Formatter};
use core::pin::Pin;
use futures::channel::oneshot;
use futures::lock::Mutex;
use futures::stream::Stream;
use std::any::TypeId;
use std::collections::HashMap;
use std::future::Future;
use std::rc::Rc;

pub mod manager;
pub mod source;

payload_convert!(Job, Payload);

/// [StoreHandleMapping] represents the mapping from a [Job]'s [Signature] to the [data::Data]
/// store. This store is shared by all [Jobs] with the same [Signature].
///
/// [Jobs]: Job
pub(super) type StoreHandleMapping = HashMap<Signature, data::StoreHandle>;
type PinStream<T> = Pin<Box<dyn Stream<Item = T>>>;
type SourceStreamHandle = Rc<Mutex<Option<PinStream<Result<Job, source::Error>>>>>;

/// The data payload that can be sent to the [Job Manager](crate::job::manager::Manager).
#[derive(Clone)]
pub enum Payload {
    /// `Source` represents a new source of [Jobs](Job).
    Source(SourceStreamHandle),
}

impl Debug for Payload {
    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
        write!(f, "Job Payload")
    }
}

impl PartialEq for Payload {
    fn eq(&self, _other: &Self) -> bool {
        false
    }
}

pub mod data {
    //! The data mod provides the components for interacting with information that can be stored and
    //! retrieved by a [Job](super::Job) during its execution. [Keys](Key) provide a way to address
    //! this data while [Data] defines the type of data that can be stored per entry.

    use crate::base::SettingInfo;
    use futures::lock::Mutex;
    use std::collections::HashMap;
    use std::rc::Rc;

    /// A shared handle to the [Data] mapping.
    pub type StoreHandle = Rc<Mutex<HashMap<Key, Data>>>;

    #[derive(Clone, PartialEq, Eq, Hash)]
    pub enum Key {
        Identifier(&'static str),
        #[cfg(test)]
        TestInteger(usize),
    }

    #[derive(Clone, PartialEq)]
    pub enum Data {
        SettingInfo(SettingInfo),
        #[cfg(test)]
        TestData(usize),
    }
}

pub mod work {
    use super::{data, Signature};
    use crate::service::message;
    use async_trait::async_trait;
    use fuchsia_trace as ftrace;

    pub enum Load {
        /// [Sequential] loads are run in order after [Loads](Load) from [Jobs](crate::job::Job)
        /// of the same [Signature](crate::job::Signature) that preceded them. These [Loads](Load)
        /// share a common data store, which can be used to share information.
        Sequential(Box<dyn Sequential>, Signature),
        /// [Independent] loads are run as soon as there is availability to run as dictated by the
        /// containing [Job's](crate::job::Job) handler.
        Independent(Box<dyn Independent>),
    }

    /// Possible error conditions that can be encountered during work execution.
    pub enum Error {
        /// The work was canceled.
        Canceled,
    }

    impl Load {
        /// Executes the contained workload, providing the individualized parameters based on type.
        /// This function is asynchronous and is meant to be waited upon for workload completion.
        /// Workloads are expected to wait and complete all work within the scope of this execution.
        /// Therefore, invoking code can safely presume all work has completed after this function
        /// returns.
        pub(super) async fn execute(
            self,
            messenger: message::Messenger,
            store: Option<data::StoreHandle>,
            id: ftrace::Id,
        ) -> Result<(), Error> {
            match self {
                Load::Sequential(load, _) => {
                    load.execute(
                        messenger,
                        store.expect("all sequential loads should have store"),
                        id,
                    )
                    .await
                }
                Load::Independent(load) => {
                    load.execute(messenger, id).await;
                    Ok(())
                }
            }
        }
    }

    impl std::fmt::Debug for Load {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            match self {
                Load::Sequential(_, _) => f.debug_tuple("Sequential"),
                Load::Independent(_) => f.debug_tuple("Independent"),
            }
            .field(&"..")
            .finish()
        }
    }

    #[async_trait(?Send)]
    pub trait Sequential {
        /// Called when the [Job](super::Job) processing is ready for the encapsulated
        /// [work::Load](super::work::Load) be executed. The provided [StoreHandle](data::StoreHandle)
        /// is specific to the parent [Job](super::Job) group.
        async fn execute(
            self: Box<Self>,
            messenger: message::Messenger,
            store: data::StoreHandle,
            id: ftrace::Id,
        ) -> Result<(), Error>;
    }

    #[async_trait(?Send)]
    pub trait Independent {
        /// Called when a [work::Load](super::work::Load) should run. All workload specific logic should
        /// be encompassed in this method.
        async fn execute(self: Box<Self>, messenger: message::Messenger, id: ftrace::Id);
    }
}

#[derive(PartialEq, Eq, Copy, Clone, Debug, Hash)]
enum Either<A, B> {
    A(A),
    B(B),
}

/// An identifier specified by [Jobs](Job) to group related workflows. This is useful for
/// [work::Loads](work::Load) that need to be run sequentially. The [Signature] is used by the job
/// infrastructure to associate resources such as caches.
#[derive(PartialEq, Copy, Clone, Debug, Eq, Hash)]
pub struct Signature {
    key: Either<TypeId, (TypeId, u64)>,
}

impl Signature {
    /// Constructs a new [Signature]. The key provided will group the associated [Job] with other
    /// [Jobs](Job) of the same key. The association is scoped to other [Jobs](Job) in the same
    /// parent source.
    pub(crate) fn new<T>() -> Self
    where
        T: 'static + ?Sized,
    {
        Self { key: Either::A(TypeId::of::<T>()) }
    }

    pub(crate) fn with<T>(key: u64) -> Self
    where
        T: 'static + ?Sized,
    {
        Self { key: Either::B((TypeId::of::<T>(), key)) }
    }
}

/// A [Job] is a simple data container that associates a [work::Load] with an [execution::Type]
/// along with metadata, such as the creation time.
#[derive(Debug)]
pub struct Job {
    /// The [work::Load] to be run.
    workload: work::Load,
    /// The [execution::Type] determining how the [work::Load] will be run.
    execution_type: execution::Type,
    /// The trigger that can be used to cancel this job. Not all jobs are cancelable.
    cancelation_tx: Option<oneshot::Sender<()>>,
}

impl Job {
    pub(crate) fn new(workload: work::Load) -> Self {
        let execution_type = match &workload {
            work::Load::Sequential(_, signature) => execution::Type::Sequential(*signature),
            _ => execution::Type::Independent,
        };

        Self { workload, execution_type, cancelation_tx: None }
    }

    pub(crate) fn new_with_cancellation(
        workload: work::Load,
        cancelation_tx: oneshot::Sender<()>,
    ) -> Self {
        let execution_type = match &workload {
            work::Load::Sequential(_, signature) => execution::Type::Sequential(*signature),
            _ => execution::Type::Independent,
        };

        Self { workload, execution_type, cancelation_tx: Some(cancelation_tx) }
    }

    #[cfg(test)]
    pub(crate) fn workload(&self) -> &work::Load {
        &self.workload
    }

    #[cfg(test)]
    pub(crate) fn execution_type(&self) -> execution::Type {
        self.execution_type
    }
}

/// [Id] provides a unique identifier for a job within its parent space. Unlike
/// [Signatures](Signature), All [Job Ids](Id) will be unique per [Job]. [Ids](Id) should never be
/// directly constructed. An [IdGenerator] should be used instead.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub(super) struct Id {
    _identifier: usize,
}

impl Id {
    fn new(identifier: usize) -> Self {
        Self { _identifier: identifier }
    }
}

/// [`IdGenerator`] is responsible for generating unique [Ids](Id).
pub(super) struct IdGenerator {
    next_identifier: usize,
}

impl IdGenerator {
    pub(super) fn new() -> Self {
        Self { next_identifier: 0 }
    }

    /// Produces a [`Id`] that is unique from any [`Id`] that has or will be generated by this
    /// [`IdGenerator`] instance.
    pub(super) fn generate(&mut self) -> Id {
        let return_id = Id::new(self.next_identifier);
        self.next_identifier += 1;

        return_id
    }
}

/// An enumeration of stages a [Job] can be in.
enum State {
    /// The workload associated with the [Job] has not been executed yet.
    Ready(Job),
    /// The workload is executing.
    Executing,
    /// The workload execution has completed.
    Executed,
    /// The workload was canceled before it completed.
    Canceled,
}

/// [Info] is used to capture details about a [Job] once it has been accepted by an entity that will
/// process it. This includes an assigned [Id] and a recording at what time it was accepted.
struct Info {
    id: Id,
    state: State,
    execution_type: execution::Type,
    cancelation_tx: Option<oneshot::Sender<()>>,
}

impl Info {
    fn new(id: Id, mut job: Job) -> Self {
        let execution_type = job.execution_type;
        let cancelation_tx = job.cancelation_tx.take();
        Self { id, state: State::Ready(job), execution_type, cancelation_tx }
    }

    /// Retrieves the [execution::Type] of the underlying [Job].
    fn get_execution_type(&self) -> &execution::Type {
        &self.execution_type
    }

    /// Prepares the components necessary for a [Job] to execute and then returns a future to
    /// execute the [Job] workload with them. These components include a messenger for communicating
    /// with the system and the store associated with the [Job's](Job) group if applicable.
    async fn prepare_execution<F: FnOnce(Self)>(
        mut self,
        delegate: &mut message::Delegate,
        stores: &mut StoreHandleMapping,
        callback: F,
    ) -> impl Future<Output = ()> {
        // Create a messenger for the workload to communicate with the rest of the setting
        // service.
        let messenger = delegate
            .create(message::MessengerType::Unbound)
            .await
            .expect("messenger should be available")
            .0;

        let store = self
            .execution_type
            .get_signature()
            .map(|signature| stores.entry(*signature).or_default().clone());

        async move {
            let id = fuchsia_trace::Id::new();
            trace!(id, c"job execution");
            let mut state = State::Executing;
            std::mem::swap(&mut state, &mut self.state);

            if let State::Ready(job) = state {
                self.state = if let Err(work::Error::Canceled) =
                    job.workload.execute(messenger, store, id).await
                {
                    State::Canceled
                } else {
                    State::Executed
                };
                callback(self);
            } else {
                panic!("job not in the ready state");
            }
        }
    }
}

pub(super) mod execution {
    use super::Signature;
    use crate::job;
    use futures::channel::oneshot;
    use std::collections::{HashMap, VecDeque};

    /// The workload types of a [job::Job]. This enumeration is used to define how a [job::Job] will
    /// be treated in relation to other jobs from the same source.
    #[derive(PartialEq, Clone, Copy, Debug, Eq, Hash)]
    pub enum Type {
        /// Independent jobs are executed in isolation from other [Jobs](job::Job). Some
        /// functionality is unavailable for Independent jobs, such as caches.
        Independent,
        /// Sequential [Jobs](job::Job) wait until all pre-existing [Jobs](job::Job) of the same
        /// [Signature] are completed.
        Sequential(Signature),
    }

    impl Type {
        pub(super) fn get_signature(&self) -> Option<&Signature> {
            if let Type::Sequential(signature) = self {
                Some(signature)
            } else {
                None
            }
        }
    }

    #[derive(thiserror::Error, Debug, Clone, Copy)]
    pub(super) enum GroupError {
        #[error("The group is closed, so no new jobs can be added")]
        Closed,
    }

    /// A collection of [Jobs](job::Job) which have matching execution types. [Groups](Group)
    /// determine how similar [Jobs](job::Job) are executed.
    pub(super) struct Group {
        group_type: Type,
        active: HashMap<job::Id, Option<oneshot::Sender<()>>>,
        pending: VecDeque<job::Info>,
        canceled: bool,
    }

    impl Group {
        /// Creates a new [Group] based on the execution [Type].
        pub(super) fn new(group_type: Type) -> Self {
            Self { group_type, active: HashMap::new(), pending: VecDeque::new(), canceled: false }
        }

        /// Returns whether any [Jobs](job::Job) are currently active. Pending [Jobs](job::Job) do
        /// not count towards this total.
        pub(super) fn is_active(&self) -> bool {
            !self.active.is_empty()
        }

        /// Returns whether there are any [Jobs](job::Job) waiting to be executed.
        pub(super) fn has_available_jobs(&self) -> bool {
            if self.pending.is_empty() {
                return false;
            }

            match self.group_type {
                Type::Independent => true,
                Type::Sequential(_) => self.active.is_empty(),
            }
        }

        pub(super) fn add(&mut self, job_info: job::Info) -> Result<(), GroupError> {
            if self.canceled {
                Err(GroupError::Closed)
            } else {
                self.pending.push_back(job_info);
                Ok(())
            }
        }

        /// Invoked by [Job](super::Job) processing code to retrieve the next [Job](super::Job) to
        /// run from this group. If there are no [Jobs](super::Job) ready for execution, None is
        /// return. Otherwise, the selected [Job](super::Job) is added to the list of active
        /// [Jobs](super::Job) for the group and handed back to the caller.
        pub(super) fn promote_next_to_active(&mut self) -> Option<job::Info> {
            if !self.has_available_jobs() {
                return None;
            }

            let mut active_job = self.pending.pop_front();

            if let Some(ref mut job) = active_job {
                let _ = self.active.insert(job.id, job.cancelation_tx.take());
            }

            active_job
        }

        pub(super) fn complete(&mut self, job_info: job::Info) {
            let _ = self.active.remove(&job_info.id);
        }

        pub(super) fn cancel(&mut self) {
            self.canceled = true;
            self.pending.clear();
            for (_, cancelation_tx) in self.active.iter_mut() {
                if let Some(cancelation_tx) = cancelation_tx.take() {
                    let _ = cancelation_tx.send(());
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::base::MessengerType;
    use crate::service::test::Payload;
    use crate::service::MessageHub;
    use crate::tests::scaffold::workload::Workload;

    use assert_matches::assert_matches;
    use rand::Rng;

    #[fuchsia::test]
    fn test_id_generation() {
        let mut generator = IdGenerator::new();
        // Ensure generator creates subsequent ids that don't match.
        assert!(generator.generate() != generator.generate());
    }

    #[fuchsia::test(allow_stalls = false)]
    async fn test_job_functionality() {
        // Create delegate for communication between components.
        let message_hub_delegate = MessageHub::create_hub();

        // Create a top-level receptor to receive communication from the workload.
        let mut receptor = message_hub_delegate
            .create(MessengerType::Unbound)
            .await
            .expect("should create receptor")
            .1;

        // Create a messenger to send communication from the workload.
        let messenger = message_hub_delegate
            .create(MessengerType::Unbound)
            .await
            .expect("should create messenger")
            .0;

        let mut rng = rand::thread_rng();
        // The value expected to be conveyed from workload to receptor.
        let val = rng.gen();

        // Create job from workload scaffolding.
        let job = Job::new(work::Load::Independent(Workload::new(
            Payload::Integer(val),
            receptor.get_signature(),
        )));

        let _ = job
            .workload
            .execute(messenger, Some(Rc::new(Mutex::new(HashMap::new()))), 0.into())
            .await;

        // Confirm received value matches the value sent from workload.
        assert_matches!(
            receptor.next_of::<Payload>().await.expect("should return result, not error").0,
            Payload::Integer(value) if value == val);
    }
}