// package_directory/root_dir_cache.rs

// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use fuchsia_inspect as finspect;
use futures::future::BoxFuture;
use futures::FutureExt as _;
use std::collections::HashMap;
use std::sync::{Arc, Weak};

/// `RootDirCache` is a cache of `Arc<RootDir>`s indexed by their hash.
///
/// The cache internally stores `Weak<RootDir>`s and installs a custom `dropper` in its managed
/// `RootDir`s that removes the corresponding entry when dropped, so it is a cache of
/// `Arc<RootDir>`s that are actively in use by its clients. This is useful for deduplicating
/// the `Arc<RootDir>`s used by VFS to serve package directory connections while also keeping
/// track of which connections are open.
///
/// Because of how `RootDir` is implemented, a package will have alive `Arc`s if there are:
///   1. fuchsia.io.Directory connections to the package's root directory or any sub directory.
///   2. fuchsia.io.File connections to the package's files under meta/.
///   3. fuchsia.io.File connections to the package's content blobs (files not under meta/) *iff*
///      the `crate::NonMetaStorage` impl serves the connections itself (instead of
///      forwarding to a remote server).
///
/// The `NonMetaStorage` impl we use for Fxblob does serve the File connections itself.
/// The impl we use for Blobfs does not, but Blobfs will wait to delete blobs that have
/// open connections until the last connection closes.
/// Similarly, both Blobfs and Fxblob will wait to delete blobs until the last VMO is closed (VMOs
/// obtained from fuchsia.io.File.GetBackingMemory will not keep a package alive), so it is safe to
/// delete packages that RootDirCache says are not open.
///
/// Clients close connections to packages by closing their end of the Zircon channel over which the
/// fuchsia.io.[File|Directory] messages were being sent. Some time after the client end of the
/// channel is closed, the server (usually in a different process) will be notified by the kernel,
/// and the task serving the connection will finish, dropping its `Arc<RootDir>`.
/// When the last `Arc` is dropped, the strong count of the corresponding `std::sync::Weak` in
/// the `RootDirCache` will decrement to zero. At this point the `RootDirCache`
/// will no longer report the package as open.
/// All this is to say that there will be some delay between a package no longer being in use and
/// clients of `RootDirCache` finding out about that.
///
/// Cloning a `RootDirCache` clones `non_meta_storage` and shares the same underlying map (the
/// map lives behind an `Arc`), so all clones observe the same set of entries.
#[derive(Debug, Clone)]
pub struct RootDirCache<S> {
    // Storage backend; cloned into every `RootDir` that `get_or_insert` creates itself.
    non_meta_storage: S,
    // Package hash -> weak handle to the `RootDir` serving that package. Shared (weakly) with
    // each entry's `Dropper` and with the lazy inspect callback.
    dirs: Arc<std::sync::Mutex<HashMap<fuchsia_hash::Hash, Weak<crate::RootDir<S>>>>>,
}

impl<S: crate::NonMetaStorage + Clone> RootDirCache<S> {
    /// Creates a `RootDirCache` that uses `non_meta_storage` as the backing for the
    /// internally managed `crate::RootDir`s.
    pub fn new(non_meta_storage: S) -> Self {
        let dirs = Arc::new(std::sync::Mutex::new(HashMap::new()));
        Self { non_meta_storage, dirs }
    }

    /// Returns an `Arc<RootDir>` corresponding to `hash`.
    /// If there is not already one in the cache, `root_dir` will be used if provided, otherwise
    /// a new one will be created using the `non_meta_storage` provided to `Self::new`.
    ///
    /// If provided, `root_dir` must be backed by the same `NonMetaStorage` that `Self::new` was
    /// called with. The provided `root_dir` must not have a dropper set.
    ///
    /// If the cache already holds a live entry for `hash` (either up front or because a
    /// concurrent call inserted one first), that entry is returned and the provided or freshly
    /// created `RootDir` is discarded.
    ///
    /// # Errors
    ///
    /// * `crate::Error::DropperAlreadySet` if `root_dir` is provided, no live cached entry was
    ///   found up front, and setting the dropper on `root_dir` fails.
    /// * Any error returned by `crate::RootDir::new_with_dropper` when a new `RootDir` has to be
    ///   created.
    pub async fn get_or_insert(
        &self,
        hash: fuchsia_hash::Hash,
        root_dir: Option<crate::RootDir<S>>,
    ) -> Result<Arc<crate::RootDir<S>>, crate::Error> {
        // Fast path: a live entry already exists, so any provided `root_dir` is simply unused.
        Ok(if let Some(root_dir) = self.get(&hash) {
            root_dir
        } else {
            // If this is dropped while the dirs lock is held it will deadlock.
            // (`Dropper::drop` takes the same `dirs` lock.)
            let dropper = Box::new(Dropper { dirs: Arc::downgrade(&self.dirs), hash });
            let new_root_dir = match root_dir {
                Some(mut root_dir) => match root_dir.set_dropper(dropper) {
                    Ok(()) => Arc::new(root_dir),
                    // Okay to drop the dropper, the lock is not held and the drop impl handles
                    // missing entries.
                    Err(_) => {
                        return Err(crate::Error::DropperAlreadySet);
                    }
                },
                None => {
                    // Do this without the lock held because:
                    // 1. Making a RootDir takes ~100 μs (reading the meta.far)
                    // 2. If new_with_dropper errors it will drop the dropper, which would deadlock
                    // 3. If any other async task runs on this thread and drops a RootDir (e.g. b/c
                    //    a client closed a connection) it will deadlock.
                    crate::RootDir::new_with_dropper(self.non_meta_storage.clone(), hash, dropper)
                        .await?
                }
            };
            use std::collections::hash_map::Entry::*;
            // let statement needed to drop the lock guard before `new_root_dir` to avoid deadlock.
            // (If the existing entry wins below, `new_root_dir` is dropped at the end of this
            // block, which runs its dropper and therefore needs the lock to be free.)
            let root_dir = match self.dirs.lock().expect("poisoned mutex").entry(hash) {
                // Raced with another call to serve.
                // An entry for this hash appeared after the initial `get` above.
                Occupied(mut o) => {
                    let old_root_dir = o.get_mut();
                    if let Some(old_root_dir) = old_root_dir.upgrade() {
                        // The existing entry is still alive: hand it out and discard ours.
                        old_root_dir
                    } else {
                        // The existing entry is dead but has not been removed yet: take over
                        // its slot.
                        *old_root_dir = Arc::downgrade(&new_root_dir);
                        new_root_dir
                    }
                }
                Vacant(v) => {
                    v.insert(Arc::downgrade(&new_root_dir));
                    new_root_dir
                }
            };
            root_dir
        })
    }

    /// Returns the `Arc<RootDir>` with the given `hash`, if one exists in the cache.
    /// Otherwise returns `None`.
    /// Holding on to the returned `Arc` will keep the package open (as reported by
    /// `Self::list`).
    pub fn get(&self, hash: &fuchsia_hash::Hash) -> Option<Arc<crate::RootDir<S>>> {
        self.dirs.lock().expect("poisoned mutex").get(hash)?.upgrade()
    }

    /// Packages with live `Arc<RootDir>`s.
    /// Holding on to the returned `Arc`s will keep the packages open.
    pub fn list(&self) -> Vec<Arc<crate::RootDir<S>>> {
        self.dirs.lock().expect("poisoned mutex").iter().filter_map(|(_, v)| v.upgrade()).collect()
    }

    /// Returns a callback to be given to `fuchsia_inspect::Node::record_lazy_child`.
    /// Records the package hashes and their corresponding `Arc<RootDir>` strong counts.
    pub fn record_lazy_inspect(
        &self,
    ) -> impl Fn() -> BoxFuture<'static, Result<finspect::Inspector, anyhow::Error>>
           + Send
           + Sync
           + 'static {
        let dirs = Arc::downgrade(&self.dirs);
        move || {
            let dirs = dirs.clone();
            async move {
                let inspector = finspect::Inspector::default();
                if let Some(dirs) = dirs.upgrade() {
                    let package_counts: HashMap<_, _> = {
                        let dirs = dirs.lock().expect("poisoned mutex");
                        dirs.iter().map(|(k, v)| (*k, v.strong_count() as u64)).collect()
                    };
                    let root = inspector.root();
                    let () = package_counts.into_iter().for_each(|(pkg, count)| {
                        root.record_child(pkg.to_string(), |n| n.record_uint("instances", count))
                    });
                }
                Ok(inspector)
            }
            .boxed()
        }
    }
}

/// Removes the corresponding entry from RootDirCache's self.dirs when dropped.
///
/// `RootDirCache::get_or_insert` creates one of these for each `RootDir` it is about to manage.
struct Dropper<S> {
    // Weak handle to the cache's map, so an outstanding `Dropper` does not keep the map alive.
    dirs: Weak<std::sync::Mutex<HashMap<fuchsia_hash::Hash, Weak<crate::RootDir<S>>>>>,
    // Key of the map entry this dropper is responsible for removing.
    hash: fuchsia_hash::Hash,
}

impl<S> Drop for Dropper<S> {
    fn drop(&mut self) {
        let Some(dirs) = self.dirs.upgrade() else {
            return;
        };
        use std::collections::hash_map::Entry::*;
        match dirs.lock().expect("poisoned mutex").entry(self.hash) {
            Occupied(o) => {
                // In case this raced with a call to serve that added a new one.
                if o.get().strong_count() == 0 {
                    o.remove_entry();
                }
            }
            // Never added because creation failed.
            Vacant(_) => (),
        };
    }
}

// Written by hand (rather than derived) so that no `S: Debug` bound is required.
impl<S> std::fmt::Debug for Dropper<S> {
    /// Formats the dropper as a `Dropper { dirs, hash }` debug struct.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { dirs, hash } = self;
        let mut out = f.debug_struct("Dropper");
        out.field("dirs", dirs);
        out.field("hash", hash);
        out.finish()
    }
}

// Unit tests for `RootDirCache`. Each test builds a package, serves its meta.far from a fake
// blobfs, and checks both the public view (`list`) and the internal map (`dirs`).
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use diagnostics_assertions::assert_data_tree;
    use fidl_fuchsia_io as fio;
    use fuchsia_pkg_testing::blobfs::Fake as FakeBlobfs;
    use fuchsia_pkg_testing::PackageBuilder;
    use vfs::directory::entry_container::Directory as _;

    // A cache miss creates a new entry, and dropping the only `Arc` removes it again.
    #[fuchsia::test]
    async fn get_or_insert_new_entry() {
        let pkg = PackageBuilder::new("pkg-name").build().await.unwrap();
        let (metafar_blob, _) = pkg.contents();
        let (blobfs_fake, blobfs_client) = FakeBlobfs::new();
        blobfs_fake.add_blob(metafar_blob.merkle, metafar_blob.contents);
        let server = RootDirCache::new(blobfs_client);

        let dir = server.get_or_insert(metafar_blob.merkle, None).await.unwrap();

        assert_eq!(server.list().len(), 1);

        drop(dir);
        assert_eq!(server.list().len(), 0);
        // The dropper must have removed the map entry, not merely left a dead `Weak` behind.
        assert!(server.dirs.lock().expect("poisoned mutex").is_empty());
    }

    // A package stays listed while a directory connection to it is open and disappears once the
    // connection is closed and the serving scope has finished.
    #[fuchsia::test]
    async fn closing_package_connection_closes_package() {
        let pkg = PackageBuilder::new("pkg-name").build().await.unwrap();
        let (metafar_blob, _) = pkg.contents();
        let (blobfs_fake, blobfs_client) = FakeBlobfs::new();
        blobfs_fake.add_blob(metafar_blob.merkle, metafar_blob.contents);
        let server = RootDirCache::new(blobfs_client);

        let dir = server.get_or_insert(metafar_blob.merkle, None).await.unwrap();
        let (proxy, server_end) = fidl::endpoints::create_proxy::<fio::DirectoryMarker>();
        let scope = vfs::execution_scope::ExecutionScope::new();
        // NOTE(review): `open` presumably takes the `Arc` by value -- `dir` is never explicitly
        // dropped below, yet the cache is expected to end up empty. Confirm against the vfs API.
        let () = dir.open(
            scope.clone(),
            fio::OpenFlags::RIGHT_READABLE,
            vfs::path::Path::dot(),
            server_end.into_channel().into(),
        );
        // Round-trip a request so the connection is known to be served before asserting.
        let _: fio::ConnectionInfo =
            proxy.get_connection_info().await.expect("directory succesfully handling requests");
        assert_eq!(server.list().len(), 1);

        // Close the client end and wait for the scope's tasks to finish.
        drop(proxy);
        let () = scope.wait().await;
        assert_eq!(server.list().len(), 0);
        assert!(server.dirs.lock().expect("poisoned mutex").is_empty());
    }

    // A second request for the same hash returns the already-cached `RootDir`.
    #[fuchsia::test]
    async fn get_or_insert_existing_entry() {
        let pkg = PackageBuilder::new("pkg-name").build().await.unwrap();
        let (metafar_blob, _) = pkg.contents();
        let (blobfs_fake, blobfs_client) = FakeBlobfs::new();
        blobfs_fake.add_blob(metafar_blob.merkle, metafar_blob.contents);
        let server = RootDirCache::new(blobfs_client);

        let dir0 = server.get_or_insert(metafar_blob.merkle, None).await.unwrap();

        let dir1 = server.get_or_insert(metafar_blob.merkle, None).await.unwrap();
        assert_eq!(server.list().len(), 1);
        // Three strong references: `dir0`, `dir1`, and the temporary `Arc` produced by `list()`.
        assert_eq!(Arc::strong_count(&server.list()[0]), 3);

        drop(dir0);
        drop(dir1);
        assert_eq!(server.list().len(), 0);
        assert!(server.dirs.lock().expect("poisoned mutex").is_empty());
    }

    // A caller-provided `RootDir` (created without a dropper) is adopted by the cache.
    #[fuchsia::test]
    async fn get_or_insert_provided_root_dir() {
        let pkg = PackageBuilder::new("pkg-name").build().await.unwrap();
        let (metafar_blob, _) = pkg.contents();
        let (blobfs_fake, blobfs_client) = FakeBlobfs::new();
        blobfs_fake.add_blob(metafar_blob.merkle, metafar_blob.contents);
        let root_dir = crate::RootDir::new_raw(blobfs_client.clone(), metafar_blob.merkle, None)
            .await
            .unwrap();
        // Remove the meta.far so that creating a fresh `RootDir` from storage would presumably
        // fail; success below then implies the provided `root_dir` was the one used.
        blobfs_fake.delete_blob(metafar_blob.merkle);
        let server = RootDirCache::new(blobfs_client);

        let dir = server.get_or_insert(metafar_blob.merkle, Some(root_dir)).await.unwrap();
        assert_eq!(server.list().len(), 1);

        drop(dir);
        assert_eq!(server.list().len(), 0);
        assert!(server.dirs.lock().expect("poisoned mutex").is_empty());
    }

    // A caller-provided `RootDir` that already has a dropper is rejected and nothing is cached.
    #[fuchsia::test]
    async fn get_or_insert_provided_root_dir_error_if_already_has_dropper() {
        let pkg = PackageBuilder::new("pkg-name").build().await.unwrap();
        let (metafar_blob, _) = pkg.contents();
        let (blobfs_fake, blobfs_client) = FakeBlobfs::new();
        blobfs_fake.add_blob(metafar_blob.merkle, metafar_blob.contents);
        // `Some(Box::new(()))` pre-installs a dropper on the `RootDir`.
        let root_dir =
            crate::RootDir::new_raw(blobfs_client.clone(), metafar_blob.merkle, Some(Box::new(())))
                .await
                .unwrap();
        let server = RootDirCache::new(blobfs_client);

        assert_matches!(
            server.get_or_insert(metafar_blob.merkle, Some(root_dir)).await,
            Err(crate::Error::DropperAlreadySet)
        );
        assert!(server.dirs.lock().expect("poisoned mutex").is_empty());
    }

    // If the meta.far is not in storage, creation fails and no entry is left in the map.
    #[fuchsia::test]
    async fn get_or_insert_fails_if_root_dir_creation_fails() {
        // Nothing is added to the fake blobfs, so the all-zero hash cannot be found.
        let (_blobfs_fake, blobfs_client) = FakeBlobfs::new();
        let server = RootDirCache::new(blobfs_client);

        assert_matches!(
            server.get_or_insert([0; 32].into(), None).await,
            Err(crate::Error::MissingMetaFar)
        );
        assert!(server.dirs.lock().expect("poisoned mutex").is_empty());
    }

    // Two concurrent requests for the same uncached hash end up sharing one `RootDir`.
    #[fuchsia::test]
    async fn get_or_insert_concurrent_race_to_insert_new_root_dir() {
        let pkg = PackageBuilder::new("pkg-name").build().await.unwrap();
        let (metafar_blob, _) = pkg.contents();
        let (blobfs_fake, blobfs_client) = FakeBlobfs::new();
        blobfs_fake.add_blob(metafar_blob.merkle, metafar_blob.contents);
        let server = RootDirCache::new(blobfs_client);

        // Both futures are created before either is polled and are then driven together; this
        // is intended to let both miss the cache and race on insertion.
        let fut0 = server.get_or_insert(metafar_blob.merkle, None);

        let fut1 = server.get_or_insert(metafar_blob.merkle, None);

        let (res0, res1) = futures::future::join(fut0, fut1).await;
        let (dir0, dir1) = (res0.unwrap(), res1.unwrap());

        assert_eq!(server.list().len(), 1);
        // Three strong references: `dir0`, `dir1`, and the temporary `Arc` produced by `list()`.
        assert_eq!(Arc::strong_count(&server.list()[0]), 3);

        drop(dir0);
        drop(dir1);
        assert_eq!(server.list().len(), 0);
        assert!(server.dirs.lock().expect("poisoned mutex").is_empty());
    }

    // The lazy inspect callback reports each cached package hash with its strong count.
    #[fuchsia::test]
    async fn inspect() {
        let pkg = PackageBuilder::new("pkg-name").build().await.unwrap();
        let (metafar_blob, _) = pkg.contents();
        let (blobfs_fake, blobfs_client) = FakeBlobfs::new();
        blobfs_fake.add_blob(metafar_blob.merkle, metafar_blob.contents);
        let server = RootDirCache::new(blobfs_client);
        // Kept alive for the whole test so that exactly one strong reference exists.
        let _dir = server.get_or_insert(metafar_blob.merkle, None).await.unwrap();

        let inspector = finspect::Inspector::default();
        inspector.root().record_lazy_child("open-packages", server.record_lazy_inspect());

        assert_data_tree!(inspector, root: {
            "open-packages": {
                pkg.hash().to_string() => {
                    "instances": 1u64,
                },
            }
        });
    }
}