// stressor_lib/aggressive.rs
// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! Starts the 'aggressive' stressor.
//! This variant performs open-read-close, open-append-close, and
//! open-truncate-close operations (the READ, WRITE and TRUNCATE ops below).
//! It silently ignores errors such as running out of space.
use fidl_fuchsia_io as fio;
use fuchsia_async::MonotonicInstant;
use fuchsia_sync::Mutex;
use rand::distr::weighted::WeightedIndex;
use rand::prelude::*;
use std::fs::File;
use std::io::ErrorKind;
use std::os::unix::fs::FileExt;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
20
/// Mutable state shared by all worker threads (held in `Stressor::inner`
/// behind a `Mutex`).
struct Inner {
    /// Used to round-robin across NUM_FILES in order.
    file_counter: usize,
    /// Maps from file_counter to file index. This allows us to shuffle the tail
    /// as we round-robin without touching the files that are in the dirent cache.
    file_map: [u64; NUM_FILES],
}
28
29impl Inner {
30    /// Returns the next file to use, avoiding the most recent DIRENT_CACHE_LIMIT
31    /// files returned but shuffling the remainder.
32    pub fn next_file_num(&mut self) -> u64 {
33        let mut rng = rand::rng();
34        if self.file_counter == 0 {
35            log::info!("File counter looped.");
36        }
37        self.file_map.swap(
38            self.file_counter,
39            (self.file_counter + rng.random_range(0..(NUM_FILES - DIRENT_CACHE_LIMIT))) % NUM_FILES,
40        );
41        let ret = self.file_map[self.file_counter];
42        self.file_counter = (self.file_counter + 1) % NUM_FILES;
43        ret
44    }
45}
46
/// Designed to 'exercise' the filesystem and thrash the dirent cache.
/// This cyclically works its way through NUM_FILES files, which is more than
/// the DIRENT_CACHE_LIMIT entries the dirent cache holds.
///
/// On each iteration it will either read, write or rewrite a file.
/// Unaligned random reads and append-only writes up to 128kB each in length.
/// Write errors causes the file to be truncated to zero bytes.
///
/// The file order is partially permuted each time (only those files which
/// are known not to be in the dirent cache) to reduce cyclic write/delete patterns.
///
/// The truncation of files when we hit the target limit is also randomized to avoid
/// introducing cyclic patterns (n writes, truncate, n writes, truncate, ...).
///
/// Despite these measures, we still expect to see some cyclic behaviour.
/// Each pass has equal probability of read or write.
///
/// NOTE(review): the worked example below assumes 10000 files and a 1.5GB
/// byte target from an earlier configuration; with the current
/// NUM_FILES = 4000 the figures scale proportionally — confirm against the
/// constants before relying on them. It takes about 3 writes to reach
/// 150kB (the average size before we hit target bytes of 1.5GB) so we can assume
/// n reads, n writes, n/3 truncates in steady state.
///
///   n + n + n/3 = 10000
///   n = 30000 / 7 = 4285
///
/// On average, 4285/3 = ~1500 files are truncated each pass. Assuming an average file
/// size, this is around 220MB or 14%. Best case it will take 7 passes to rewrite all
/// data, but we can assume that most of the data will be rewritten in a few dozen passes.
///
/// Each pass anecdotally takes between 2 and 20 seconds depending on system load,
/// allocation strategy and fragmentation.
pub struct Stressor {
    /// Path prefix to write files to.
    path_prefix: String,
    /// Inner state tracking which files can be accessed next.
    inner: Mutex<Inner>,
    /// Tracks the number of bytes we have allocated. This is an estimate:
    /// it is periodically overwritten from the filesystem's own tally.
    bytes_stored: AtomicU64,
    /// The target number of bytes we want the filesystem to consume in steady-state.
    target_bytes: u64,
    /// Per-operation counters, indexed by READ/WRITE/TRUNCATE.
    op_stats: Mutex<[u64; NUM_OPS]>,
    /// A handle to the directory used to query filesystem info periodically.
    dir: Option<fio::DirectorySynchronousProxy>,
}
87
/// Operation index: random unaligned read of an existing file.
const READ: usize = 0;
/// Operation index: append-only write (open-create, write at EOF).
const WRITE: usize = 1;
/// Operation index: truncate a file to zero bytes to reclaim space.
const TRUNCATE: usize = 2;
/// Number of distinct operations; sizes `op_stats` and the weight array.
const NUM_OPS: usize = 3;

/// The number of files to cycle through.
/// This must be larger than DIRENT_CACHE_LIMIT to induce cache thrashing.
const NUM_FILES: usize = 4000;

/// See src/storage/fxfs/platform/src/fuchsia/volume.rs.
const DIRENT_CACHE_LIMIT: usize = 2000;
99
100impl Stressor {
101    /// Creates a new aggressive stressor that will try to fill the disk until
102    /// `target_free_bytes` of free space remains.
103    pub fn new(path_prefix: &str, target_free_bytes: u64) -> Arc<Self> {
104        let (used_bytes, target_bytes, dir) = if !path_prefix.is_empty() {
105            let dir = fio::DirectorySynchronousProxy::new(
106                fuchsia_async::Channel::from_channel(
107                    fdio::transfer_fd(std::fs::File::open("/data").unwrap()).unwrap().into(),
108                )
109                .into(),
110            );
111            let info = dir.query_filesystem(MonotonicInstant::INFINITE.into()).unwrap().1.unwrap();
112
113            log::info!(
114                "Aggressive stressor mode targetting {} free bytes on a {} byte volume.",
115                target_free_bytes,
116                info.total_bytes
117            );
118            (info.used_bytes, info.total_bytes - target_free_bytes, Some(dir))
119        } else {
120            log::info!("Aggressive stressor running without path. Targetting 1MB of space.");
121            (0, 1 << 20, None)
122        };
123
124        Arc::new(Stressor {
125            path_prefix: path_prefix.to_owned(),
126            inner: Mutex::new(Inner {
127                file_counter: 0,
128                file_map: std::array::from_fn(|i| i as u64),
129            }),
130            bytes_stored: AtomicU64::new(used_bytes),
131            target_bytes,
132            op_stats: Default::default(),
133            dir,
134        })
135    }
136
137    /// Starts the stressor. Loops forever.
138    pub fn run(self: &Arc<Self>, num_threads: usize) {
139        for _ in 0..num_threads {
140            let this = self.clone();
141            std::thread::spawn(move || this.worker());
142        }
143        // Sleep forever, periodically updating bytes stored.
144        // This update is racy but close enough for our purposes.
145        loop {
146            std::thread::sleep(std::time::Duration::from_secs(10));
147            if let Some(dir) = &self.dir {
148                let info =
149                    dir.query_filesystem(MonotonicInstant::INFINITE.into()).unwrap().1.unwrap();
150                self.bytes_stored.store(info.used_bytes, Ordering::SeqCst);
151            }
152            log::info!(
153                "bytes_stored: {}, target_bytes {}, counts: {:?}",
154                self.bytes_stored.load(Ordering::Relaxed),
155                self.target_bytes,
156                self.op_stats.lock()
157            );
158        }
159    }
160
161    /// The body of the worker loop. Pulled out for testing.
162    fn work_unit(&self, rng: &mut rand::rngs::ThreadRng, buf: &mut Vec<u8>) {
163        let bytes_stored = self.bytes_stored.load(Ordering::Relaxed);
164        // Note that truncate is rare until we exceed 100% target utilization, at which point it
165        // becomes as likely as read or write. By the time we hit 105% target utilization,
166        // truncates are 2.6x as likely as read or write.
167        let weights: [f64; NUM_OPS] = [
168            1.0,                                                             /* READ */
169            1.0,                                                             /* WRITE */
170            f64::powf(bytes_stored as f64 / self.target_bytes as f64, 20.0), /* TRUNCATE */
171        ];
172        let op = WeightedIndex::new(weights).unwrap().sample(rng);
173        let file_num = self.inner.lock().next_file_num();
174        let path = format!("{}/{file_num}", self.path_prefix);
175        match op {
176            READ => match File::options().read(true).write(false).open(&path) {
177                Ok(f) => {
178                    let file_len = f.metadata().unwrap().len();
179                    let read_len = rng.random_range(0..128 * 1024u64);
180                    let end = file_len.saturating_sub(read_len);
181                    let offset = rng.random_range(0..end + 1);
182                    buf.resize(read_len as usize, 0);
183                    f.read_at(buf, offset).unwrap();
184                }
185                Err(e) => match e.kind() {
186                    ErrorKind::NotFound => {}
187                    error => {
188                        log::warn!("Got an open error on READ: {error:?}");
189                    }
190                },
191            },
192            WRITE => {
193                match File::options().create(true).read(true).write(true).open(&path) {
194                    Ok(f) => {
195                        let file_len = f.metadata().unwrap().len();
196                        buf.resize(rng.random_range(0..128 * 1024), 1);
197                        match f.write_at(&buf, file_len) {
198                            Ok(bytes) => {
199                                self.bytes_stored.fetch_add(bytes as u64, Ordering::Relaxed);
200                            }
201                            Err(_) => {
202                                // When a write fails due to space, we truncate.
203                                // In this way we fill up the disk then start fragmenting it.
204                                // Until #![feature(io_error_more)] is stable, we have
205                                // a catch-all here.
206                                let _ = f.set_len(0).unwrap();
207                                // Metadata (layer files and such) can take up megabytes of
208                                // space that is not tracked in `self.bytes_stored`.
209                                // If we hit this code path then we've blown through
210                                // available space but not hit our 'bytes_stored' limit.
211                                //
212                                // Writes are relatively small compared to the potential
213                                // layer file size so even though it's a bit racy, we will
214                                // accept the potential inaccuracy of concurrent writes
215                                // and adjust bytes_stored here based on the filesystem's
216                                // internal tally.
217                                if let Some(dir) = &self.dir {
218                                    let info = dir
219                                        .query_filesystem(MonotonicInstant::INFINITE.into())
220                                        .unwrap()
221                                        .1
222                                        .unwrap();
223                                    log::info!("Correcting bytes_stored. {info:?}");
224                                    self.bytes_stored.store(info.used_bytes, Ordering::SeqCst);
225                                }
226                            }
227                        };
228                    }
229                    // Unfortunately ErrorKind::StorageFull is unstable so
230                    // we rely on a catch-all here and assume write errors are
231                    // due to disk being full. This happens often so we don't log it.
232                    Err(_) => {}
233                }
234            }
235            TRUNCATE => match File::options().write(true).open(&path) {
236                Ok(f) => {
237                    let file_len = f.metadata().unwrap().len();
238                    self.bytes_stored.fetch_sub(file_len, Ordering::Relaxed);
239                    let _ = f.set_len(0).unwrap();
240                }
241                Err(_) => {}
242            },
243            _ => unreachable!(),
244        }
245        self.op_stats.lock()[op] += 1;
246    }
247
248    /// Worker thread function.
249    fn worker(&self) {
250        let mut rng = rand::rng();
251        let mut buf = Vec::new();
252        loop {
253            self.work_unit(&mut rng, &mut buf);
254        }
255    }
256}
257
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs two work units in no-path mode and checks that exactly one
    /// operation is recorded in `op_stats` per unit.
    #[test]
    fn stressor() {
        let stressor = Stressor::new("", 10);
        let mut rng = rand::rng();
        let mut buf = Vec::new();
        for expected_total in 1..=2u64 {
            stressor.work_unit(&mut rng, &mut buf);
            let total: u64 = stressor.op_stats.lock().iter().sum();
            assert_eq!(total, expected_total);
        }
    }
}
271}