// fxfs_platform/fuchsia/node.rs
use crate::fuchsia::directory::FxDirectory;
6use crate::fuchsia::file::FxFile;
7use fuchsia_sync::Mutex;
8use futures::future::poll_fn;
9use fxfs::object_store::ObjectDescriptor;
10use fxfs_macros::ToWeakNode;
11use std::any::TypeId;
12use std::collections::BTreeMap;
13use std::collections::btree_map::Entry;
14use std::fmt;
15use std::mem::ManuallyDrop;
16use std::sync::{Arc, Weak};
17use std::task::{Poll, Waker};
18use vfs::common::IntoAny;
19
/// The base trait for all nodes (files, directories, …) tracked by a
/// [`NodeCache`].  Implementors must also implement [`ToWeakNode`] (normally
/// via `#[derive(ToWeakNode)]`) so the cache can hold type-erased weak
/// references to them.
pub trait FxNode: IntoAny + ToWeakNode + Send + Sync + 'static {
    /// The object ID identifying this node; also the key it is cached under.
    fn object_id(&self) -> u64;

    /// Returns the parent directory, if any.
    fn parent(&self) -> Option<Arc<FxDirectory>>;

    /// Sets the parent directory of this node.
    fn set_parent(&self, parent: Arc<FxDirectory>);

    /// Increments the node's open count.
    fn open_count_add_one(&self);

    /// Decrements the node's open count.  Takes `Arc<Self>` so an
    /// implementation can keep or hand off a strong reference while handling
    /// the close (e.g. on the last open going away).
    fn open_count_sub_one(self: Arc<Self>);

    /// Describes what kind of object this node is.
    fn object_descriptor(&self) -> ObjectDescriptor;

    /// Flags the node for purging.  The default panics: only node types that
    /// support purging override this.
    fn mark_to_be_purged(&self) {
        panic!("Unexpected call to mark_to_be_purged");
    }

    /// Called for every live node from [`NodeCache::terminate`]; default is a
    /// no-op.
    fn terminate(&self) {}
}
41
/// State shared between a [`Placeholder`] in the cache and the tasks waiting
/// on it in [`NodeCache::get_or_reserve`].
struct PlaceholderInner {
    // The object ID this placeholder reserves.
    object_id: u64,
    // Stamps this placeholder instance; a waiter compares it against the
    // sequence it last saw to decide whether its slot in `wakers` is still
    // valid (see `NodeCache::get_or_reserve`).
    waker_sequence: u64,
    // Wakers of the tasks blocked waiting for this placeholder to resolve.
    wakers: Vec<Waker>,
}
47
/// Sentinel node inserted into the cache while the real node is being
/// constructed; other callers of `get_or_reserve` block on it until it is
/// committed or dropped.
#[derive(ToWeakNode)]
struct Placeholder(Mutex<PlaceholderInner>);
50
51impl FxNode for Placeholder {
52 fn object_id(&self) -> u64 {
53 self.0.lock().object_id
54 }
55 fn parent(&self) -> Option<Arc<FxDirectory>> {
56 unreachable!();
57 }
58 fn set_parent(&self, _parent: Arc<FxDirectory>) {
59 unreachable!();
60 }
61 fn open_count_add_one(&self) {}
62 fn open_count_sub_one(self: Arc<Self>) {}
63
64 fn object_descriptor(&self) -> ObjectDescriptor {
65 ObjectDescriptor::File
66 }
67}
68
69impl fmt::Debug for dyn FxNode {
70 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71 f.debug_struct("FxNode")
72 .field("id", &self.object_id())
73 .field("descriptor", &self.object_descriptor())
74 .finish()
75 }
76}
77
/// Exclusive ownership of a placeholder reservation in a [`NodeCache`],
/// returned by [`NodeCache::get_or_reserve`].  The holder either `commit`s
/// the real node or drops this to abandon the reservation; waiters are woken
/// in both cases.
pub struct PlaceholderOwner<'a> {
    // The placeholder currently occupying the cache slot.
    inner: Arc<Placeholder>,
    // Set by `commit`; tells `Drop` whether the cache entry should be kept.
    committed: bool,
    // The cache holding the reservation.
    cache: &'a NodeCache,
}
84
85impl PlaceholderOwner<'_> {
86 pub fn commit(mut self, node: &Arc<dyn FxNode>) {
88 let this_object_id = self.inner.object_id();
89 assert_eq!(node.object_id(), this_object_id);
90 self.committed = true;
91 self.cache.commit(node);
92 }
93}
94
impl Drop for PlaceholderOwner<'_> {
    fn drop(&mut self) {
        // If the reservation was never committed, remove the placeholder from
        // the cache so a later `get_or_reserve` can start over.  Wakers are
        // taken in both cases so blocked tasks re-poll and either see the
        // committed node or race to insert a new placeholder.
        let wakers = {
            // Lock ordering: the cache lock (when taken at all) is acquired
            // *before* the placeholder's inner lock, matching
            // `NodeCache::get_or_reserve`.
            let mut cache_guard = (!self.committed).then(|| self.cache.0.lock());
            let mut p = self.inner.0.lock();
            if let Some(cache) = &mut cache_guard {
                cache.map.remove(&p.object_id);
            }
            std::mem::take(&mut p.wakers)
        };
        // Wake only after both locks are released, so woken tasks don't
        // immediately contend on them.
        for waker in wakers {
            waker.wake();
        }
    }
}
112
/// The result of [`NodeCache::get_or_reserve`].
pub enum GetResult<'a> {
    /// No live node was cached; the caller now owns the reservation and must
    /// commit or drop it.
    Placeholder(PlaceholderOwner<'a>),
    /// A live node was found in the cache.
    Node(Arc<dyn FxNode>),
}
118
119impl<'a> GetResult<'a> {
120 pub fn placeholder(self) -> Option<PlaceholderOwner<'a>> {
121 match self {
122 GetResult::Placeholder(placeholder) => Some(placeholder),
123 _ => None,
124 }
125 }
126}
127
/// A type-erased weak reference to a node.
///
/// Pairs a raw pointer (produced via [`ToWeakNode`]) with a static vtable
/// that knows how to drop, type-check and upgrade it, letting the cache store
/// weak references to heterogeneous node types in a single map.
pub struct WeakNode {
    // Operations for the erased concrete type.
    vtable: &'static WeakNodeVTable,
    // Type-erased pointer; interpreted only through `vtable` (and by
    // `upgrade_and_downcast_node` after an explicit type check).
    node: *const (),
}
136
impl WeakNode {
    /// Builds a `WeakNode` from a vtable and a type-erased pointer.
    ///
    /// # Safety
    ///
    /// `node` must be the pointer `vtable`'s functions expect (presumably a
    /// raw `Weak` produced by the `ToWeakNode` derive — confirm against the
    /// macro).  Ownership of the pointer transfers to the returned value,
    /// whose `Drop` releases it via `vtable.drop`.
    pub(crate) unsafe fn new(vtable: &'static WeakNodeVTable, node: *const ()) -> Self {
        Self { vtable, node }
    }

    /// Returns true if the erased node's concrete type is `T`.
    ///
    /// # Safety
    ///
    /// Relies on `vtable.type_id` correctly reporting the erased type.
    unsafe fn is_type<T: 'static>(&self) -> bool {
        unsafe { (self.vtable.type_id)() == TypeId::of::<T>() }
    }
}
148
impl Drop for WeakNode {
    fn drop(&mut self) {
        // SAFETY: `node` is the pointer this vtable was constructed with (see
        // `WeakNode::new`); the vtable's `drop` releases it exactly once.
        unsafe {
            (self.vtable.drop)(self.node);
        }
    }
}
159
// SAFETY: the erased pointer refers to a node type, and `FxNode` requires
// `Send + Sync`, so moving the reference between threads is sound (assumes
// the pointer always originates from an `FxNode` implementor — which the
// `ToWeakNode` derive is presumed to guarantee).
unsafe impl Send for WeakNode {}
162
/// Function table implementing `WeakNode`'s operations for one concrete node
/// type (built by the `ToWeakNode` derive via [`WeakNodeVTable::new`]).
pub(crate) struct WeakNodeVTable {
    // Releases the erased weak reference.
    drop: unsafe fn(*const ()),
    // Reports the `TypeId` of the concrete node type.
    type_id: unsafe fn() -> TypeId,
    // Attempts to upgrade the erased weak reference to a strong
    // `Arc<dyn FxNode>`.
    upgrade: unsafe fn(*const ()) -> Option<Arc<dyn FxNode>>,
}
173
174impl WeakNodeVTable {
175 pub(crate) const fn new(
176 drop: unsafe fn(*const ()),
177 type_id: unsafe fn() -> TypeId,
178 upgrade: unsafe fn(*const ()) -> Option<Arc<dyn FxNode>>,
179 ) -> Self {
180 Self { drop, type_id, upgrade }
181 }
182}
183
/// Conversion of a strong node reference into a type-erased [`WeakNode`];
/// implemented for node types via `#[derive(ToWeakNode)]`.
pub trait ToWeakNode {
    fn to_weak_node(self: Arc<Self>) -> WeakNode;
}
189
/// Upgrades `weak_node` to a strong `Arc<T>` if the erased node's concrete
/// type is exactly `T`; otherwise returns `None`.
fn upgrade_and_downcast_node<T: 'static>(weak_node: &WeakNode) -> Option<Arc<T>> {
    // SAFETY: after the type check, `weak_node.node` is assumed to have come
    // from a raw `Weak<T>` (per the `ToWeakNode` derive), so `Weak::from_raw`
    // reconstitutes it soundly.  `ManuallyDrop` prevents the reconstituted
    // `Weak` from being dropped here — the raw pointer is still owned by
    // `weak_node` (released by its vtable `drop`); we only borrow it to call
    // `upgrade`.
    unsafe {
        if weak_node.is_type::<T>() {
            ManuallyDrop::new(Weak::from_raw(weak_node.node as *const T)).upgrade()
        } else {
            None
        }
    }
}
203
/// Attempts to upgrade `weak_node` to a strong `Arc<dyn FxNode>`.
fn upgrade_node(weak_node: &WeakNode) -> Option<Arc<dyn FxNode>> {
    // SAFETY: `node` is the pointer `vtable` expects; both were created
    // together (see `WeakNode::new`).
    unsafe { (weak_node.vtable.upgrade)(weak_node.node) }
}
210
/// Mutable state of a [`NodeCache`], guarded by the cache's mutex.
struct NodeCacheInner {
    // Object ID -> type-erased weak node (or placeholder).  A `BTreeMap` so
    // `FileIter` can resume iteration from a key after dropping the lock.
    map: BTreeMap<u64, WeakNode>,
    // Monotonic counter used to stamp each newly created `Placeholder`
    // (pre-incremented, so 0 is never a live sequence).
    next_waker_sequence: u64,
}
215
/// A cache of currently-alive nodes keyed by object ID.  Holds only weak
/// references, so the cache itself keeps no node alive.
pub struct NodeCache(Mutex<NodeCacheInner>);
218
/// Iterator over the live `FxFile` nodes in a [`NodeCache`] (see
/// [`NodeCache::files`]).  The cache lock is held only for the duration of
/// each `next` call.
pub struct FileIter<'a> {
    cache: &'a NodeCache,
    // Last object ID yielded; iteration resumes strictly after it.
    object_id: Option<u64>,
}
224
225impl<'a> Iterator for FileIter<'a> {
226 type Item = Arc<FxFile>;
227 fn next(&mut self) -> Option<Self::Item> {
228 let cache = self.cache.0.lock();
229 let range = match self.object_id {
230 None => cache.map.range(0..),
231 Some(oid) => cache.map.range(oid + 1..),
232 };
233 for (object_id, node) in range {
234 if let Some(file) = upgrade_and_downcast_node(node) {
235 self.object_id = Some(*object_id);
236 return Some(file);
237 }
238 }
239 None
240 }
241}
242
impl NodeCache {
    /// Creates an empty cache.
    pub fn new() -> Self {
        Self(Mutex::new(NodeCacheInner { map: BTreeMap::new(), next_waker_sequence: 0 }))
    }

    /// Gets the node with `object_id` from the cache, or reserves a
    /// placeholder for it.
    ///
    /// If another task already holds a placeholder for the same object ID,
    /// this future remains pending until that placeholder is committed or
    /// dropped.  On completion the caller gets either the cached node
    /// (`GetResult::Node`) or a `GetResult::Placeholder` that it must commit
    /// (or drop to abandon the reservation).
    pub async fn get_or_reserve<'a>(&'a self, object_id: u64) -> GetResult<'a> {
        // Identify the waker slot claimed on a previous poll:
        // `waker_sequence` is compared against `PlaceholderInner::
        // waker_sequence` and `waker_index` is our index in its `wakers` vec.
        // Sequence 0 is never used by a real placeholder
        // (`next_waker_sequence` is pre-incremented), so the first poll
        // always takes the push path.
        let mut waker_sequence = 0;
        let mut waker_index = 0;
        poll_fn(|cx| {
            // Lock ordering: cache lock before placeholder lock (matches
            // `PlaceholderOwner::drop`).
            let mut this = self.0.lock();
            if let Some(node) = this.map.get(&object_id) {
                if let Some(node) = upgrade_node(node) {
                    if let Ok(placeholder) = node.clone().into_any().downcast::<Placeholder>() {
                        // Another task holds the reservation: park our waker.
                        let mut inner = placeholder.0.lock();
                        if inner.waker_sequence == waker_sequence {
                            // Same placeholder as our last poll: refresh the
                            // slot we already own.
                            inner.wakers[waker_index] = cx.waker().clone();
                        } else {
                            // Different (newer) placeholder: claim a new slot.
                            waker_index = inner.wakers.len();
                            waker_sequence = inner.waker_sequence;
                            inner.wakers.push(cx.waker().clone());
                        }
                        return Poll::Pending;
                    } else {
                        // A real, live node is cached.
                        return Poll::Ready(GetResult::Node(node));
                    }
                }
                // Weak reference is dead; fall through and replace the entry.
            }
            // Nothing live in the cache: insert a placeholder and hand the
            // reservation to the caller.
            this.next_waker_sequence += 1;
            let inner = Arc::new(Placeholder(Mutex::new(PlaceholderInner {
                object_id,
                waker_sequence: this.next_waker_sequence,
                wakers: vec![],
            })));
            this.map.insert(object_id, inner.clone().to_weak_node());
            Poll::Ready(GetResult::Placeholder(PlaceholderOwner {
                inner,
                committed: false,
                cache: self,
            }))
        })
        .await
    }

    /// Removes `node` from the cache — but only if the cached entry is this
    /// exact instance (compared by data pointer), so a newer node cached
    /// under the same object ID is left untouched.
    pub fn remove(&self, node: &dyn FxNode) {
        let mut this = self.0.lock();
        if let Entry::Occupied(o) = this.map.entry(node.object_id()) {
            // Compare the stored thin pointer with `node`'s data pointer
            // (the fat `dyn` pointer is cast down to its address).
            if o.get().node == node as *const dyn FxNode as *const () {
                o.remove();
            }
        }
    }

    /// Returns the node with `object_id`, if present and still alive.
    ///
    /// # Panics
    ///
    /// Panics if the entry is a placeholder — callers must use
    /// `get_or_reserve` when a concurrent open may be in flight.
    pub fn get(&self, object_id: u64) -> Option<Arc<dyn FxNode>> {
        let l = self.0.lock();
        let weak_node = l.map.get(&object_id)?;
        assert!(
            unsafe { !weak_node.is_type::<Placeholder>() },
            "Returning a placeholder indicates a race with open"
        );
        upgrade_node(weak_node)
    }

    /// Returns an iterator over all live `FxFile` nodes in the cache.
    pub fn files(&self) -> FileIter<'_> {
        FileIter { cache: self, object_id: None }
    }

    /// Calls `terminate` on every live node.  Upgraded strong references are
    /// collected and dropped only after the cache lock is released, so any
    /// node `Drop` that calls back into the cache cannot deadlock here.
    pub fn terminate(&self) {
        let _drop_list = {
            let this = self.0.lock();
            let mut drop_list = Vec::with_capacity(this.map.len());
            for (_, node) in &this.map {
                if let Some(node) = upgrade_node(&node) {
                    node.terminate();
                    drop_list.push(node);
                }
            }
            drop_list
        };
    }

    /// Replaces the entry for `node`'s object ID (normally a placeholder)
    /// with the real node.  Reached via `PlaceholderOwner::commit`.
    fn commit(&self, node: &Arc<dyn FxNode>) {
        let mut this = self.0.lock();
        this.map.insert(node.object_id(), node.clone().to_weak_node());
    }
}
352
/// RAII wrapper representing one "open" of a node: construction increments
/// the node's open count and `Drop` decrements it.
pub struct OpenedNode<N: FxNode + ?Sized>(pub Arc<N>);
355
impl<N: FxNode + ?Sized> OpenedNode<N> {
    /// Wraps `node`, incrementing its open count.
    pub fn new(node: Arc<N>) -> Self {
        node.open_count_add_one();
        OpenedNode(node)
    }

    /// Downcasts to the concrete node type `T`, returning `self` unchanged on
    /// a type mismatch.  The open count is not touched: `take` skips the
    /// decrement and the new wrapper is built without `new`'s increment.
    pub fn downcast<T: FxNode>(self) -> Result<OpenedNode<T>, Self> {
        if self.is::<T>() {
            Ok(OpenedNode(
                // `is` already verified the type, so the downcast can't fail.
                self.take().into_any().downcast::<T>().unwrap_or_else(|_| unreachable!()),
            ))
        } else {
            Err(self)
        }
    }

    /// Extracts the inner `Arc` *without* running `Drop` (i.e. without
    /// decrementing the open count); the caller assumes the open-count
    /// obligation.
    pub fn take(self) -> Arc<N> {
        // SAFETY: `self` is wrapped in `ManuallyDrop`, so its destructor (and
        // the open-count decrement) never runs; `ptr::read` moves the `Arc`
        // out exactly once.
        let this = std::mem::ManuallyDrop::new(self);
        unsafe { std::ptr::read(&this.0) }
    }

    /// Returns true if the wrapped node's concrete type is `T`.
    pub fn is<T: 'static>(&self) -> bool {
        self.0.as_ref().type_id() == TypeId::of::<T>()
    }
}
384
impl<N: FxNode + ?Sized> Drop for OpenedNode<N> {
    fn drop(&mut self) {
        // `open_count_sub_one` consumes an `Arc<Self>`, but `drop` only has
        // `&mut self`, so the `Arc` is cloned to make the call.
        self.0.clone().open_count_sub_one();
    }
}
390
// Lets an `OpenedNode` be used anywhere an `Arc<N>` is expected.
impl<N: FxNode + ?Sized> std::ops::Deref for OpenedNode<N> {
    type Target = Arc<N>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
398
399#[cfg(test)]
400mod tests {
401 use crate::fuchsia::directory::FxDirectory;
402 use crate::fuchsia::node::{FxNode, GetResult, NodeCache};
403 use fuchsia_async as fasync;
404 use fuchsia_sync::Mutex;
405 use futures::future::join_all;
406 use fxfs::object_store::ObjectDescriptor;
407 use fxfs_macros::ToWeakNode;
408 use std::sync::Arc;
409 use std::sync::atomic::{AtomicU64, Ordering};
410 use std::time::Duration;
411
    /// Test node: an object ID plus a handle back to the cache so `Drop` can
    /// deregister the node.
    #[derive(ToWeakNode)]
    struct FakeNode(u64, Arc<NodeCache>);
    impl FxNode for FakeNode {
        fn object_id(&self) -> u64 {
            self.0
        }
        fn parent(&self) -> Option<Arc<FxDirectory>> {
            // These tests never traverse the hierarchy.
            unreachable!();
        }
        fn set_parent(&self, _parent: Arc<FxDirectory>) {
            unreachable!();
        }
        // Open counts are irrelevant for these tests.
        fn open_count_add_one(&self) {}
        fn open_count_sub_one(self: Arc<Self>) {}

        fn object_descriptor(&self) -> ObjectDescriptor {
            ObjectDescriptor::Directory
        }
    }
    impl Drop for FakeNode {
        fn drop(&mut self) {
            // Remove this node's cache entry when the last strong reference
            // goes away, so tests observe an empty cache afterwards.
            self.1.remove(self);
        }
    }
436
    /// Dropping an uncommitted placeholder must clear the reservation, so a
    /// second `get_or_reserve` for the same ID yields a fresh placeholder
    /// rather than blocking or returning a node.
    #[fuchsia::test]
    async fn test_drop_placeholder() {
        let cache = Arc::new(NodeCache::new());
        let object_id = 0u64;
        match cache.get_or_reserve(object_id).await {
            GetResult::Node(_) => panic!("Unexpected node"),
            GetResult::Placeholder(_) => {}
        };
        // The placeholder above was dropped immediately; reserving again must
        // succeed.
        match cache.get_or_reserve(object_id).await {
            GetResult::Node(_) => panic!("Unexpected node"),
            GetResult::Placeholder(_) => {}
        };
    }
450
    /// Commit a node, read it back, then drop it and check the weak entry no
    /// longer resolves (a new reservation is handed out instead).
    #[fuchsia::test]
    async fn test_simple() {
        let cache = Arc::new(NodeCache::new());
        let object_id = {
            let node = Arc::new(FakeNode(0, cache.clone()));
            match cache.get_or_reserve(node.object_id()).await {
                GetResult::Node(_) => panic!("Unexpected node"),
                GetResult::Placeholder(p) => {
                    p.commit(&(node.clone() as Arc<dyn FxNode>));
                }
            };
            // The committed node is now returned directly.
            match cache.get_or_reserve(node.object_id()).await {
                GetResult::Node(n) => assert_eq!(n.object_id(), node.object_id()),
                GetResult::Placeholder(_) => panic!("No node found"),
            };
            node.object_id()
        };
        // `node` has been dropped (FakeNode::drop removed the entry); the
        // cache must hand out a fresh placeholder.
        match cache.get_or_reserve(object_id).await {
            GetResult::Node(_) => panic!("Unexpected node"),
            GetResult::Placeholder(_) => {}
        };
    }
473
474 #[fuchsia::test(threads = 10)]
475 async fn test_subsequent_callers_block() {
476 let cache = Arc::new(NodeCache::new());
477 let object_id = 0u64;
478 let writes_to_cache = Arc::new(AtomicU64::new(0));
479 let reads_from_cache = Arc::new(AtomicU64::new(0));
480 let node = Arc::new(FakeNode(object_id, cache.clone()));
481 join_all((0..10).map(|_| {
482 let node = node.clone();
483 let cache = cache.clone();
484 let object_id = object_id.clone();
485 let writes_to_cache = writes_to_cache.clone();
486 let reads_from_cache = reads_from_cache.clone();
487 async move {
488 match cache.get_or_reserve(object_id).await {
489 GetResult::Node(node) => {
490 reads_from_cache.fetch_add(1, Ordering::SeqCst);
491 assert_eq!(node.object_id(), object_id);
492 }
493 GetResult::Placeholder(p) => {
494 writes_to_cache.fetch_add(1, Ordering::SeqCst);
495 fasync::Timer::new(Duration::from_millis(100)).await;
497 p.commit(&(node as Arc<dyn FxNode>));
498 }
499 }
500 }
501 }))
502 .await;
503 assert_eq!(writes_to_cache.load(Ordering::SeqCst), 1);
504 assert_eq!(reads_from_cache.load(Ordering::SeqCst), 9);
505 }
506
    /// Same as `test_subsequent_callers_block` but across several object IDs
    /// at once: per object, exactly one task commits and the rest read.
    #[fuchsia::test(threads = 10)]
    async fn test_multiple_nodes() {
        const NUM_OBJECTS: usize = 5;
        const TASKS_PER_OBJECT: usize = 4;

        let cache = Arc::new(NodeCache::new());
        // Per-object counters of commit wins and cache hits, indexed by
        // object ID.
        let writes = Arc::new(Mutex::new(vec![0u64; NUM_OBJECTS]));
        let reads = Arc::new(Mutex::new(vec![0u64; NUM_OBJECTS]));
        let nodes: Vec<_> = (0..NUM_OBJECTS as u64)
            .map(|object_id| Arc::new(FakeNode(object_id, cache.clone())))
            .collect();

        join_all((0..TASKS_PER_OBJECT).flat_map(|_| {
            nodes.iter().cloned().map(|node| {
                let cache = cache.clone();
                let writes = writes.clone();
                let reads = reads.clone();
                async move {
                    match cache.get_or_reserve(node.object_id()).await {
                        GetResult::Node(result) => {
                            assert_eq!(node.object_id(), result.object_id());
                            reads.lock()[node.object_id() as usize] += 1;
                        }
                        GetResult::Placeholder(p) => {
                            writes.lock()[node.object_id() as usize] += 1;
                            // Let the remaining tasks pile up on the
                            // placeholder before committing.
                            fasync::Timer::new(Duration::from_millis(100)).await;
                            p.commit(&(node as Arc<dyn FxNode>));
                        }
                    }
                }
            })
        }))
        .await;
        assert_eq!(*writes.lock(), vec![1u64; NUM_OBJECTS]);
        assert_eq!(*reads.lock(), vec![TASKS_PER_OBJECT as u64 - 1; NUM_OBJECTS]);
    }
544}