Skip to main content

fxfs_platform/fuchsia/
node.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::fuchsia::directory::FxDirectory;
6use crate::fuchsia::file::FxFile;
7use fuchsia_sync::Mutex;
8use futures::future::poll_fn;
9use fxfs::object_store::ObjectDescriptor;
10use fxfs_macros::ToWeakNode;
11use std::any::TypeId;
12use std::collections::BTreeMap;
13use std::collections::btree_map::Entry;
14use std::fmt;
15use std::mem::ManuallyDrop;
16use std::sync::{Arc, Weak};
17use std::task::{Poll, Waker};
18use vfs::common::IntoAny;
19
/// FxNode is a node in the filesystem hierarchy (either a file or directory).
pub trait FxNode: IntoAny + ToWeakNode + Send + Sync + 'static {
    /// Returns the ID of the object backing this node.
    fn object_id(&self) -> u64;

    /// Returns this node's parent directory, if it has one.
    fn parent(&self) -> Option<Arc<FxDirectory>>;

    /// Sets this node's parent directory.
    fn set_parent(&self, parent: Arc<FxDirectory>);

    /// Increments the node's open count.
    fn open_count_add_one(&self);

    /// Decrements the node's open count.  Atomically check if this brought the count to zero
    /// while the node is marked for purge. If so, this *must* queue the node for tombstoning.
    fn open_count_sub_one(self: Arc<Self>);

    /// Returns the type (e.g. file or directory) of the underlying object.
    fn object_descriptor(&self) -> ObjectDescriptor;

    /// Marks the object to be purged. Queues the node for tombstone if open count is zero.
    /// The default implementation panics; node types that support purging must override this.
    fn mark_to_be_purged(&self) {
        panic!("Unexpected call to mark_to_be_purged");
    }

    /// Called when the filesystem is shutting down. Implementations should break any strong
    /// reference cycles that would prevent the node from being dropped.
    fn terminate(&self) {}
}
41
/// State shared between a `PlaceholderOwner` and the tasks waiting for it to be committed.
struct PlaceholderInner {
    // ID of the object this placeholder reserves a cache slot for.
    object_id: u64,
    // Generation stamp taken from `NodeCacheInner::next_waker_sequence`; lets a re-polled
    // waiter in `NodeCache::get_or_reserve` tell whether its slot in `wakers` is still its own.
    waker_sequence: u64,
    // Wakers of tasks blocked waiting for this placeholder to be committed or dropped.
    wakers: Vec<Waker>,
}
47
/// Cache entry that reserves a slot for a node which is still being constructed.
#[derive(ToWeakNode)]
struct Placeholder(Mutex<PlaceholderInner>);
50
impl FxNode for Placeholder {
    fn object_id(&self) -> u64 {
        self.0.lock().object_id
    }
    // Placeholders are never handed out as real nodes (see NodeCache::get), so the parent
    // accessors are unreachable.
    fn parent(&self) -> Option<Arc<FxDirectory>> {
        unreachable!();
    }
    fn set_parent(&self, _parent: Arc<FxDirectory>) {
        unreachable!();
    }
    // Open counts are not tracked for placeholders.
    fn open_count_add_one(&self) {}
    fn open_count_sub_one(self: Arc<Self>) {}

    fn object_descriptor(&self) -> ObjectDescriptor {
        // NOTE(review): the descriptor looks arbitrary here since placeholders are never
        // returned to callers as nodes — confirm nothing relies on this value.
        ObjectDescriptor::File
    }
}
68
69impl fmt::Debug for dyn FxNode {
70    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71        f.debug_struct("FxNode")
72            .field("id", &self.object_id())
73            .field("descriptor", &self.object_descriptor())
74            .finish()
75    }
76}
77
/// PlaceholderOwner is a reserved slot in the node cache.
///
/// Dropping it without calling `commit` releases the reservation and wakes any waiters.
pub struct PlaceholderOwner<'a> {
    // The placeholder entry that was inserted into the cache.
    inner: Arc<Placeholder>,
    // Set by `commit`; checked in `Drop` to decide whether the cache entry must be removed.
    committed: bool,
    // The cache in which the slot was reserved.
    cache: &'a NodeCache,
}
84
85impl PlaceholderOwner<'_> {
86    /// Commits a node to the cache, replacing the placeholder and unblocking any waiting callers.
87    pub fn commit(mut self, node: &Arc<dyn FxNode>) {
88        let this_object_id = self.inner.object_id();
89        assert_eq!(node.object_id(), this_object_id);
90        self.committed = true;
91        self.cache.commit(node);
92    }
93}
94
impl Drop for PlaceholderOwner<'_> {
    fn drop(&mut self) {
        let wakers = {
            // Lock order matters: the cache lock is taken before the placeholder's inner lock,
            // matching the order used in NodeCache::get_or_reserve.  The cache lock is only
            // needed if the placeholder was never committed.
            let mut cache_guard = (!self.committed).then(|| self.cache.0.lock());
            let mut p = self.inner.0.lock();
            if let Some(cache) = &mut cache_guard {
                // If the placeholder is dropped before it was committed, remove the cache entry
                // so that another caller blocked in NodeCache::get_or_reserve can take the slot.
                cache.map.remove(&p.object_id);
            }
            // Take the wakers out so they can be woken after both locks are released.
            std::mem::take(&mut p.wakers)
        };
        for waker in wakers {
            waker.wake();
        }
    }
}
112
/// See NodeCache::get_or_reserve.
pub enum GetResult<'a> {
    /// The caller won the reservation and is responsible for committing (or dropping) it.
    Placeholder(PlaceholderOwner<'a>),
    /// An existing live node was found in the cache.
    Node(Arc<dyn FxNode>),
}
118
119impl<'a> GetResult<'a> {
120    pub fn placeholder(self) -> Option<PlaceholderOwner<'a>> {
121        match self {
122            GetResult::Placeholder(placeholder) => Some(placeholder),
123            _ => None,
124        }
125    }
126}
127
/// WeakNode permits `upgrade_and_downcast_node` below which only tries to upgrade if we know the
/// downcast will work.  This makes for a simpler and more performant implementation of `FileIter`
/// which would otherwise have to jump through some hoops to drop the reference count whilst not
/// holding locks if the downcast fails.
pub struct WeakNode {
    // Type-erased operations (drop/type_id/upgrade) for the underlying `Weak<T>`.
    vtable: &'static WeakNodeVTable,
    // Raw pointer produced by `Weak<T>::into_raw`.
    node: *const (),
}
136
impl WeakNode {
    // This is unsafe because the caller must make sure `node` comes from `Weak<T>::into_raw` and
    // provide correct implementations for the functions in `vtable`.
    pub(crate) unsafe fn new(vtable: &'static WeakNodeVTable, node: *const ()) -> Self {
        Self { vtable, node }
    }

    /// Returns true if the erased weak reference is a `Weak<T>`.
    ///
    /// # Safety
    /// Requires that the vtable's `type_id` function correctly reports the concrete type
    /// passed to `WeakNode::new`.
    unsafe fn is_type<T: 'static>(&self) -> bool {
        unsafe { (self.vtable.type_id)() == TypeId::of::<T>() }
    }
}
148
impl Drop for WeakNode {
    fn drop(&mut self) {
        // SAFETY: See the implementation of `ToWeakNode` in `macros.rs`.  `node` should be a
        // pointer that came from `Weak<T>::into_raw`.  The `drop` function needs to use
        // `Weak::from_raw`, which reclaims the weak count held by the raw pointer.
        unsafe {
            (self.vtable.drop)(self.node);
        }
    }
}
159
// SAFETY: `node` comes from `Weak<T>::into_raw` which is safe to send across threads.
// NOTE(review): soundness also relies on `ToWeakNode` only being implemented for `FxNode`
// types, whose supertraits require `Send + Sync` — confirm against the derive in macros.rs.
unsafe impl Send for WeakNode {}
162
/// Type-erased operations on the `Weak<T>` stored inside a `WeakNode`.
pub(crate) struct WeakNodeVTable {
    // This should convert the pointer back to `Weak<T>` to drop it.
    drop: unsafe fn(*const ()),

    // Returns the `TypeId::of::<T>` for `Weak<T>`.
    type_id: unsafe fn() -> TypeId,

    // Tries to upgrade the `Weak<T>` and returns `Arc<dyn FxNode>`.
    upgrade: unsafe fn(*const ()) -> Option<Arc<dyn FxNode>>,
}
173
impl WeakNodeVTable {
    /// Constructs a vtable.  Each function must satisfy the contract documented on the
    /// corresponding field; see the `ToWeakNode` implementation in `macros.rs`.
    pub(crate) const fn new(
        drop: unsafe fn(*const ()),
        type_id: unsafe fn() -> TypeId,
        upgrade: unsafe fn(*const ()) -> Option<Arc<dyn FxNode>>,
    ) -> Self {
        Self { drop, type_id, upgrade }
    }
}
183
/// Used to convert nodes into `WeakNode` which is stored in the cache.  This should be implemented
/// using the `ToWeakNode` derive macro.
pub trait ToWeakNode {
    /// Converts `Arc<Self>` into a type-erased weak reference.
    fn to_weak_node(self: Arc<Self>) -> WeakNode;
}
189
/// Upgrades and downcasts as a single step.  This won't do the upgrade if the downcast will fail,
/// which avoids issues with dropping the reference whilst locks are held.
fn upgrade_and_downcast_node<T: 'static>(weak_node: &WeakNode) -> Option<Arc<T>> {
    // SAFETY: We check `T` matches before converting the pointer (which should be the result of
    // `Weak<T>::into_raw`), back to `Weak<T>`.
    unsafe {
        if weak_node.is_type::<T>() {
            // `ManuallyDrop` stops the reconstructed `Weak<T>` from consuming the weak count
            // that `weak_node` still owns; we borrow it only long enough to call `upgrade`.
            ManuallyDrop::new(Weak::from_raw(weak_node.node as *const T)).upgrade()
        } else {
            None
        }
    }
}
203
/// Upgrades to `Arc<dyn FxNode>`, returning `None` if the node has already been dropped.
fn upgrade_node(weak_node: &WeakNode) -> Option<Arc<dyn FxNode>> {
    // SAFETY: Safe if `WeakNode::new` is called correctly and the `upgrade` function is implemented
    // correctly.  See the implementation in `macros.rs`.
    unsafe { (weak_node.vtable.upgrade)(weak_node.node) }
}
210
/// Lock-protected state of `NodeCache`.
struct NodeCacheInner {
    // Weak references to live nodes (and in-flight placeholders), keyed by object ID.
    map: BTreeMap<u64, WeakNode>,
    // Monotonic counter used to stamp each new placeholder; see `PlaceholderInner::waker_sequence`.
    next_waker_sequence: u64,
}
215
/// NodeCache is an in-memory cache of weak node references.
///
/// Entries hold weak references, so the cache does not keep nodes alive.
pub struct NodeCache(Mutex<NodeCacheInner>);
218
/// Iterates over all files in the cache (skipping directories).
pub struct FileIter<'a> {
    // The cache being iterated; its lock is taken briefly on each `next` call.
    cache: &'a NodeCache,
    // The object ID of the last file yielded; iteration resumes after this key.
    // `None` before the first item.
    object_id: Option<u64>,
}
224
225impl<'a> Iterator for FileIter<'a> {
226    type Item = Arc<FxFile>;
227    fn next(&mut self) -> Option<Self::Item> {
228        let cache = self.cache.0.lock();
229        let range = match self.object_id {
230            None => cache.map.range(0..),
231            Some(oid) => cache.map.range(oid + 1..),
232        };
233        for (object_id, node) in range {
234            if let Some(file) = upgrade_and_downcast_node(node) {
235                self.object_id = Some(*object_id);
236                return Some(file);
237            }
238        }
239        None
240    }
241}
242
243impl NodeCache {
244    pub fn new() -> Self {
245        Self(Mutex::new(NodeCacheInner { map: BTreeMap::new(), next_waker_sequence: 0 }))
246    }
247
248    /// Gets a node in the cache, or reserves a placeholder in the cache to fill.
249    ///
250    /// Only the first caller will receive a placeholder result; all callers after that will block
251    /// until the placeholder is filled (or the placeholder is dropped, at which point the next
252    /// caller would get a placeholder). Callers that receive a placeholder should later commit a
253    /// node with NodeCache::commit.
254    pub async fn get_or_reserve<'a>(&'a self, object_id: u64) -> GetResult<'a> {
255        let mut waker_sequence = 0;
256        let mut waker_index = 0;
257        poll_fn(|cx| {
258            let mut this = self.0.lock();
259            if let Some(node) = this.map.get(&object_id) {
260                if let Some(node) = upgrade_node(node) {
261                    if let Ok(placeholder) = node.clone().into_any().downcast::<Placeholder>() {
262                        let mut inner = placeholder.0.lock();
263                        if inner.waker_sequence == waker_sequence {
264                            inner.wakers[waker_index] = cx.waker().clone();
265                        } else {
266                            waker_index = inner.wakers.len();
267                            waker_sequence = inner.waker_sequence;
268                            inner.wakers.push(cx.waker().clone());
269                        }
270                        return Poll::Pending;
271                    } else {
272                        return Poll::Ready(GetResult::Node(node));
273                    }
274                }
275            }
276            this.next_waker_sequence += 1;
277            let inner = Arc::new(Placeholder(Mutex::new(PlaceholderInner {
278                object_id,
279                waker_sequence: this.next_waker_sequence,
280                wakers: vec![],
281            })));
282            this.map.insert(object_id, inner.clone().to_weak_node());
283            Poll::Ready(GetResult::Placeholder(PlaceholderOwner {
284                inner,
285                committed: false,
286                cache: self,
287            }))
288        })
289        .await
290    }
291
292    /// Removes a node from the cache. Calling this on a placeholder is an error; instead, the
293    /// placeholder should simply be dropped.
294    pub fn remove(&self, node: &dyn FxNode) {
295        let mut this = self.0.lock();
296        if let Entry::Occupied(o) = this.map.entry(node.object_id()) {
297            // If this method is called when a node is being dropped, then upgrade will fail and
298            // it's possible the cache has been populated with another node, so to avoid that race,
299            // we must check that the node in the cache is the node we want to remove.
300            //
301            // Note this ugly cast in place of `std::ptr::eq(o.get().as_ptr(), node)` here is
302            // to ensure we don't compare vtable pointers, which are not strictly guaranteed to be
303            // the same across casts done in different code generation units at compilation time.
304            if o.get().node == node as *const dyn FxNode as *const () {
305                o.remove();
306            }
307        }
308    }
309
310    /// Returns the given node if present in the cache. This call should be handled with care. If it
311    /// is being used to check if there is no live node version, then there should be precautions
312    /// taken to ensure one will not be created in a race with this call or any of the resulting
313    /// actions.
314    pub fn get(&self, object_id: u64) -> Option<Arc<dyn FxNode>> {
315        let l = self.0.lock();
316        let weak_node = l.map.get(&object_id)?;
317        // Don't return placeholders as valid nodes. Panicking here, as this is a result of a race.
318        // If it happens we're likely breaking an assumption, but should not be permanent for the
319        // filesystem. Better to remount.
320        assert!(
321            unsafe { !weak_node.is_type::<Placeholder>() },
322            "Returning a placeholder indicates a race with open"
323        );
324        upgrade_node(weak_node)
325    }
326
327    /// Returns an iterator over all files in the cache.
328    pub fn files(&self) -> FileIter<'_> {
329        FileIter { cache: self, object_id: None }
330    }
331
332    pub fn terminate(&self) {
333        let _drop_list = {
334            let this = self.0.lock();
335            let mut drop_list = Vec::with_capacity(this.map.len());
336            for (_, node) in &this.map {
337                if let Some(node) = upgrade_node(&node) {
338                    node.terminate();
339                    // We must drop later when we're not holding the lock.
340                    drop_list.push(node);
341                }
342            }
343            drop_list
344        };
345    }
346
347    fn commit(&self, node: &Arc<dyn FxNode>) {
348        let mut this = self.0.lock();
349        this.map.insert(node.object_id(), node.clone().to_weak_node());
350    }
351}
352
/// Wraps a node with an open count: the count is taken in `OpenedNode::new` and released when
/// the wrapper is dropped (unless `take` is used).
pub struct OpenedNode<N: FxNode + ?Sized>(pub Arc<N>);
355
impl<N: FxNode + ?Sized> OpenedNode<N> {
    /// Wraps `node`, taking an open count on it.
    pub fn new(node: Arc<N>) -> Self {
        node.open_count_add_one();
        OpenedNode(node)
    }

    /// Downcasts to something that implements FxNode.
    ///
    /// On success the open count is transferred to the returned wrapper; on failure `self` is
    /// returned unchanged.
    pub fn downcast<T: FxNode>(self) -> Result<OpenedNode<T>, Self> {
        if self.is::<T>() {
            // `take` suppresses our Drop so the open count carries over to the new wrapper;
            // the downcast cannot fail because `is` just confirmed the type.
            Ok(OpenedNode(
                self.take().into_any().downcast::<T>().unwrap_or_else(|_| unreachable!()),
            ))
        } else {
            Err(self)
        }
    }

    /// Takes the wrapped node.  The caller takes responsibility for dropping the open count.
    pub fn take(self) -> Arc<N> {
        let this = std::mem::ManuallyDrop::new(self);
        // SAFETY: `ManuallyDrop` prevents our `Drop` impl from running (it would decrement the
        // open count); `ptr::read` then moves the `Arc` out without touching its refcount.
        unsafe { std::ptr::read(&this.0) }
    }

    /// Returns true if this is an instance of T.
    pub fn is<T: 'static>(&self) -> bool {
        self.0.as_ref().type_id() == TypeId::of::<T>()
    }
}
384
385impl<N: FxNode + ?Sized> Drop for OpenedNode<N> {
386    fn drop(&mut self) {
387        self.0.clone().open_count_sub_one();
388    }
389}
390
impl<N: FxNode + ?Sized> std::ops::Deref for OpenedNode<N> {
    type Target = Arc<N>;

    // Dereferences to the wrapped `Arc` so callers can use the node directly.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
398
#[cfg(test)]
mod tests {
    use crate::fuchsia::directory::FxDirectory;
    use crate::fuchsia::node::{FxNode, GetResult, NodeCache};
    use fuchsia_async as fasync;
    use fuchsia_sync::Mutex;
    use futures::future::join_all;
    use fxfs::object_store::ObjectDescriptor;
    use fxfs_macros::ToWeakNode;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::Duration;

    /// Minimal FxNode implementation that removes itself from the cache when dropped.
    #[derive(ToWeakNode)]
    struct FakeNode(u64, Arc<NodeCache>);
    impl FxNode for FakeNode {
        fn object_id(&self) -> u64 {
            self.0
        }
        fn parent(&self) -> Option<Arc<FxDirectory>> {
            unreachable!();
        }
        fn set_parent(&self, _parent: Arc<FxDirectory>) {
            unreachable!();
        }
        fn open_count_add_one(&self) {}
        fn open_count_sub_one(self: Arc<Self>) {}

        fn object_descriptor(&self) -> ObjectDescriptor {
            ObjectDescriptor::Directory
        }
    }
    impl Drop for FakeNode {
        fn drop(&mut self) {
            self.1.remove(self);
        }
    }

    /// An uncommitted placeholder must release its slot on drop so that a subsequent caller
    /// can reserve the same object ID again.
    #[fuchsia::test]
    async fn test_drop_placeholder() {
        let cache = Arc::new(NodeCache::new());
        let object_id = 0u64;
        match cache.get_or_reserve(object_id).await {
            GetResult::Node(_) => panic!("Unexpected node"),
            GetResult::Placeholder(_) => {}
        };
        match cache.get_or_reserve(object_id).await {
            GetResult::Node(_) => panic!("Unexpected node"),
            GetResult::Placeholder(_) => {}
        };
    }

    /// Committing a node makes it visible to later callers; dropping the node (whose Drop calls
    /// NodeCache::remove) empties the slot again.
    #[fuchsia::test]
    async fn test_simple() {
        let cache = Arc::new(NodeCache::new());
        let object_id = {
            let node = Arc::new(FakeNode(0, cache.clone()));
            match cache.get_or_reserve(node.object_id()).await {
                GetResult::Node(_) => panic!("Unexpected node"),
                GetResult::Placeholder(p) => {
                    p.commit(&(node.clone() as Arc<dyn FxNode>));
                }
            };
            match cache.get_or_reserve(node.object_id()).await {
                GetResult::Node(n) => assert_eq!(n.object_id(), node.object_id()),
                GetResult::Placeholder(_) => panic!("No node found"),
            };
            node.object_id()
        };
        match cache.get_or_reserve(object_id).await {
            GetResult::Node(_) => panic!("Unexpected node"),
            GetResult::Placeholder(_) => {}
        };
    }

    /// Exactly one of many concurrent callers should win the placeholder; the rest must block
    /// until the winner commits and then observe the committed node.
    #[fuchsia::test(threads = 10)]
    async fn test_subsequent_callers_block() {
        let cache = Arc::new(NodeCache::new());
        let object_id = 0u64;
        let writes_to_cache = Arc::new(AtomicU64::new(0));
        let reads_from_cache = Arc::new(AtomicU64::new(0));
        let node = Arc::new(FakeNode(object_id, cache.clone()));
        join_all((0..10).map(|_| {
            let node = node.clone();
            let cache = cache.clone();
            // `object_id` is `Copy`, so the async block captures its own copy directly; the
            // previous `object_id.clone()` rebinding was redundant (clippy::clone_on_copy).
            let writes_to_cache = writes_to_cache.clone();
            let reads_from_cache = reads_from_cache.clone();
            async move {
                match cache.get_or_reserve(object_id).await {
                    GetResult::Node(node) => {
                        reads_from_cache.fetch_add(1, Ordering::SeqCst);
                        assert_eq!(node.object_id(), object_id);
                    }
                    GetResult::Placeholder(p) => {
                        writes_to_cache.fetch_add(1, Ordering::SeqCst);
                        // Add a delay to simulate doing some work (e.g. loading from disk).
                        fasync::Timer::new(Duration::from_millis(100)).await;
                        p.commit(&(node as Arc<dyn FxNode>));
                    }
                }
            }
        }))
        .await;
        assert_eq!(writes_to_cache.load(Ordering::SeqCst), 1);
        assert_eq!(reads_from_cache.load(Ordering::SeqCst), 9);
    }

    /// Same as above but with several object IDs in flight at once: each ID gets exactly one
    /// placeholder winner and TASKS_PER_OBJECT - 1 readers.
    #[fuchsia::test(threads = 10)]
    async fn test_multiple_nodes() {
        const NUM_OBJECTS: usize = 5;
        const TASKS_PER_OBJECT: usize = 4;

        let cache = Arc::new(NodeCache::new());
        let writes = Arc::new(Mutex::new(vec![0u64; NUM_OBJECTS]));
        let reads = Arc::new(Mutex::new(vec![0u64; NUM_OBJECTS]));
        let nodes: Vec<_> = (0..NUM_OBJECTS as u64)
            .map(|object_id| Arc::new(FakeNode(object_id, cache.clone())))
            .collect();

        join_all((0..TASKS_PER_OBJECT).flat_map(|_| {
            nodes.iter().cloned().map(|node| {
                let cache = cache.clone();
                let writes = writes.clone();
                let reads = reads.clone();
                async move {
                    match cache.get_or_reserve(node.object_id()).await {
                        GetResult::Node(result) => {
                            assert_eq!(node.object_id(), result.object_id());
                            reads.lock()[node.object_id() as usize] += 1;
                        }
                        GetResult::Placeholder(p) => {
                            writes.lock()[node.object_id() as usize] += 1;
                            // Add a delay to simulate doing some work (e.g. loading from disk).
                            fasync::Timer::new(Duration::from_millis(100)).await;
                            p.commit(&(node as Arc<dyn FxNode>));
                        }
                    }
                }
            })
        }))
        .await;
        assert_eq!(*writes.lock(), vec![1u64; NUM_OBJECTS]);
        assert_eq!(*reads.lock(), vec![TASKS_PER_OBJECT as u64 - 1; NUM_OBJECTS]);
    }
}