stress_test/
counter.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fuchsia_async::Task;
6use futures::channel::mpsc;
7use futures::StreamExt;
8use log::debug;
9use std::collections::HashMap;
10
/// The sending half of an unbounded mpsc channel connecting each runner to the counter task.
///
/// CounterTx is cloned and given to all runners. Upon a successful operation, the runner
/// will send its actor's name over the channel.
///
/// The counter task receives the actor's name and updates the current operation count.
/// Because the channel is unbounded, runners are not blocked while reporting an operation.
pub type CounterTx = mpsc::UnboundedSender<String>;
19
20/// Starts a new task that maintains a count of all successful operations
21/// This task will terminate when the target number of operations has been hit.
22///
23/// Returns the counter task and a CounterTx.
24pub fn start_counter(target: u64) -> (Task<()>, CounterTx) {
25    let (tx, mut rx) = mpsc::unbounded();
26    let task = Task::spawn(async move {
27        // Keep track of global count + individual actor contributions
28        let mut count_map: HashMap<String, u64> = HashMap::new();
29        let mut total = 0;
30
31        // Run this task until the count has been met
32        while total < target {
33            // Wait for an actor to finish an operation
34            let key = rx.next().await.unwrap();
35
36            // Update the actor's contribution
37            if let Some(value) = count_map.get_mut(&key) {
38                *value += 1;
39            } else {
40                count_map.insert(key, 1);
41            }
42
43            // Update global count
44            total += 1;
45
46            debug!(total:%, count_map:?; "Counters");
47        }
48    });
49    (task, tx)
50}