stressor_lib/aggressive.rs
// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! Starts the 'aggressive' stressor.
//! This variant performs open-read-close, open-append-close, and open-truncate-close
//! operations. Note that 'unlink' is not exercised.
//! This version also silently ignores errors such as out of space.

use fidl_fuchsia_io as fio;
use fuchsia_async::MonotonicInstant;
use fuchsia_sync::Mutex;
use rand::distr::weighted::WeightedIndex;
use rand::prelude::*;
use std::fs::File;
use std::io::ErrorKind;
use std::os::unix::fs::FileExt;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

struct Inner {
    /// Used to round-robin across NUM_FILES in order.
    file_counter: usize,
    /// Maps from file_counter to file index. This allows us to shuffle the tail
    /// as we round-robin without touching the files that are in the dirent cache.
    file_map: [u64; NUM_FILES],
}

impl Inner {
    /// Returns the next file to use, avoiding the most recent DIRENT_CACHE_LIMIT
    /// files returned but shuffling the remainder.
    pub fn next_file_num(&mut self) -> u64 {
        let mut rng = rand::rng();
        if self.file_counter == 0 {
            log::info!("File counter looped.");
        }
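        // The swap target ranges over the NUM_FILES - DIRENT_CACHE_LIMIT positions
        // starting at file_counter (a distance of zero swaps the slot with itself), so
        // the DIRENT_CACHE_LIMIT slots immediately behind the counter, which hold the
        // most recently returned (and likely cached) files, are never disturbed.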
        self.file_map.swap(
            self.file_counter,
            (self.file_counter + rng.random_range(0..(NUM_FILES - DIRENT_CACHE_LIMIT))) % NUM_FILES,
        );
        let ret = self.file_map[self.file_counter];
        self.file_counter = (self.file_counter + 1) % NUM_FILES;
        ret
    }
}

/// Designed to 'exercise' the filesystem and thrash the dirent cache.
/// This cyclically works its way through 10000 files (the dirent cache holds 8000).
///
/// On each iteration it will either read, write or truncate a file.
/// Reads are unaligned and random; writes are append-only; both are up to 128kB in length.
/// Write errors cause the file to be truncated to zero bytes.
///
/// The file order is partially permuted each time (only those files which
/// are known not to be in the dirent cache) to reduce cyclic write/delete patterns.
///
/// The truncation of files when we hit the target limit is also randomized to avoid
/// introducing cyclic patterns (n writes, truncate, n writes, truncate, ...).
///
/// Despite these measures, we still expect to see some cyclic behaviour.
/// Each iteration has equal probability of read or write. It takes about 3 writes to reach
/// 150kB (the average size before we hit the target bytes of 1.5GB) so we can assume
/// n reads, n writes, n/3 truncates in steady state.
///
/// n + n + n/3 = 10000
/// n = 30000 / 7 = ~4285
///
/// On average, 4285/3 = ~1430 files are truncated each pass. Assuming an average file
/// size, this is around 220MB or 14%. Best case it will take 7 passes to rewrite all
/// data, but we can assume that most of the data will be rewritten in a few dozen passes.
///
/// Each pass anecdotally takes between 2 and 20 seconds depending on system load,
/// allocation strategy and fragmentation.
pub struct Stressor {
    /// Path prefix to write files to.
    path_prefix: String,
    /// Inner state tracking which files can be accessed next.
    inner: Mutex<Inner>,
    /// Tracks the number of bytes we have allocated.
    bytes_stored: AtomicU64,
    /// The target number of bytes we want the filesystem to consume in steady-state.
    target_bytes: u64,
    /// Per-operation counters, indexed by READ/WRITE/TRUNCATE.
    op_stats: Mutex<[u64; NUM_OPS]>,
    /// A handle to the directory used to query filesystem info periodically.
    dir: Option<fio::DirectorySynchronousProxy>,
}

const READ: usize = 0;
const WRITE: usize = 1;
const TRUNCATE: usize = 2;
const NUM_OPS: usize = 3;

/// The number of files to cycle through.
/// This must be larger than DIRENT_CACHE_LIMIT to induce cache thrashing.
const NUM_FILES: usize = 10000;

/// See src/storage/fxfs/platform/src/fuchsia/volume.rs.
const DIRENT_CACHE_LIMIT: usize = 8000;

impl Stressor {
    /// Creates a new aggressive stressor that will try to fill the disk until
    /// `target_free_bytes` of free space remains.
    pub fn new(path_prefix: &str, target_free_bytes: u64) -> Arc<Self> {
        let (used_bytes, target_bytes, dir) = if !path_prefix.is_empty() {
            let dir = fio::DirectorySynchronousProxy::new(
                fuchsia_async::Channel::from_channel(
                    fdio::transfer_fd(std::fs::File::open("/data").unwrap()).unwrap().into(),
                )
                .into(),
            );
            let info = dir.query_filesystem(MonotonicInstant::INFINITE.into()).unwrap().1.unwrap();

            log::info!(
                "Aggressive stressor mode targeting {} free bytes on a {} byte volume.",
                target_free_bytes,
                info.total_bytes
            );
            (info.used_bytes, info.total_bytes - target_free_bytes, Some(dir))
        } else {
            log::info!("Aggressive stressor running without path. Targeting 1MiB of space.");
            (0, 1 << 20, None)
        };

        Arc::new(Stressor {
            path_prefix: path_prefix.to_owned(),
            inner: Mutex::new(Inner {
                file_counter: 0,
                file_map: std::array::from_fn(|i| i as u64),
            }),
            bytes_stored: AtomicU64::new(used_bytes),
            target_bytes,
            op_stats: Default::default(),
            dir,
        })
    }

    /// Starts the stressor. Loops forever.
    pub fn run(self: &Arc<Self>, num_threads: usize) {
        for _ in 0..num_threads {
            let this = self.clone();
            std::thread::spawn(move || this.worker());
        }
        // Sleep forever, periodically updating bytes stored.
        // This update is racy but close enough for our purposes.
        loop {
            std::thread::sleep(std::time::Duration::from_secs(10));
            if let Some(dir) = &self.dir {
                let info =
                    dir.query_filesystem(MonotonicInstant::INFINITE.into()).unwrap().1.unwrap();
                self.bytes_stored.store(info.used_bytes, Ordering::SeqCst);
            }
            log::info!(
                "bytes_stored: {}, target_bytes {}, counts: {:?}",
                self.bytes_stored.load(Ordering::Relaxed),
                self.target_bytes,
                self.op_stats.lock()
            );
        }
    }

    /// The body of the worker loop. Pulled out for testing.
    fn work_unit(&self, rng: &mut rand::rngs::ThreadRng, buf: &mut Vec<u8>) {
        let bytes_stored = self.bytes_stored.load(Ordering::Relaxed);
        // Note that truncate is rare until we exceed 100% target utilization, at which point
        // it becomes as likely as read or write. By the time we hit 105% target utilization,
        // truncates are 2.6x as likely as read or write.
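        // For example, at 90% of target the weights are about [1.0, 1.0, 0.12]
        // (0.9^20 ~= 0.12), and at 105% about [1.0, 1.0, 2.65] (1.05^20 ~= 2.65).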
        let weights: [f64; NUM_OPS] = [
            1.0, /* READ */
            1.0, /* WRITE */
            f64::powf(bytes_stored as f64 / self.target_bytes as f64, 20.0), /* TRUNCATE */
        ];
        let op = WeightedIndex::new(weights).unwrap().sample(rng);
        let file_num = self.inner.lock().next_file_num();
        let path = format!("{}/{file_num}", self.path_prefix);
        match op {
            READ => match File::options().read(true).write(false).open(&path) {
                Ok(f) => {
                    let file_len = f.metadata().unwrap().len();
                    let read_len = rng.random_range(0..128 * 1024u64);
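                    // Pick an offset so that a full `read_len` read stays within the file
                    // where possible; if the file is shorter than `read_len`, we read from
                    // offset 0 and read_at() returns a short read, which is ignored.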
                    let end = file_len.saturating_sub(read_len);
                    let offset = rng.random_range(0..end + 1);
                    buf.resize(read_len as usize, 0);
                    f.read_at(buf, offset).unwrap();
                }
                Err(e) => match e.kind() {
                    ErrorKind::NotFound => {}
                    error => {
                        log::warn!("Got an open error on READ: {error:?}");
                    }
                },
            },
            WRITE => {
                match File::options().create(true).read(true).write(true).open(&path) {
                    Ok(f) => {
                        let file_len = f.metadata().unwrap().len();
                        buf.resize(rng.random_range(0..128 * 1024), 1);
                        match f.write_at(&buf, file_len) {
                            Ok(bytes) => {
                                self.bytes_stored.fetch_add(bytes as u64, Ordering::Relaxed);
                            }
                            Err(_) => {
                                // When a write fails due to space, we truncate.
                                // In this way we fill up the disk then start fragmenting it.
                                // Until #![feature(io_error_more)] is stable, we have
                                // a catch-all here.
                                f.set_len(0).unwrap();
                                // Metadata (layer files and such) can take up megabytes of
                                // space that is not tracked in `self.bytes_stored`.
                                // If we hit this code path then we've blown through
                                // available space but not hit our 'bytes_stored' limit.
                                //
                                // Writes are relatively small compared to the potential
                                // layer file size so even though it's a bit racy, we will
                                // accept the potential inaccuracy of concurrent writes
                                // and adjust bytes_stored here based on the filesystem's
                                // internal tally.
                                if let Some(dir) = &self.dir {
                                    let info = dir
                                        .query_filesystem(MonotonicInstant::INFINITE.into())
                                        .unwrap()
                                        .1
                                        .unwrap();
                                    log::info!("Correcting bytes_stored. {info:?}");
                                    self.bytes_stored.store(info.used_bytes, Ordering::SeqCst);
                                }
                            }
                        };
                    }
                    // Unfortunately ErrorKind::StorageFull is unstable so we rely on a
                    // catch-all here and assume open errors during WRITE are due to the
                    // disk being full. This happens often, so we don't log it.
                    Err(_) => {}
                }
            }
            TRUNCATE => match File::options().write(true).open(&path) {
                Ok(f) => {
                    let file_len = f.metadata().unwrap().len();
                    self.bytes_stored.fetch_sub(file_len, Ordering::Relaxed);
                    f.set_len(0).unwrap();
                }
                Err(_) => {}
            },
            _ => unreachable!(),
        }
        self.op_stats.lock()[op] += 1;
    }

    /// Worker thread function.
    fn worker(&self) {
        let mut rng = rand::rng();
        let mut buf = Vec::new();
        loop {
            self.work_unit(&mut rng, &mut buf);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
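    /// Sketch of a property test, added for illustration, covering the invariant
    /// documented on `next_file_num`: a returned file number never repeats within
    /// the last DIRENT_CACHE_LIMIT returns. The iteration count (three full passes)
    /// is arbitrary.
    #[test]
    fn next_file_num_avoids_recent_files() {
        use std::collections::{HashSet, VecDeque};
        let mut inner = Inner { file_counter: 0, file_map: std::array::from_fn(|i| i as u64) };
        // Sliding window of the last DIRENT_CACHE_LIMIT returned file numbers.
        let mut recent: VecDeque<u64> = VecDeque::new();
        let mut recent_set: HashSet<u64> = HashSet::new();
        for _ in 0..3 * NUM_FILES {
            let n = inner.next_file_num();
            assert!(!recent_set.contains(&n), "file {n} repeated within the cache window");
            recent.push_back(n);
            recent_set.insert(n);
            if recent.len() > DIRENT_CACHE_LIMIT {
                recent_set.remove(&recent.pop_front().unwrap());
            }
        }
    }
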
    #[test]
    fn stressor() {
        let stressor = Stressor::new("", 10);
        let mut rng = rand::rng();
        let mut buf = Vec::new();
        stressor.work_unit(&mut rng, &mut buf);
        assert_eq!(stressor.op_stats.lock().iter().sum::<u64>(), 1);
        stressor.work_unit(&mut rng, &mut buf);
        assert_eq!(stressor.op_stats.lock().iter().sum::<u64>(), 2);
    }
}
271}