1use fuchsia_sync::{Mutex, RwLock};
11use rand::Rng;
12use rand::distr::Distribution;
13use rand::distr::weighted::WeightedIndex;
14use rand::seq::IndexedRandom;
15use std::fs::File;
16use std::io::ErrorKind;
17use std::os::unix::fs::FileExt;
18use std::sync::Arc;
19use std::sync::atomic::{AtomicU64, Ordering};
20
21#[derive(Default)]
22pub struct Stressor {
23 name_counter: AtomicU64,
24 all_files: RwLock<Vec<Arc<FileState>>>,
25 open_files: RwLock<Vec<Arc<std::fs::File>>>,
26 op_stats: Mutex<[u64; NUM_OPS]>,
27}
28
29#[derive(Eq, PartialEq)]
30struct FileState {
31 path: String,
32}
33
34const OPEN_FILE: usize = 0;
35const CLOSE_FILE: usize = 1;
36const CREATE_FILE: usize = 2;
37const DELETE_FILE: usize = 3;
38const READ: usize = 4;
39const WRITE: usize = 5;
40const TRUNCATE: usize = 6;
41
42const NUM_OPS: usize = 7;
43
44impl Stressor {
45 pub fn new() -> Arc<Self> {
46 let mut files = Vec::new();
48 let mut max_counter = 0;
49 for entry in std::fs::read_dir("/data").unwrap() {
50 let entry = entry.unwrap();
51 let path = entry.path();
52 if let Some(counter) = path
53 .strip_prefix("/data/")
54 .ok()
55 .and_then(|p| p.to_str())
56 .and_then(|s| s.parse().ok())
57 {
58 if counter > max_counter {
59 max_counter = counter;
60 }
61 }
62 files.push(Arc::new(FileState { path: path.to_str().unwrap().to_string() }));
63 }
64
65 Arc::new(Stressor {
66 name_counter: AtomicU64::new(max_counter + 1),
67 all_files: RwLock::new(files),
68 ..Stressor::default()
69 })
70 }
71
72 fn get_weights(&self) -> [f64; NUM_OPS] {
74 let all_file_count = self.all_files.read().len() as f64;
75 let open_file_count = self.open_files.read().len() as f64;
76 let if_open = |x| if open_file_count > 0.0 { x } else { 0.0 };
77 [
78 1.0 / (open_file_count + 1.0) * all_file_count,
79 open_file_count * 0.1,
80 2.0 / (all_file_count + 1.0),
81 all_file_count * 0.005,
82 if_open(1000.0),
83 if_open(1000.0),
84 if_open(1000.0),
85 ]
86 }
87
88 pub fn run(self: &Arc<Self>, num_threads: usize) {
90 log::info!(
91 "Running stressor, found {} files, counter: {}",
92 self.all_files.read().len(),
93 self.name_counter.load(Ordering::Relaxed),
94 );
95 for _ in 0..num_threads {
96 let this = self.clone();
97 std::thread::spawn(move || this.worker());
98 }
99
100 loop {
101 std::thread::sleep(std::time::Duration::from_secs(10));
102 let all_file_count = self.all_files.read().len();
103 let open_file_count = self.open_files.read().len();
104 log::info!(
105 "{} files, {} open, weights: {:?}, counts: {:?}",
106 all_file_count,
107 open_file_count,
108 self.get_weights(),
109 self.op_stats.lock()
110 );
111 }
112 }
113
114 fn worker(&self) {
116 let mut rng = rand::rng();
117 let mut buf = Vec::new();
118 loop {
119 let weights: [f64; NUM_OPS] = self.get_weights();
120 let op = WeightedIndex::new(weights).unwrap().sample(&mut rng);
121 match op {
122 OPEN_FILE => {
123 let Some(file) = self.all_files.read().choose(&mut rng).cloned() else {
124 continue;
125 };
126 match File::options().read(true).write(true).open(&file.path) {
127 Ok(f) => self.open_files.write().push(Arc::new(f)),
128 Err(e) => match e.kind() {
129 ErrorKind::NotFound => {}
130 e => {
131 panic!("open file failed with error {e:?}");
132 }
133 },
134 }
135 }
136 CLOSE_FILE => {
137 let _file = {
138 let mut open_files = self.open_files.write();
139 let num = open_files.len();
140 if num == 0 {
141 continue;
142 }
143 open_files.remove(rng.random_range(0..num))
144 };
145 }
146 CREATE_FILE => {
147 let path =
148 format!("/data/{}", self.name_counter.fetch_add(1, Ordering::Relaxed));
149 match File::options().create(true).read(true).write(true).open(&path) {
150 Ok(file) => {
151 let file_state = Arc::new(FileState { path });
152 self.all_files.write().push(file_state.clone());
153 self.open_files.write().push(Arc::new(file));
154 }
155 Err(error) => {
156 panic!("Failed to create file: {error:?}");
157 }
158 };
159 }
160 DELETE_FILE => {
161 let file = {
162 let mut all_files = self.all_files.write();
163 if all_files.is_empty() {
164 continue;
165 }
166 let num = all_files.len();
167 all_files.remove(rng.random_range(0..num))
168 };
169 std::fs::remove_file(&file.path).unwrap();
170 }
171 READ => {
172 let Some(file) = self.open_files.read().choose(&mut rng).cloned() else {
173 continue;
174 };
175 let read_offset = rng.random_range(0..100_000) as u64;
176 let read_len = (-rng.random::<f64>().ln() * 100_000.0) as u64;
177 buf.resize(read_len as usize, 1);
178 file.read_at(&mut buf, read_offset).unwrap();
179 }
180 WRITE => {
181 let Some(file) = self.open_files.read().choose(&mut rng).cloned() else {
182 continue;
183 };
184 let write_len = (-rng.random::<f64>().ln() * 10_000.0) as u64;
185 let write_offset = rng.random_range(0..100_000) as u64;
186 buf.resize(write_len as usize, 1);
187 file.write_at(&buf, write_offset).unwrap();
188 }
189 TRUNCATE => {
190 let Some(file) = self.open_files.read().choose(&mut rng).cloned() else {
191 continue;
192 };
193 file.set_len(rng.random_range(0..100_000)).unwrap();
194 }
195 _ => unreachable!(),
196 }
197 self.op_stats.lock()[op] += 1;
198 }
199 }
200}