stress_test/counter.rs
1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fuchsia_async::Task;
6use futures::channel::mpsc;
7use futures::StreamExt;
8use log::debug;
9use std::collections::HashMap;
10
11/// An unbounded mpsc channel connecting each runner to the counter thread.
12///
13/// CounterTx is cloned and given to all runners. Upon a successful operation, the runner
14/// will send their actor's name over the channel.
15///
16/// The counter thread receives the actors name and updates the current operation count.
17/// This ensures that runners are not blocked.
18pub type CounterTx = mpsc::UnboundedSender<String>;
19
20/// Starts a new task that maintains a count of all successful operations
21/// This task will terminate when the target number of operations has been hit.
22///
23/// Returns the counter task and a CounterTx.
24pub fn start_counter(target: u64) -> (Task<()>, CounterTx) {
25 let (tx, mut rx) = mpsc::unbounded();
26 let task = Task::spawn(async move {
27 // Keep track of global count + individual actor contributions
28 let mut count_map: HashMap<String, u64> = HashMap::new();
29 let mut total = 0;
30
31 // Run this task until the count has been met
32 while total < target {
33 // Wait for an actor to finish an operation
34 let key = rx.next().await.unwrap();
35
36 // Update the actor's contribution
37 if let Some(value) = count_map.get_mut(&key) {
38 *value += 1;
39 } else {
40 count_map.insert(key, 1);
41 }
42
43 // Update global count
44 total += 1;
45
46 debug!(total:%, count_map:?; "Counters");
47 }
48 });
49 (task, tx)
50}