// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::fuchsia::directory::FxDirectory;
use crate::fuchsia::file::FxFile;
use fuchsia_sync::Mutex;
use futures::future::poll_fn;
use fxfs::object_store::ObjectDescriptor;
use fxfs_macros::ToWeakNode;
use std::any::TypeId;
use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
use std::fmt;
use std::mem::ManuallyDrop;
use std::ops::Bound;
use std::sync::{Arc, Weak};
use std::task::{Poll, Waker};
use vfs::common::IntoAny;
19
/// FxNode is a node in the filesystem hierarchy (either a file or directory).
pub trait FxNode: IntoAny + ToWeakNode + Send + Sync + 'static {
    /// Returns the object ID identifying this node in the object store.
    fn object_id(&self) -> u64;
    /// Returns the parent directory of this node, if known.
    fn parent(&self) -> Option<Arc<FxDirectory>>;
    /// Sets the parent directory of this node.
    fn set_parent(&self, parent: Arc<FxDirectory>);
    /// Increments the node's open count.
    fn open_count_add_one(&self);

    /// Decrements the node's open count.  Implementations must atomically check whether this
    /// brought the count to zero while the node is marked for purge; if so, they *must* queue the
    /// node for tombstoning.
    fn open_count_sub_one(self: Arc<Self>);
    /// Returns what kind of object this node is (file, directory, ...).
    fn object_descriptor(&self) -> ObjectDescriptor;

    /// Marks the object to be purged. Queues the node for tombstone if open count is zero.
    /// The default panics; only node types that support purging override this.
    fn mark_to_be_purged(&self) {
        panic!("Unexpected call to mark_to_be_purged");
    }

    /// Called when the filesystem is shutting down. Implementations should break any strong
    /// reference cycles that would prevent the node from being dropped.
    fn terminate(&self) {}
}
41
/// State shared between a `Placeholder` cache entry and the tasks waiting on it.
struct PlaceholderInner {
    // The object ID whose cache slot this placeholder reserves.
    object_id: u64,
    // Generation stamp taken from `NodeCacheInner::next_waker_sequence`; waiters use it to tell
    // whether a waker slot they registered earlier still belongs to this placeholder.
    waker_sequence: u64,
    // Wakers of tasks blocked in `NodeCache::get_or_reserve` until this placeholder is committed
    // or dropped.
    wakers: Vec<Waker>,
}
47
/// A cache entry that reserves a slot for an object which is still being loaded; other callers
/// block until it is replaced with a real node (or dropped).
#[derive(ToWeakNode)]
struct Placeholder(Mutex<PlaceholderInner>);
50
51impl FxNode for Placeholder {
52    fn object_id(&self) -> u64 {
53        self.0.lock().object_id
54    }
55    fn parent(&self) -> Option<Arc<FxDirectory>> {
56        unreachable!();
57    }
58    fn set_parent(&self, _parent: Arc<FxDirectory>) {
59        unreachable!();
60    }
61    fn open_count_add_one(&self) {}
62    fn open_count_sub_one(self: Arc<Self>) {}
63
64    fn object_descriptor(&self) -> ObjectDescriptor {
65        ObjectDescriptor::File
66    }
67}
68
69impl fmt::Debug for dyn FxNode {
70    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71        f.debug_struct("FxNode")
72            .field("id", &self.object_id())
73            .field("descriptor", &self.object_descriptor())
74            .finish()
75    }
76}
77
/// PlaceholderOwner is a reserved slot in the node cache.
pub struct PlaceholderOwner<'a> {
    // The placeholder entry this owner controls; the cache map also holds it, weakly.
    inner: Arc<Placeholder>,
    // Set by `commit`; when still false at drop time, the reserved slot is released.
    committed: bool,
    // The cache in which the slot was reserved.
    cache: &'a NodeCache,
}
84
85impl PlaceholderOwner<'_> {
86    /// Commits a node to the cache, replacing the placeholder and unblocking any waiting callers.
87    pub fn commit(mut self, node: &Arc<dyn FxNode>) {
88        let this_object_id = self.inner.object_id();
89        assert_eq!(node.object_id(), this_object_id);
90        self.committed = true;
91        self.cache.commit(node);
92    }
93}
94
95impl Drop for PlaceholderOwner<'_> {
96    fn drop(&mut self) {
97        let mut p = self.inner.0.lock();
98        if !self.committed {
99            // If the placeholder is dropped before it was committed, remove the cache entry so that
100            // another caller blocked in NodeCache::get_or_reserve can take the slot.
101            self.cache.0.lock().map.remove(&p.object_id);
102        }
103        for waker in p.wakers.drain(..) {
104            waker.wake();
105        }
106    }
107}
108
/// See NodeCache::get_or_reserve.
pub enum GetResult<'a> {
    /// The caller owns the reserved slot and must later commit a node (or drop the placeholder).
    Placeholder(PlaceholderOwner<'a>),
    /// A live node was already present in the cache.
    Node(Arc<dyn FxNode>),
}
114
115impl<'a> GetResult<'a> {
116    pub fn placeholder(self) -> Option<PlaceholderOwner<'a>> {
117        match self {
118            GetResult::Placeholder(placeholder) => Some(placeholder),
119            _ => None,
120        }
121    }
122}
123
/// WeakNode permits `upgrade_and_downcast_node` below which only tries to upgrade if we know the
/// downcast will work.  This makes for a simpler and more performant implementation of `FileIter`
/// which would otherwise have to jump through some hoops to drop the reference count whilst not
/// holding locks if the downcast fails.
pub struct WeakNode {
    // Type-erased operations (drop/type_id/upgrade) for the concrete `Weak<T>` behind `node`.
    vtable: &'static WeakNodeVTable,
    // Pointer produced by `Weak<T>::into_raw`; only the vtable knows the concrete `T`.
    node: *const (),
}
132
impl WeakNode {
    // This is unsafe because the caller must make sure `node` comes from `Weak<T>::into_raw` and
    // provide correct implementations for the functions in `vtable`.
    pub(crate) unsafe fn new(vtable: &'static WeakNodeVTable, node: *const ()) -> Self {
        Self { vtable, node }
    }

    // Returns true if the concrete type behind this `WeakNode` is `T`.  Unsafe because it relies
    // on the vtable's `type_id` function being implemented correctly for the stored pointer.
    unsafe fn is_type<T: 'static>(&self) -> bool {
        unsafe { (self.vtable.type_id)() == TypeId::of::<T>() }
    }
}
144
impl Drop for WeakNode {
    fn drop(&mut self) {
        // SAFETY: See the implementation of `ToWeakNode` in `macros.rs`.  `node` should be a
        // pointer that came from `Weak<T>::into_raw`.  The `drop` function needs to use
        // `Weak::from_raw` to reconstruct (and thereby release) the weak reference.
        unsafe {
            (self.vtable.drop)(self.node);
        }
    }
}
155
// SAFETY: `node` comes from `Weak<T>::into_raw`, which is safe to send across threads (the node
// types stored here are `Send + Sync`; see the `FxNode` supertrait bounds).
unsafe impl Send for WeakNode {}
158
/// Type-erased function table backing `WeakNode`.  Instances are produced by the `ToWeakNode`
/// derive macro (see `macros.rs`), one per concrete node type.
pub(crate) struct WeakNodeVTable {
    // This should convert the pointer back to `Weak<T>` to drop it.
    drop: unsafe fn(*const ()),

    // Returns the `TypeId::of::<T>` for `Weak<T>`.
    type_id: unsafe fn() -> TypeId,

    // Tries to upgrade the `Weak<T>` and returns `Arc<dyn FxNode>`.
    upgrade: unsafe fn(*const ()) -> Option<Arc<dyn FxNode>>,
}
169
impl WeakNodeVTable {
    /// Builds a vtable from the three type-erased operations; see the field comments on
    /// `WeakNodeVTable` for each function's contract.
    pub(crate) const fn new(
        drop: unsafe fn(*const ()),
        type_id: unsafe fn() -> TypeId,
        upgrade: unsafe fn(*const ()) -> Option<Arc<dyn FxNode>>,
    ) -> Self {
        Self { drop, type_id, upgrade }
    }
}
179
/// Used to convert nodes into `WeakNode` which is stored in the cache.  This should be implemented
/// using the `ToWeakNode` derive macro.
pub trait ToWeakNode {
    /// Converts `self` into a type-erased `WeakNode`.
    fn to_weak_node(self: Arc<Self>) -> WeakNode;
}
185
/// Upgrades and downcasts as a single step.  This won't do the upgrade if the downcast will fail,
/// which avoids issues with dropping the reference whilst locks are held.
fn upgrade_and_downcast_node<T: 'static>(weak_node: &WeakNode) -> Option<Arc<T>> {
    // SAFETY: We check `T` matches before converting the pointer (which should be the result of
    // `Weak<T>::into_raw`), back to `Weak<T>`.  The reconstructed `Weak` is wrapped in
    // `ManuallyDrop` because `weak_node` retains ownership of the weak count — we must not
    // decrement it here.
    unsafe {
        if weak_node.is_type::<T>() {
            ManuallyDrop::new(Weak::from_raw(weak_node.node as *const T)).upgrade()
        } else {
            None
        }
    }
}
199
/// Upgrades to `Arc<dyn FxNode>`, or returns `None` if all strong references are gone.
fn upgrade_node(weak_node: &WeakNode) -> Option<Arc<dyn FxNode>> {
    // SAFETY: Safe if `WeakNode::new` is called correctly and the `upgrade` function is implemented
    // correctly.  See the implementation in `macros.rs`.
    unsafe { (weak_node.vtable.upgrade)(weak_node.node) }
}
206
struct NodeCacheInner {
    // Weak references to nodes (or placeholders), keyed by object ID.
    map: BTreeMap<u64, WeakNode>,
    // Monotonic counter used to stamp each new placeholder; see `PlaceholderInner::waker_sequence`.
    next_waker_sequence: u64,
}
211
/// NodeCache is an in-memory cache of weak node references.  Because entries only hold weak
/// references, the cache itself never keeps a node alive.
pub struct NodeCache(Mutex<NodeCacheInner>);
214
/// Iterates over all files in the cache (skipping directories).
pub struct FileIter<'a> {
    // The cache being iterated; its lock is held only for the duration of each `next` call.
    cache: &'a NodeCache,
    // Object ID of the last file yielded; iteration resumes strictly after it.
    object_id: Option<u64>,
}
220
221impl<'a> Iterator for FileIter<'a> {
222    type Item = Arc<FxFile>;
223    fn next(&mut self) -> Option<Self::Item> {
224        let cache = self.cache.0.lock();
225        let range = match self.object_id {
226            None => cache.map.range(0..),
227            Some(oid) => cache.map.range(oid + 1..),
228        };
229        for (object_id, node) in range {
230            if let Some(file) = upgrade_and_downcast_node(node) {
231                self.object_id = Some(*object_id);
232                return Some(file);
233            }
234        }
235        None
236    }
237}
238
impl NodeCache {
    /// Creates an empty cache.
    pub fn new() -> Self {
        Self(Mutex::new(NodeCacheInner { map: BTreeMap::new(), next_waker_sequence: 0 }))
    }

    /// Gets a node in the cache, or reserves a placeholder in the cache to fill.
    ///
    /// Only the first caller will receive a placeholder result; all callers after that will block
    /// until the placeholder is filled (or the placeholder is dropped, at which point the next
    /// caller would get a placeholder). Callers that receive a placeholder should later commit a
    /// node with NodeCache::commit.
    pub async fn get_or_reserve<'a>(&'a self, object_id: u64) -> GetResult<'a> {
        // Tracks which placeholder generation we last registered a waker with (`waker_sequence`)
        // and where in its waker list our waker lives (`waker_index`), so a re-poll against the
        // *same* placeholder overwrites our previous waker instead of pushing a duplicate.
        let mut waker_sequence = 0;
        let mut waker_index = 0;
        poll_fn(|cx| {
            let mut this = self.0.lock();
            if let Some(node) = this.map.get(&object_id) {
                if let Some(node) = upgrade_node(node) {
                    if let Ok(placeholder) = node.clone().into_any().downcast::<Placeholder>() {
                        // Someone else is loading this object; register our waker and wait.
                        let mut inner = placeholder.0.lock();
                        if inner.waker_sequence == waker_sequence {
                            // Re-polled against the same placeholder: replace our previous waker.
                            inner.wakers[waker_index] = cx.waker().clone();
                        } else {
                            // First poll against this placeholder (possibly a new one, if an
                            // earlier placeholder was dropped): append our waker.
                            waker_index = inner.wakers.len();
                            waker_sequence = inner.waker_sequence;
                            inner.wakers.push(cx.waker().clone());
                        }
                        return Poll::Pending;
                    } else {
                        // A real node is present; return it.
                        return Poll::Ready(GetResult::Node(node));
                    }
                }
            }
            // No (live) entry: reserve the slot with a fresh placeholder.  Bumping the sequence
            // lets waiters distinguish this placeholder from any previous one for this slot.
            this.next_waker_sequence += 1;
            let inner = Arc::new(Placeholder(Mutex::new(PlaceholderInner {
                object_id,
                waker_sequence: this.next_waker_sequence,
                wakers: vec![],
            })));
            this.map.insert(object_id, inner.clone().to_weak_node());
            Poll::Ready(GetResult::Placeholder(PlaceholderOwner {
                inner,
                committed: false,
                cache: self,
            }))
        })
        .await
    }

    /// Removes a node from the cache. Calling this on a placeholder is an error; instead, the
    /// placeholder should simply be dropped.
    pub fn remove(&self, node: &dyn FxNode) {
        let mut this = self.0.lock();
        if let Entry::Occupied(o) = this.map.entry(node.object_id()) {
            // If this method is called when a node is being dropped, then upgrade will fail and
            // it's possible the cache has been populated with another node, so to avoid that race,
            // we must check that the node in the cache is the node we want to remove.
            //
            // Note this ugly cast in place of `std::ptr::eq(o.get().as_ptr(), node)` here is
            // to ensure we don't compare vtable pointers, which are not strictly guaranteed to be
            // the same across casts done in different code generation units at compilation time.
            if o.get().node == node as *const dyn FxNode as *const () {
                o.remove();
            }
        }
    }

    /// Returns the given node if present in the cache. This call should be handled with care. If it
    /// is being used to check if there is no live node version, then there should be precautions
    /// taken to ensure one will not be created in a race with this call or any of the resulting
    /// actions.
    pub fn get(&self, object_id: u64) -> Option<Arc<dyn FxNode>> {
        let l = self.0.lock();
        let weak_node = l.map.get(&object_id)?;
        // Don't return placeholders as valid nodes. Panicking here, as this is a result of a race.
        // If it happens we're likely breaking an assumption, but should not be permanent for the
        // filesystem. Better to remount.
        assert!(
            unsafe { !weak_node.is_type::<Placeholder>() },
            "Returning a placeholder indicates a race with open"
        );
        upgrade_node(weak_node)
    }

    /// Returns an iterator over all files in the cache.
    pub fn files(&self) -> FileIter<'_> {
        FileIter { cache: self, object_id: None }
    }

    /// Calls `FxNode::terminate` on every node still alive in the cache.
    pub fn terminate(&self) {
        // The strong references collected here are dropped only after the lock guard goes out of
        // scope, because dropping a node may re-enter the cache (e.g. via `remove`).
        let _drop_list = {
            let this = self.0.lock();
            let mut drop_list = Vec::with_capacity(this.map.len());
            for (_, node) in &this.map {
                if let Some(node) = upgrade_node(&node) {
                    node.terminate();
                    // We must drop later when we're not holding the lock.
                    drop_list.push(node);
                }
            }
            drop_list
        };
    }

    /// Inserts `node` into the cache, replacing any existing entry for its object ID (typically
    /// the placeholder that reserved the slot).  See `PlaceholderOwner::commit`.
    fn commit(&self, node: &Arc<dyn FxNode>) {
        let mut this = self.0.lock();
        this.map.insert(node.object_id(), node.clone().to_weak_node());
    }
}
348
// Wraps a node with an open count: `new` adds one, and `Drop` releases it (unless `take` was
// used to transfer that responsibility to the caller).
pub struct OpenedNode<N: FxNode + ?Sized>(pub Arc<N>);
351
impl<N: FxNode + ?Sized> OpenedNode<N> {
    /// Wraps `node`, taking an open count that is released when the wrapper is dropped.
    pub fn new(node: Arc<N>) -> Self {
        node.open_count_add_one();
        OpenedNode(node)
    }

    /// Downcasts to something that implements FxNode.
    pub fn downcast<T: FxNode>(self) -> Result<OpenedNode<T>, Self> {
        if self.is::<T>() {
            // `is` just confirmed the concrete type, so the downcast cannot fail — hence
            // `unreachable!`.  `take` is used so the open count carries over to the new wrapper.
            Ok(OpenedNode(
                self.take().into_any().downcast::<T>().unwrap_or_else(|_| unreachable!()),
            ))
        } else {
            Err(self)
        }
    }

    /// Takes the wrapped node.  The caller takes responsibility for dropping the open count.
    pub fn take(self) -> Arc<N> {
        // SAFETY: `self` is wrapped in `ManuallyDrop` so our `Drop` impl (which would release the
        // open count) never runs; `ptr::read` then moves the `Arc` out of the wrapper exactly
        // once, so no double-drop of the `Arc` can occur.
        let this = std::mem::ManuallyDrop::new(self);
        unsafe { std::ptr::read(&this.0) }
    }

    /// Returns true if this is an instance of T.
    pub fn is<T: 'static>(&self) -> bool {
        self.0.as_ref().type_id() == TypeId::of::<T>()
    }
}
380
381impl<N: FxNode + ?Sized> Drop for OpenedNode<N> {
382    fn drop(&mut self) {
383        self.0.clone().open_count_sub_one();
384    }
385}
386
// Deref to the wrapped `Arc` so callers can use the node directly whilst the open count is held.
impl<N: FxNode + ?Sized> std::ops::Deref for OpenedNode<N> {
    type Target = Arc<N>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
394
#[cfg(test)]
mod tests {
    use crate::fuchsia::directory::FxDirectory;
    use crate::fuchsia::node::{FxNode, GetResult, NodeCache};
    use fuchsia_async as fasync;
    use fuchsia_sync::Mutex;
    use futures::future::join_all;
    use fxfs::object_store::ObjectDescriptor;
    use fxfs_macros::ToWeakNode;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::Duration;

    /// Minimal `FxNode` implementation that removes itself from the cache on drop, mirroring what
    /// real nodes do.
    #[derive(ToWeakNode)]
    struct FakeNode(u64, Arc<NodeCache>);
    impl FxNode for FakeNode {
        fn object_id(&self) -> u64 {
            self.0
        }
        fn parent(&self) -> Option<Arc<FxDirectory>> {
            unreachable!();
        }
        fn set_parent(&self, _parent: Arc<FxDirectory>) {
            unreachable!();
        }
        fn open_count_add_one(&self) {}
        fn open_count_sub_one(self: Arc<Self>) {}

        fn object_descriptor(&self) -> ObjectDescriptor {
            ObjectDescriptor::Directory
        }
    }
    impl Drop for FakeNode {
        fn drop(&mut self) {
            self.1.remove(self);
        }
    }

    /// An uncommitted placeholder must release its slot on drop so a later caller can reserve it.
    #[fuchsia::test]
    async fn test_drop_placeholder() {
        let cache = Arc::new(NodeCache::new());
        let object_id = 0u64;
        match cache.get_or_reserve(object_id).await {
            GetResult::Node(_) => panic!("Unexpected node"),
            GetResult::Placeholder(_) => {}
        };
        // The placeholder above was dropped, so the second reservation must also succeed.
        match cache.get_or_reserve(object_id).await {
            GetResult::Node(_) => panic!("Unexpected node"),
            GetResult::Placeholder(_) => {}
        };
    }

    /// Commit a node, read it back, then check the entry vanishes once the node is dropped.
    #[fuchsia::test]
    async fn test_simple() {
        let cache = Arc::new(NodeCache::new());
        let object_id = {
            let node = Arc::new(FakeNode(0, cache.clone()));
            match cache.get_or_reserve(node.object_id()).await {
                GetResult::Node(_) => panic!("Unexpected node"),
                GetResult::Placeholder(p) => {
                    p.commit(&(node.clone() as Arc<dyn FxNode>));
                }
            };
            match cache.get_or_reserve(node.object_id()).await {
                GetResult::Node(n) => assert_eq!(n.object_id(), node.object_id()),
                GetResult::Placeholder(_) => panic!("No node found"),
            };
            node.object_id()
        };
        // `node` was dropped at the end of the scope above (removing itself from the cache), so
        // we should get a placeholder again.
        match cache.get_or_reserve(object_id).await {
            GetResult::Node(_) => panic!("Unexpected node"),
            GetResult::Placeholder(_) => {}
        };
    }

    /// Exactly one of many concurrent callers should get the placeholder; the rest must block
    /// until it is committed and then observe the committed node.
    #[fuchsia::test(threads = 10)]
    async fn test_subsequent_callers_block() {
        let cache = Arc::new(NodeCache::new());
        let object_id = 0u64;
        let writes_to_cache = Arc::new(AtomicU64::new(0));
        let reads_from_cache = Arc::new(AtomicU64::new(0));
        let node = Arc::new(FakeNode(object_id, cache.clone()));
        join_all((0..10).map(|_| {
            let node = node.clone();
            let cache = cache.clone();
            // `u64` is `Copy`, so a plain rebind (rather than `.clone()`) gives the async block
            // its own copy.
            let object_id = object_id;
            let writes_to_cache = writes_to_cache.clone();
            let reads_from_cache = reads_from_cache.clone();
            async move {
                match cache.get_or_reserve(object_id).await {
                    GetResult::Node(node) => {
                        reads_from_cache.fetch_add(1, Ordering::SeqCst);
                        assert_eq!(node.object_id(), object_id);
                    }
                    GetResult::Placeholder(p) => {
                        writes_to_cache.fetch_add(1, Ordering::SeqCst);
                        // Add a delay to simulate doing some work (e.g. loading from disk).
                        fasync::Timer::new(Duration::from_millis(100)).await;
                        p.commit(&(node as Arc<dyn FxNode>));
                    }
                }
            }
        }))
        .await;
        assert_eq!(writes_to_cache.load(Ordering::SeqCst), 1);
        assert_eq!(reads_from_cache.load(Ordering::SeqCst), 9);
    }

    /// Same as above but with several object IDs in flight concurrently: one writer and
    /// TASKS_PER_OBJECT - 1 readers per object.
    #[fuchsia::test(threads = 10)]
    async fn test_multiple_nodes() {
        const NUM_OBJECTS: usize = 5;
        const TASKS_PER_OBJECT: usize = 4;

        let cache = Arc::new(NodeCache::new());
        let writes = Arc::new(Mutex::new(vec![0u64; NUM_OBJECTS]));
        let reads = Arc::new(Mutex::new(vec![0u64; NUM_OBJECTS]));
        let nodes: Vec<_> = (0..NUM_OBJECTS as u64)
            .map(|object_id| Arc::new(FakeNode(object_id, cache.clone())))
            .collect();

        join_all((0..TASKS_PER_OBJECT).flat_map(|_| {
            nodes.iter().cloned().map(|node| {
                let cache = cache.clone();
                let writes = writes.clone();
                let reads = reads.clone();
                async move {
                    match cache.get_or_reserve(node.object_id()).await {
                        GetResult::Node(result) => {
                            assert_eq!(node.object_id(), result.object_id());
                            reads.lock()[node.object_id() as usize] += 1;
                        }
                        GetResult::Placeholder(p) => {
                            writes.lock()[node.object_id() as usize] += 1;
                            // Add a delay to simulate doing some work (e.g. loading from disk).
                            fasync::Timer::new(Duration::from_millis(100)).await;
                            p.commit(&(node as Arc<dyn FxNode>));
                        }
                    }
                }
            })
        }))
        .await;
        assert_eq!(*writes.lock(), vec![1u64; NUM_OBJECTS]);
        assert_eq!(*reads.lock(), vec![TASKS_PER_OBJECT as u64 - 1; NUM_OBJECTS]);
    }
}