fxfs_platform/fuchsia/
node.rs1use crate::fuchsia::directory::FxDirectory;
6use crate::fuchsia::file::FxFile;
7use fuchsia_sync::Mutex;
8use futures::future::poll_fn;
9use fxfs::object_store::ObjectDescriptor;
10use fxfs_macros::ToWeakNode;
11use std::any::TypeId;
12use std::collections::BTreeMap;
13use std::collections::btree_map::Entry;
14use std::fmt;
15use std::mem::ManuallyDrop;
16use std::sync::{Arc, Weak};
17use std::task::{Poll, Waker};
18use vfs::common::IntoAny;
19
20pub trait FxNode: IntoAny + ToWeakNode + Send + Sync + 'static {
22 fn object_id(&self) -> u64;
23 fn parent(&self) -> Option<Arc<FxDirectory>>;
24 fn set_parent(&self, parent: Arc<FxDirectory>);
25 fn open_count_add_one(&self);
26
27 fn open_count_sub_one(self: Arc<Self>);
30 fn object_descriptor(&self) -> ObjectDescriptor;
31
32 fn mark_to_be_purged(&self) {
34 panic!("Unexpected call to mark_to_be_purged");
35 }
36
37 fn terminate(&self) {}
40}
41
42struct PlaceholderInner {
43 object_id: u64,
44 waker_sequence: u64,
45 wakers: Vec<Waker>,
46}
47
48#[derive(ToWeakNode)]
49struct Placeholder(Mutex<PlaceholderInner>);
50
51impl FxNode for Placeholder {
52 fn object_id(&self) -> u64 {
53 self.0.lock().object_id
54 }
55 fn parent(&self) -> Option<Arc<FxDirectory>> {
56 unreachable!();
57 }
58 fn set_parent(&self, _parent: Arc<FxDirectory>) {
59 unreachable!();
60 }
61 fn open_count_add_one(&self) {}
62 fn open_count_sub_one(self: Arc<Self>) {}
63
64 fn object_descriptor(&self) -> ObjectDescriptor {
65 ObjectDescriptor::File
66 }
67}
68
69impl fmt::Debug for dyn FxNode {
70 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71 f.debug_struct("FxNode")
72 .field("id", &self.object_id())
73 .field("descriptor", &self.object_descriptor())
74 .finish()
75 }
76}
77
78pub struct PlaceholderOwner<'a> {
80 inner: Arc<Placeholder>,
81 committed: bool,
82 cache: &'a NodeCache,
83}
84
85impl PlaceholderOwner<'_> {
86 pub fn commit(mut self, node: &Arc<dyn FxNode>) {
88 let this_object_id = self.inner.object_id();
89 assert_eq!(node.object_id(), this_object_id);
90 self.committed = true;
91 self.cache.commit(node);
92 }
93}
94
95impl Drop for PlaceholderOwner<'_> {
96 fn drop(&mut self) {
97 let mut p = self.inner.0.lock();
98 if !self.committed {
99 self.cache.0.lock().map.remove(&p.object_id);
102 }
103 for waker in p.wakers.drain(..) {
104 waker.wake();
105 }
106 }
107}
108
109pub enum GetResult<'a> {
111 Placeholder(PlaceholderOwner<'a>),
112 Node(Arc<dyn FxNode>),
113}
114
115impl<'a> GetResult<'a> {
116 pub fn placeholder(self) -> Option<PlaceholderOwner<'a>> {
117 match self {
118 GetResult::Placeholder(placeholder) => Some(placeholder),
119 _ => None,
120 }
121 }
122}
123
124pub struct WeakNode {
129 vtable: &'static WeakNodeVTable,
130 node: *const (),
131}
132
133impl WeakNode {
134 pub(crate) unsafe fn new(vtable: &'static WeakNodeVTable, node: *const ()) -> Self {
137 Self { vtable, node }
138 }
139
140 unsafe fn is_type<T: 'static>(&self) -> bool {
141 unsafe { (self.vtable.type_id)() == TypeId::of::<T>() }
142 }
143}
144
145impl Drop for WeakNode {
146 fn drop(&mut self) {
147 unsafe {
151 (self.vtable.drop)(self.node);
152 }
153 }
154}
155
156unsafe impl Send for WeakNode {}
158
159pub(crate) struct WeakNodeVTable {
160 drop: unsafe fn(*const ()),
162
163 type_id: unsafe fn() -> TypeId,
165
166 upgrade: unsafe fn(*const ()) -> Option<Arc<dyn FxNode>>,
168}
169
170impl WeakNodeVTable {
171 pub(crate) const fn new(
172 drop: unsafe fn(*const ()),
173 type_id: unsafe fn() -> TypeId,
174 upgrade: unsafe fn(*const ()) -> Option<Arc<dyn FxNode>>,
175 ) -> Self {
176 Self { drop, type_id, upgrade }
177 }
178}
179
180pub trait ToWeakNode {
183 fn to_weak_node(self: Arc<Self>) -> WeakNode;
184}
185
186fn upgrade_and_downcast_node<T: 'static>(weak_node: &WeakNode) -> Option<Arc<T>> {
189 unsafe {
192 if weak_node.is_type::<T>() {
193 ManuallyDrop::new(Weak::from_raw(weak_node.node as *const T)).upgrade()
194 } else {
195 None
196 }
197 }
198}
199
200fn upgrade_node(weak_node: &WeakNode) -> Option<Arc<dyn FxNode>> {
202 unsafe { (weak_node.vtable.upgrade)(weak_node.node) }
205}
206
207struct NodeCacheInner {
208 map: BTreeMap<u64, WeakNode>,
209 next_waker_sequence: u64,
210}
211
212pub struct NodeCache(Mutex<NodeCacheInner>);
214
215pub struct FileIter<'a> {
217 cache: &'a NodeCache,
218 object_id: Option<u64>,
219}
220
221impl<'a> Iterator for FileIter<'a> {
222 type Item = Arc<FxFile>;
223 fn next(&mut self) -> Option<Self::Item> {
224 let cache = self.cache.0.lock();
225 let range = match self.object_id {
226 None => cache.map.range(0..),
227 Some(oid) => cache.map.range(oid + 1..),
228 };
229 for (object_id, node) in range {
230 if let Some(file) = upgrade_and_downcast_node(node) {
231 self.object_id = Some(*object_id);
232 return Some(file);
233 }
234 }
235 None
236 }
237}
238
239impl NodeCache {
240 pub fn new() -> Self {
241 Self(Mutex::new(NodeCacheInner { map: BTreeMap::new(), next_waker_sequence: 0 }))
242 }
243
244 pub async fn get_or_reserve<'a>(&'a self, object_id: u64) -> GetResult<'a> {
251 let mut waker_sequence = 0;
252 let mut waker_index = 0;
253 poll_fn(|cx| {
254 let mut this = self.0.lock();
255 if let Some(node) = this.map.get(&object_id) {
256 if let Some(node) = upgrade_node(node) {
257 if let Ok(placeholder) = node.clone().into_any().downcast::<Placeholder>() {
258 let mut inner = placeholder.0.lock();
259 if inner.waker_sequence == waker_sequence {
260 inner.wakers[waker_index] = cx.waker().clone();
261 } else {
262 waker_index = inner.wakers.len();
263 waker_sequence = inner.waker_sequence;
264 inner.wakers.push(cx.waker().clone());
265 }
266 return Poll::Pending;
267 } else {
268 return Poll::Ready(GetResult::Node(node));
269 }
270 }
271 }
272 this.next_waker_sequence += 1;
273 let inner = Arc::new(Placeholder(Mutex::new(PlaceholderInner {
274 object_id,
275 waker_sequence: this.next_waker_sequence,
276 wakers: vec![],
277 })));
278 this.map.insert(object_id, inner.clone().to_weak_node());
279 Poll::Ready(GetResult::Placeholder(PlaceholderOwner {
280 inner,
281 committed: false,
282 cache: self,
283 }))
284 })
285 .await
286 }
287
288 pub fn remove(&self, node: &dyn FxNode) {
291 let mut this = self.0.lock();
292 if let Entry::Occupied(o) = this.map.entry(node.object_id()) {
293 if o.get().node == node as *const dyn FxNode as *const () {
301 o.remove();
302 }
303 }
304 }
305
306 pub fn get(&self, object_id: u64) -> Option<Arc<dyn FxNode>> {
311 let l = self.0.lock();
312 let weak_node = l.map.get(&object_id)?;
313 assert!(
317 unsafe { !weak_node.is_type::<Placeholder>() },
318 "Returning a placeholder indicates a race with open"
319 );
320 upgrade_node(weak_node)
321 }
322
323 pub fn files(&self) -> FileIter<'_> {
325 FileIter { cache: self, object_id: None }
326 }
327
328 pub fn terminate(&self) {
329 let _drop_list = {
330 let this = self.0.lock();
331 let mut drop_list = Vec::with_capacity(this.map.len());
332 for (_, node) in &this.map {
333 if let Some(node) = upgrade_node(&node) {
334 node.terminate();
335 drop_list.push(node);
337 }
338 }
339 drop_list
340 };
341 }
342
343 fn commit(&self, node: &Arc<dyn FxNode>) {
344 let mut this = self.0.lock();
345 this.map.insert(node.object_id(), node.clone().to_weak_node());
346 }
347}
348
349pub struct OpenedNode<N: FxNode + ?Sized>(pub Arc<N>);
351
352impl<N: FxNode + ?Sized> OpenedNode<N> {
353 pub fn new(node: Arc<N>) -> Self {
354 node.open_count_add_one();
355 OpenedNode(node)
356 }
357
358 pub fn downcast<T: FxNode>(self) -> Result<OpenedNode<T>, Self> {
360 if self.is::<T>() {
361 Ok(OpenedNode(
362 self.take().into_any().downcast::<T>().unwrap_or_else(|_| unreachable!()),
363 ))
364 } else {
365 Err(self)
366 }
367 }
368
369 pub fn take(self) -> Arc<N> {
371 let this = std::mem::ManuallyDrop::new(self);
372 unsafe { std::ptr::read(&this.0) }
373 }
374
375 pub fn is<T: 'static>(&self) -> bool {
377 self.0.as_ref().type_id() == TypeId::of::<T>()
378 }
379}
380
381impl<N: FxNode + ?Sized> Drop for OpenedNode<N> {
382 fn drop(&mut self) {
383 self.0.clone().open_count_sub_one();
384 }
385}
386
387impl<N: FxNode + ?Sized> std::ops::Deref for OpenedNode<N> {
388 type Target = Arc<N>;
389
390 fn deref(&self) -> &Self::Target {
391 &self.0
392 }
393}
394
395#[cfg(test)]
396mod tests {
397 use crate::fuchsia::directory::FxDirectory;
398 use crate::fuchsia::node::{FxNode, GetResult, NodeCache};
399 use fuchsia_async as fasync;
400 use fuchsia_sync::Mutex;
401 use futures::future::join_all;
402 use fxfs::object_store::ObjectDescriptor;
403 use fxfs_macros::ToWeakNode;
404 use std::sync::Arc;
405 use std::sync::atomic::{AtomicU64, Ordering};
406 use std::time::Duration;
407
408 #[derive(ToWeakNode)]
409 struct FakeNode(u64, Arc<NodeCache>);
410 impl FxNode for FakeNode {
411 fn object_id(&self) -> u64 {
412 self.0
413 }
414 fn parent(&self) -> Option<Arc<FxDirectory>> {
415 unreachable!();
416 }
417 fn set_parent(&self, _parent: Arc<FxDirectory>) {
418 unreachable!();
419 }
420 fn open_count_add_one(&self) {}
421 fn open_count_sub_one(self: Arc<Self>) {}
422
423 fn object_descriptor(&self) -> ObjectDescriptor {
424 ObjectDescriptor::Directory
425 }
426 }
427 impl Drop for FakeNode {
428 fn drop(&mut self) {
429 self.1.remove(self);
430 }
431 }
432
433 #[fuchsia::test]
434 async fn test_drop_placeholder() {
435 let cache = Arc::new(NodeCache::new());
436 let object_id = 0u64;
437 match cache.get_or_reserve(object_id).await {
438 GetResult::Node(_) => panic!("Unexpected node"),
439 GetResult::Placeholder(_) => {}
440 };
441 match cache.get_or_reserve(object_id).await {
442 GetResult::Node(_) => panic!("Unexpected node"),
443 GetResult::Placeholder(_) => {}
444 };
445 }
446
447 #[fuchsia::test]
448 async fn test_simple() {
449 let cache = Arc::new(NodeCache::new());
450 let object_id = {
451 let node = Arc::new(FakeNode(0, cache.clone()));
452 match cache.get_or_reserve(node.object_id()).await {
453 GetResult::Node(_) => panic!("Unexpected node"),
454 GetResult::Placeholder(p) => {
455 p.commit(&(node.clone() as Arc<dyn FxNode>));
456 }
457 };
458 match cache.get_or_reserve(node.object_id()).await {
459 GetResult::Node(n) => assert_eq!(n.object_id(), node.object_id()),
460 GetResult::Placeholder(_) => panic!("No node found"),
461 };
462 node.object_id()
463 };
464 match cache.get_or_reserve(object_id).await {
465 GetResult::Node(_) => panic!("Unexpected node"),
466 GetResult::Placeholder(_) => {}
467 };
468 }
469
470 #[fuchsia::test(threads = 10)]
471 async fn test_subsequent_callers_block() {
472 let cache = Arc::new(NodeCache::new());
473 let object_id = 0u64;
474 let writes_to_cache = Arc::new(AtomicU64::new(0));
475 let reads_from_cache = Arc::new(AtomicU64::new(0));
476 let node = Arc::new(FakeNode(object_id, cache.clone()));
477 join_all((0..10).map(|_| {
478 let node = node.clone();
479 let cache = cache.clone();
480 let object_id = object_id.clone();
481 let writes_to_cache = writes_to_cache.clone();
482 let reads_from_cache = reads_from_cache.clone();
483 async move {
484 match cache.get_or_reserve(object_id).await {
485 GetResult::Node(node) => {
486 reads_from_cache.fetch_add(1, Ordering::SeqCst);
487 assert_eq!(node.object_id(), object_id);
488 }
489 GetResult::Placeholder(p) => {
490 writes_to_cache.fetch_add(1, Ordering::SeqCst);
491 fasync::Timer::new(Duration::from_millis(100)).await;
493 p.commit(&(node as Arc<dyn FxNode>));
494 }
495 }
496 }
497 }))
498 .await;
499 assert_eq!(writes_to_cache.load(Ordering::SeqCst), 1);
500 assert_eq!(reads_from_cache.load(Ordering::SeqCst), 9);
501 }
502
503 #[fuchsia::test(threads = 10)]
504 async fn test_multiple_nodes() {
505 const NUM_OBJECTS: usize = 5;
506 const TASKS_PER_OBJECT: usize = 4;
507
508 let cache = Arc::new(NodeCache::new());
509 let writes = Arc::new(Mutex::new(vec![0u64; NUM_OBJECTS]));
510 let reads = Arc::new(Mutex::new(vec![0u64; NUM_OBJECTS]));
511 let nodes: Vec<_> = (0..NUM_OBJECTS as u64)
512 .map(|object_id| Arc::new(FakeNode(object_id, cache.clone())))
513 .collect();
514
515 join_all((0..TASKS_PER_OBJECT).flat_map(|_| {
516 nodes.iter().cloned().map(|node| {
517 let cache = cache.clone();
518 let writes = writes.clone();
519 let reads = reads.clone();
520 async move {
521 match cache.get_or_reserve(node.object_id()).await {
522 GetResult::Node(result) => {
523 assert_eq!(node.object_id(), result.object_id());
524 reads.lock()[node.object_id() as usize] += 1;
525 }
526 GetResult::Placeholder(p) => {
527 writes.lock()[node.object_id() as usize] += 1;
528 fasync::Timer::new(Duration::from_millis(100)).await;
530 p.commit(&(node as Arc<dyn FxNode>));
531 }
532 }
533 }
534 })
535 }))
536 .await;
537 assert_eq!(*writes.lock(), vec![1u64; NUM_OBJECTS]);
538 assert_eq!(*reads.lock(), vec![TASKS_PER_OBJECT as u64 - 1; NUM_OBJECTS]);
539 }
540}