fxfs/object_store/
key_manager.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::errors::FxfsError;
6use crate::log::*;
7use crate::object_store::{FSCRYPT_KEY_ID, VOLUME_DATA_KEY_ID};
8use anyhow::Error;
9use event_listener::Event;
10use fuchsia_sync::Mutex;
11use fxfs_crypto::{CipherSet, Crypt, FindKeyResult, Key, UnwrappedKeys, WrappedKeys};
12use scopeguard::ScopeGuard;
13use std::cell::UnsafeCell;
14use std::collections::btree_map::Entry;
15use std::collections::BTreeMap;
16use std::future::Future;
17use std::sync::Arc;
18use std::time::Duration;
19use {fuchsia_async as fasync, zx_status as zx};
20
/// Interval between purge passes over the key cache.  On each pass the entries in `hash` are
/// moved to `pending_purge` and whatever was previously in `pending_purge` is dropped, so an
/// entry is evicted once it has been inactive for somewhere between `PURGE_TIMEOUT` and
/// 2 * `PURGE_TIMEOUT`.  This is deliberately set to 37 seconds (rather than a round number) to
/// reduce the chance that this timer ends up always firing at the same time as other timers.
const PURGE_TIMEOUT: Duration = Duration::from_secs(37);
26
/// A simple cache that purges entries periodically when `purge` is called.  The API is similar to
/// HashMap's.
///
/// Entries live in one of three tiers:
///   * `hash`: purgeable entries that were inserted or accessed since the last `purge`.
///   * `pending_purge`: purgeable entries that have not been accessed since the last `purge`;
///     the next `purge` drops them unless an access rescues them first.
///   * `permanent`: entries that `purge` never touches.
struct Cache<V> {
    hash: BTreeMap<u64, V>,
    pending_purge: BTreeMap<u64, V>,
    permanent: BTreeMap<u64, V>,
}

impl<V> Cache<V> {
    /// Creates an empty cache.
    fn new() -> Self {
        Self { hash: BTreeMap::new(), pending_purge: BTreeMap::new(), permanent: BTreeMap::new() }
    }

    /// Returns the value stored for `key`, if any.
    ///
    /// Lookup order is `hash`, then `pending_purge`, then `permanent`.  A hit in `pending_purge`
    /// counts as an access, so the entry is moved back into `hash`.
    fn get(&mut self, key: u64) -> Option<&V> {
        match self.hash.entry(key) {
            Entry::Occupied(o) => Some(o.into_mut()),
            Entry::Vacant(v) => {
                // If we find an entry in `pending_purge`, move it into `hash`.
                if let Some(value) = self.pending_purge.remove(&key) {
                    Some(v.insert(value))
                } else {
                    self.permanent.get(&key)
                }
            }
        }
    }

    /// Stores `value` for `key`, either as a permanent entry or as a purgeable one.
    ///
    /// A permanent insert replaces any purgeable entry for the same key, so that `get` returns
    /// the value that was just inserted.  A purgeable insert does not touch `permanent`: an
    /// existing permanent entry is shadowed for as long as the purgeable one is alive.
    fn insert(&mut self, key: u64, value: V, permanent: bool) {
        // Whichever tier receives the new value, a copy that is waiting to be purged is stale.
        self.pending_purge.remove(&key);
        if permanent {
            // `get` consults `hash` before `permanent`, so a leftover purgeable entry would
            // otherwise keep shadowing the value being inserted here.
            self.hash.remove(&key);
            self.permanent.insert(key, value);
        } else {
            self.hash.insert(key, value);
        }
    }

    /// Replaces the purgeable entry for `key` with the value returned by `merge`.
    ///
    /// `merge` receives the existing purgeable value (from `hash` or `pending_purge`), or `None`
    /// if there isn't one.  Entries in `permanent` are not consulted.  The result always ends up
    /// in `hash`.
    fn merge(&mut self, key: u64, merge: impl FnOnce(Option<&V>) -> V) {
        match self.hash.entry(key) {
            Entry::Occupied(mut o) => {
                let new = merge(Some(o.get()));
                *o.get_mut() = new;
            }
            Entry::Vacant(v) => {
                // If we find an entry in `pending_purge`, move it into `hash`.
                if let Some(value) = self.pending_purge.remove(&key) {
                    v.insert(merge(Some(&value)));
                } else {
                    v.insert(merge(None));
                }
            }
        }
    }

    /// This purges entries that haven't been accessed since the last call to purge.
    ///
    /// Everything currently in `hash` becomes pending, and the previous pending set is dropped.
    ///
    /// Returns true if the cache no longer has any purgeable entries.
    fn purge(&mut self) -> bool {
        self.pending_purge = std::mem::take(&mut self.hash);
        self.pending_purge.is_empty()
    }

    /// Removes every entry, including permanent ones.
    fn clear(&mut self) {
        self.hash.clear();
        self.pending_purge.clear();
        self.permanent.clear();
    }

    /// Removes `key` from every tier.
    fn remove(&mut self, key: u64) {
        self.hash.remove(&key);
        self.pending_purge.remove(&key);
        self.permanent.remove(&key);
    }
}
99
/// Caches unwrapped keys (as `CipherSet`s) by object id and coordinates unwrapping so that
/// concurrent requests for the same object wait on a single in-flight unwrap.
pub struct KeyManager {
    // Shared with the purge task and with the future returned by `remove`.
    inner: Arc<Mutex<Inner>>,
}
103
/// The state guarded by the `KeyManager` mutex.
struct Inner {
    // Unwrapped keys, indexed by object id.
    keys: Cache<Arc<CipherSet>>,
    // Unwrap operations that are currently in flight, indexed by object id.
    unwrapping: BTreeMap<u64, Arc<UnwrapResult>>,
    // The periodic purge task, or `None` when no purge task is running.
    purge_task: Option<fasync::Task<()>>,
}
109
110impl Inner {
111    fn start_purge_task(&mut self, inner: &Arc<Mutex<Inner>>) {
112        self.purge_task.get_or_insert_with(move || {
113            let inner = inner.clone();
114            fasync::Task::spawn(async move {
115                loop {
116                    fasync::Timer::new(PURGE_TIMEOUT).await;
117                    let mut inner = inner.lock();
118                    if inner.keys.purge() {
119                        inner.purge_task = None;
120                        break;
121                    }
122                }
123            })
124        });
125    }
126}
127
/// Tracks one in-flight unwrap so that other tasks can wait for it and observe its outcome.
struct UnwrapResult {
    // Notified by `set` once the unwrap has finished.
    event: Event,
    // Stays `OK` unless `set` records a failure or a cancellation.  It is written by `set`
    // before `event` is notified and read by waiters after their listener completes.
    error: UnsafeCell<zx::Status>,
    // Protected by the mutex on Inner.
    cancelled: UnsafeCell<bool>,
}
134
impl UnwrapResult {
    /// Creates a result in its initial state: no error recorded and not cancelled.
    fn new() -> Arc<Self> {
        Arc::new(UnwrapResult {
            event: Event::new(),
            error: UnsafeCell::new(zx::Status::OK),
            cancelled: UnsafeCell::new(false),
        })
    }

    /// Records the outcome of the unwrap and wakes every waiter.
    ///
    /// If this is still the entry registered in `unwrapping` for `object_id`, the entry is
    /// removed and, when the unwrap produced keys and wasn't cancelled, the keys are inserted
    /// into the cache (as a permanent entry if `permanent` is set) and the purge task is
    /// started.
    ///
    /// Returns true if cancelled.
    fn set(
        &self,
        inner: &Arc<Mutex<Inner>>,
        object_id: u64,
        permanent: bool,
        result: Result<Option<Arc<CipherSet>>, zx::Status>,
    ) -> bool {
        let mut guard = inner.lock();
        // SAFETY: Safe because we hold the lock on `inner`.
        let cancelled = unsafe { *self.cancelled.get() };
        let set_error = |error| {
            // SAFETY: This is safe because we have exclusive access until we call notify below.
            unsafe {
                *self.error.get() = error;
            }
        };
        // Cancellation takes precedence over whatever `result` holds.
        if cancelled {
            set_error(zx::Status::CANCELED);
        } else if let Err(error) = &result {
            error!(error:?, oid = object_id; "Failed to unwrap keys");
            set_error(*error);
        }
        if let Entry::Occupied(o) = guard.unwrapping.entry(object_id) {
            // Only act if the registered entry is this very result.  The map can be emptied in
            // the meantime (see `KeyManager::clear`), after which a different unwrap may have
            // been registered for the same object.
            if std::ptr::eq(Arc::as_ptr(o.get()), self) {
                o.remove();
                if !cancelled {
                    if let Ok(Some(keys)) = &result {
                        guard.keys.insert(object_id, keys.clone(), permanent);
                        guard.start_purge_task(inner);
                    }
                }
            }
        }
        // Waiters are woken while the lock on `inner` is still held.
        self.event.notify(usize::MAX);
        cancelled
    }
}
182
// SAFETY: NOTE(review): the `UnsafeCell` fields are what require these manual impls.  In this
// file `cancelled` is only accessed with the lock on `Inner` held, and `error` is written in
// `set` (under that lock) before `event` is notified and read by waiters only after their
// listener has completed.  Confirm that no other access paths exist.
unsafe impl Send for UnwrapResult {}
unsafe impl Sync for UnwrapResult {}
185
186impl KeyManager {
187    pub fn new() -> Self {
188        let inner = Arc::new(Mutex::new(Inner {
189            keys: Cache::new(),
190            unwrapping: BTreeMap::new(),
191            purge_task: None,
192        }));
193
194        Self { inner }
195    }
196
197    /// Retrieves the key with id VOLUME_DATA_KEY_ID from the cache but won't initiate unwrapping if
198    /// no key is present.  If the key is currently in the process of being unwrapped, this will
199    /// wait until that has finished.  This should be used with permanent keys.  This will return
200    /// None if the key isn't present in the cache, but can also return None if they key isn't
201    /// present in the set of keys.
202    pub async fn get(&self, object_id: u64) -> Result<Option<Key>, Error> {
203        loop {
204            let (unwrap_result, listener) = {
205                let mut inner = self.inner.lock();
206
207                if let Some(keys) = inner.keys.get(object_id) {
208                    return match keys.find_key(VOLUME_DATA_KEY_ID) {
209                        FindKeyResult::NotFound => Ok(None),
210                        FindKeyResult::Unavailable => Err(FxfsError::NoKey.into()),
211                        FindKeyResult::Key(key) => Ok(Some(key)),
212                    };
213                }
214                let unwrap_result = match inner.unwrapping.entry(object_id) {
215                    Entry::Vacant(_) => return Ok(None),
216                    Entry::Occupied(o) => o.get().clone(),
217                };
218                let listener = unwrap_result.event.listen();
219                (unwrap_result, listener)
220            };
221            listener.await;
222            // SAFETY: This is safe because there can be no mutations happening at this point.
223            let error = unsafe { *unwrap_result.error.get().clone() };
224            match error {
225                zx::Status::OK => {}
226                _ => return Err(error.into()),
227            }
228        }
229    }
230
    /// This retrieves keys from the cache or initiates unwrapping if they are not in the cache.  If
    /// `force` is true, then this will always attempt to unwrap the keys again, even if the keys
    /// are present in the cache.  `wrapped_keys` is a future to be used to retrieve the wrapped
    /// keys.  It is passed in an `Option` so that callers can tell if the keys were freshly
    /// retrieved.
    ///
    /// If another task is already unwrapping keys for `object_id`, this waits for it to finish
    /// and then starts over.  Successfully unwrapped keys are stored in the cache, as a permanent
    /// entry if `permanent` is true.
    ///
    /// # Errors
    ///
    /// Returns `zx::Status::INTERNAL` if fetching the wrapped keys fails, the error from
    /// `crypt.unwrap_keys` if unwrapping fails, `zx::Status::CANCELED` if the unwrap was
    /// cancelled while in flight (see `remove`), and the recorded status if an unwrap that was
    /// being waited on failed.
    ///
    /// # Panics
    ///
    /// Panics if this call has to perform the unwrap and `wrapped_keys` is `None`.
    pub async fn get_keys(
        &self,
        object_id: u64,
        crypt: &dyn Crypt,
        wrapped_keys: &mut Option<impl AsyncFnOnce() -> Result<WrappedKeys, Error>>,
        permanent: bool,
        force: bool,
    ) -> Result<Arc<CipherSet>, Error> {
        let inner = self.inner.clone();
        let mut unwrap_result;

        // Loop until either the cache satisfies the request or this call becomes the one that
        // performs the unwrap (in which case we break out with `unwrap_result` registered).
        loop {
            let listener = {
                let mut inner = inner.lock();

                if !force {
                    if let Some(keys) = inner.keys.get(object_id) {
                        return Ok(keys.clone());
                    }
                }

                match inner.unwrapping.entry(object_id) {
                    Entry::Vacant(v) => {
                        unwrap_result = UnwrapResult::new();
                        v.insert(unwrap_result.clone());
                        break;
                    }
                    Entry::Occupied(o) => {
                        // Somebody else is unwrapping; register a listener while the lock is
                        // still held.
                        unwrap_result = o.get().clone();
                        let listener = unwrap_result.event.listen();
                        listener
                    }
                }
            };

            listener.await;
            // SAFETY: This is safe because there can be no mutations happening at this
            // point.
            let error = unsafe { *unwrap_result.error.get().clone() };
            match error {
                zx::Status::OK => {}
                _ => return Err(error.into()),
            }
        }

        // Use a guard in case we're dropped.
        // Unless defused below, the guard reports whatever `result` holds when it goes out of
        // scope, which removes our entry from `unwrapping` and wakes any waiters.
        let mut result = scopeguard::guard(Ok(None), |result| {
            unwrap_result.set(&inner, object_id, permanent, result);
        });

        // Taking the closure leaves `None` behind, which is how callers can tell that the keys
        // were freshly retrieved.
        let wrapped_keys = match wrapped_keys.take().unwrap()().await {
            Ok(wrapped_keys) => wrapped_keys,
            Err(error) => {
                error!(error:?; "Failed to get wrapped keys");
                *result = Err(zx::Status::INTERNAL);
                return Err(zx::Status::INTERNAL.into());
            }
        };
        match crypt.unwrap_keys(&wrapped_keys, object_id).await {
            Ok(unwrapped_keys) => {
                let keys = unwrapped_keys.to_cipher_set();
                // Defuse the guard so that the outcome is reported exactly once, here.
                let _ = ScopeGuard::into_inner(result);
                if unwrap_result.set(&inner, object_id, permanent, Ok(Some(keys.clone()))) {
                    Err(zx::Status::CANCELED.into())
                } else {
                    Ok(keys)
                }
            }
            Err(error) => {
                *result = Err(error);
                Err(error.into())
            }
        }
    }
310
311    /// Returns the key specified by `key_id`, or None if it isn't present. If the key specified by
312    /// `key_id` cannot be unwrapped, this will return FxfsError::NoKey.
313    pub async fn get_key(
314        &self,
315        object_id: u64,
316        crypt: &dyn Crypt,
317        wrapped_keys: impl AsyncFnOnce() -> Result<WrappedKeys, Error>,
318        key_id: u64,
319    ) -> Result<Option<Key>, Error> {
320        let mut wrapped_keys = Some(wrapped_keys);
321        let mut force = false;
322        loop {
323            let keys = self
324                .get_keys(object_id, crypt, &mut wrapped_keys, /* permanent= */ false, force)
325                .await?;
326            return match keys.find_key(key_id) {
327                FindKeyResult::NotFound => Ok(None),
328                FindKeyResult::Unavailable => {
329                    if force || wrapped_keys.is_none() {
330                        Err(FxfsError::NoKey.into())
331                    } else {
332                        force = true;
333                        continue;
334                    }
335                }
336                FindKeyResult::Key(k) => Ok(Some(k)),
337            };
338        }
339    }
340
341    /// For files, the only way we can tell whether it has an fscrypt encryption key is if there's a
342    /// key with id FSCRYPT_KEY_ID.  This function will return that key if it is present, but will
343    /// otherwise fall back to the the key with id VOLUME_DATA_KEY_ID.  If the fscrypt encryption
344    /// key cannot be unwrapped, this will return FxfsError::NoKey.
345    pub async fn get_fscrypt_key_if_present(
346        &self,
347        object_id: u64,
348        crypt: &dyn Crypt,
349        wrapped_keys: impl AsyncFnOnce() -> Result<WrappedKeys, Error>,
350    ) -> Result<Key, Error> {
351        let mut wrapped_keys = Some(wrapped_keys);
352        let mut force = false;
353        loop {
354            let keys = self
355                .get_keys(object_id, crypt, &mut wrapped_keys, /* permanent= */ false, force)
356                .await?;
357            return match keys.find_key(FSCRYPT_KEY_ID) {
358                FindKeyResult::NotFound => Ok(to_result(keys.find_key(VOLUME_DATA_KEY_ID))?),
359                FindKeyResult::Unavailable => {
360                    if force || wrapped_keys.is_none() {
361                        Err(FxfsError::NoKey.into())
362                    } else {
363                        force = true;
364                        continue;
365                    }
366                }
367                FindKeyResult::Key(k) => Ok(k),
368            };
369        }
370    }
371
372    /// This function is for directories which know whether they should be using an fscrypt
373    /// encryption key, and can tolerate the key being unavailable.  This returns None if
374    /// the key is currently unavailable.
375    pub async fn get_fscrypt_key(
376        &self,
377        object_id: u64,
378        crypt: &dyn Crypt,
379        wrapped_keys: impl AsyncFnOnce() -> Result<WrappedKeys, Error>,
380    ) -> Result<Option<Key>, Error> {
381        let mut wrapped_keys = Some(wrapped_keys);
382        let mut force = false;
383        loop {
384            let keys = self
385                .get_keys(object_id, crypt, &mut wrapped_keys, /* permanent= */ false, force)
386                .await?;
387            return match keys.find_key(FSCRYPT_KEY_ID) {
388                FindKeyResult::NotFound => Err(FxfsError::NotFound.into()),
389                FindKeyResult::Unavailable => {
390                    if force || wrapped_keys.is_none() {
391                        Ok(None)
392                    } else {
393                        force = true;
394                        continue;
395                    }
396                }
397                FindKeyResult::Key(k) => Ok(Some(k)),
398            };
399        }
400    }
401
402    /// This inserts the keys into the cache.  Any existing keys will be overwritten.  It's
403    /// unspecified what happens if keys for the object are currently being unwrapped.
404    pub fn insert(&self, object_id: u64, keys: impl ToCipherSet, permanent: bool) {
405        let mut inner = self.inner.lock();
406        inner.keys.insert(object_id, keys.to_cipher_set(), permanent);
407        inner.start_purge_task(&self.inner);
408    }
409
410    /// This merges into the cache.  `merge` is a callback that receives the existing keys, if any,
411    /// as an argument.  It's unspecified what happens if keys for the object are currently being
412    /// unwrapped.
413    pub fn merge(
414        &self,
415        object_id: u64,
416        merge: impl FnOnce(Option<&Arc<CipherSet>>) -> Arc<CipherSet>,
417    ) {
418        let mut inner = self.inner.lock();
419        inner.keys.merge(object_id, merge);
420        inner.start_purge_task(&self.inner);
421    }
422
423    /// Removes keys.  Returns a future that can (optionally) be awaited after which any
424    /// task that might be running to fetch keys will have finished.
425    pub fn remove(&self, object_id: u64) -> impl Future<Output = ()> {
426        {
427            let mut inner = self.inner.lock();
428            inner.keys.remove(object_id);
429            if let Some(u) = inner.unwrapping.get(&object_id) {
430                // SAFETY: Safe because of lock on `inner`.
431                unsafe { *u.cancelled.get() = true };
432            }
433        }
434        let inner = self.inner.clone();
435        async move {
436            let listener = {
437                if let Some(u) = inner.lock().unwrapping.get(&object_id) {
438                    u.event.listen()
439                } else {
440                    return;
441                }
442            };
443            listener.await;
444        }
445    }
446
447    /// This clears the caches of all keys.
448    pub fn clear(&self) {
449        let mut inner = self.inner.lock();
450        inner.keys.clear();
451        inner.unwrapping.clear();
452    }
453}
454
/// Conversion into the shared `CipherSet` form that `KeyManager` stores in its cache.
pub trait ToCipherSet {
    /// Consumes `self` and returns the corresponding cipher set.
    fn to_cipher_set(self) -> Arc<CipherSet>;
}
458
459impl ToCipherSet for &UnwrappedKeys {
460    fn to_cipher_set(self) -> Arc<CipherSet> {
461        Arc::new(CipherSet::new(self))
462    }
463}
464
/// An existing cipher set is passed through unchanged.
impl ToCipherSet for Arc<CipherSet> {
    fn to_cipher_set(self) -> Arc<CipherSet> {
        self
    }
}
470
471fn to_result(find_key_result: FindKeyResult) -> Result<Key, FxfsError> {
472    match find_key_result {
473        FindKeyResult::NotFound => Err(FxfsError::NotFound),
474        FindKeyResult::Unavailable => Err(FxfsError::NoKey),
475        FindKeyResult::Key(k) => Ok(k),
476    }
477}
478
479#[cfg(target_os = "fuchsia")]
480#[cfg(test)]
481mod tests {
482    use super::{to_result, KeyManager, PURGE_TIMEOUT};
483    use crate::log::*;
484    use async_trait::async_trait;
485    use fuchsia_async::{self as fasync, MonotonicInstant, TestExecutor};
486
487    use futures::channel::oneshot;
488    use futures::join;
489    use fxfs_crypto::{
490        CipherSet, Crypt, KeyPurpose, UnwrappedKey, WrappedKey, WrappedKeyBytes, WrappedKeys,
491        KEY_SIZE, WRAPPED_KEY_SIZE,
492    };
493    use std::future::pending;
494    use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
495    use std::sync::Arc;
496
    // The payload that `cipher_text` encrypts and that the tests expect back after decrypting.
    const PLAIN_TEXT: &[u8] = b"The quick brown fox jumps over the lazy dog";
    // The counter value at which `TestCrypt::unwrap_key` reports an error.
    const ERROR_COUNTER: u8 = 0xff;
499
500    fn unwrapped_key(counter: u8) -> UnwrappedKey {
501        UnwrappedKey::new(vec![counter; KEY_SIZE].try_into().unwrap())
502    }
503
504    fn cipher_text(counter: u8) -> Vec<u8> {
505        let mut text = PLAIN_TEXT.to_vec();
506        to_result(Arc::new(CipherSet::new(&vec![(0, Some(unwrapped_key(counter)))])).find_key(0))
507            .unwrap()
508            .encrypt(0, &mut text)
509            .expect("encrypt failed");
510        text
511    }
512
    /// Returns a set of wrapped keys holding a single entry with key id 0.  The key bytes are
    /// dummy data; `TestCrypt::unwrap_key` ignores the wrapped key it is given.
    fn wrapped_keys() -> WrappedKeys {
        WrappedKeys::from(vec![(
            0,
            WrappedKey {
                wrapping_key_id: 0x1234567812345678,
                key: WrappedKeyBytes::from([0xff; WRAPPED_KEY_SIZE]),
            },
        )])
    }
522
    /// A fake `Crypt` whose unwrapped keys are derived from a counter.
    struct TestCrypt {
        // Determines the next key handed out; incremented by every call to `unwrap_key`.
        counter: AtomicU8,
        // How long `unwrap_key` waits before returning; zero means no wait at all.
        unwrap_delay: std::time::Duration,
    }
527
528    impl TestCrypt {
529        fn new(counter: u8) -> Arc<Self> {
530            Arc::new(Self {
531                counter: AtomicU8::new(counter),
532                unwrap_delay: std::time::Duration::from_secs(1),
533            })
534        }
535
536        fn with_unwrap_delay(counter: u8, unwrap_delay: std::time::Duration) -> Arc<Self> {
537            Arc::new(Self { counter: AtomicU8::new(counter), unwrap_delay })
538        }
539    }
540
    // Only `unwrap_key` is implemented; the key creation methods are never called by these tests.
    #[async_trait]
    impl Crypt for TestCrypt {
        async fn create_key(
            &self,
            _owner: u64,
            _purpose: KeyPurpose,
        ) -> Result<(WrappedKey, UnwrappedKey), zx::Status> {
            unimplemented!("Not used in tests");
        }

        async fn create_key_with_id(
            &self,
            _owner: u64,
            _wrapping_key_id: u128,
        ) -> Result<(WrappedKey, UnwrappedKey), zx::Status> {
            unimplemented!("Not used in tests");
        }

        /// Waits for `unwrap_delay` (if non-zero), then returns the key for the current counter
        /// value and increments the counter.  Fails with `INTERNAL` when the counter value is
        /// `ERROR_COUNTER`.
        async fn unwrap_key(
            &self,
            _wrapped_key: &WrappedKey,
            _owner: u64,
        ) -> Result<UnwrappedKey, zx::Status> {
            if !self.unwrap_delay.is_zero() {
                fasync::Timer::new(self.unwrap_delay).await;
            }
            let counter = self.counter.fetch_add(1, Ordering::Relaxed);
            if counter == ERROR_COUNTER {
                error!("Unwrap failed!");
                Err(zx::Status::INTERNAL)
            } else {
                Ok(unwrapped_key(counter))
            }
        }
    }
576
    /// Two concurrent `get_keys` calls and a later `get` for the same object should all end up
    /// with a key that decrypts text encrypted with the counter-0 key.
    // NOTE(review): since every `unwrap_key` call bumps the counter, this presumably only holds
    // if the requests share a single unwrap — confirm against `Crypt::unwrap_keys`.
    #[fuchsia::test(allow_stalls = false)]
    async fn test_get_keys() {
        TestExecutor::advance_to(MonotonicInstant::from_nanos(0)).await;

        let crypt = TestCrypt::new(0);
        let manager1 = Arc::new(KeyManager::new());
        let manager2 = manager1.clone();
        let manager3 = manager1.clone();
        let crypt1 = crypt.clone();
        let crypt2 = crypt.clone();

        let task1 = fasync::Task::spawn(async move {
            let mut buf = cipher_text(0);
            to_result(
                manager1
                    .get_keys(
                        1,
                        crypt1.as_ref(),
                        &mut Some(async || Ok(wrapped_keys())),
                        false,
                        false,
                    )
                    .await
                    .expect("get_keys failed")
                    .find_key(0),
            )
            .unwrap()
            .decrypt(0, &mut buf)
            .expect("decrypt failed");
            assert_eq!(&buf, PLAIN_TEXT);
        });
        let task2 = fasync::Task::spawn(async move {
            let mut buf = cipher_text(0);
            to_result(
                manager2
                    .get_keys(
                        1,
                        crypt2.as_ref(),
                        &mut Some(async || Ok(wrapped_keys())),
                        false,
                        false,
                    )
                    .await
                    .expect("get_keys failed")
                    .find_key(0),
            )
            .unwrap()
            .decrypt(0, &mut buf)
            .expect("decrypt failed");
            assert_eq!(&buf, PLAIN_TEXT);
        });
        let task3 = fasync::Task::spawn(async move {
            // Make sure this starts after the get_keys.
            fasync::Timer::new(zx::MonotonicDuration::from_millis(500)).await;
            let mut buf = cipher_text(0);
            manager3
                .get(1)
                .await
                .expect("get failed")
                .expect("missing key")
                .decrypt(0, &mut buf)
                .expect("decrypt failed");
            assert_eq!(&buf, PLAIN_TEXT);
        });

        // Advance fake time past both the 500ms timer in task3 and the one second unwrap delay.
        TestExecutor::advance_to(MonotonicInstant::after(zx::MonotonicDuration::from_millis(1500)))
            .await;

        task1.await;
        task2.await;
        task3.await;
    }
649
    /// An inserted key can be fetched with `get`, and is gone again after `remove`.
    #[fuchsia::test]
    async fn test_insert_and_remove() {
        let manager = Arc::new(KeyManager::new());

        manager.insert(1, &vec![(0, Some(unwrapped_key(0)))], false);
        let mut buf = cipher_text(0);
        manager
            .get(1)
            .await
            .expect("get failed")
            .expect("missing key")
            .decrypt(0, &mut buf)
            .expect("decrypt failed");
        assert_eq!(&buf, PLAIN_TEXT);
        // The removal itself takes effect immediately; the returned future is only for waiting.
        let _ = manager.remove(1);
        assert!(manager.get(1).await.expect("get failed").is_none());
    }
667
    /// A purgeable key survives as long as it keeps being accessed, and is evicted once two
    /// purge periods pass without an access.
    #[fuchsia::test(allow_stalls = false)]
    async fn test_purge() {
        TestExecutor::advance_to(MonotonicInstant::from_nanos(0)).await;

        let manager = Arc::new(KeyManager::new());
        manager.insert(1, &vec![(0, Some(unwrapped_key(0)))], false);

        TestExecutor::advance_to(MonotonicInstant::after(PURGE_TIMEOUT.into())).await;

        // After 1 period, the key should still be present.
        manager.get(1).await.expect("get failed").expect("missing key");

        TestExecutor::advance_to(MonotonicInstant::after(PURGE_TIMEOUT.into())).await;

        // The last access should have reset the timer and it should still be present.
        manager.get(1).await.expect("get failed").expect("missing key");

        TestExecutor::advance_to(MonotonicInstant::after((2 * PURGE_TIMEOUT).into())).await;

        // The key should have been evicted since two periods passed.
        assert!(manager.get(1).await.expect("get failed").is_none());
    }
690
    /// Permanent keys survive purging while purgeable keys do not.
    #[fuchsia::test(allow_stalls = false)]
    async fn test_permanent() {
        TestExecutor::advance_to(MonotonicInstant::from_nanos(0)).await;

        let manager = Arc::new(KeyManager::new());
        manager.insert(1, &vec![(0, Some(unwrapped_key(0)))], true);
        manager.insert(2, &vec![(0, Some(unwrapped_key(0)))], false);

        // Skip forward two periods which should cause 2 to be purged but not 1.
        TestExecutor::advance_to(MonotonicInstant::after((2 * PURGE_TIMEOUT).into())).await;

        assert!(manager.get(1).await.expect("get failed").is_some());
        assert!(manager.get(2).await.expect("get failed").is_none());
    }
705
    /// `clear` removes permanent keys, recently used keys and keys that are pending purge.
    #[fuchsia::test(allow_stalls = false)]
    async fn test_clear() {
        TestExecutor::advance_to(MonotonicInstant::from_nanos(0)).await;

        let manager = Arc::new(KeyManager::new());
        manager.insert(1, &vec![(0, Some(unwrapped_key(0)))], true);
        manager.insert(2, &vec![(0, Some(unwrapped_key(0)))], false);
        manager.insert(3, &vec![(0, Some(unwrapped_key(0)))], false);

        // Skip forward 1 period which should make keys 2 and 3 pending deletion.
        TestExecutor::advance_to(MonotonicInstant::after(PURGE_TIMEOUT.into())).await;

        // Touch the second key which should promote it to the active list.
        assert!(manager.get(2).await.expect("get failed").is_some());

        manager.clear();

        // Clearing should have removed all three keys.
        assert!(manager.get(1).await.expect("get failed").is_none());
        assert!(manager.get(2).await.expect("get failed").is_none());
        assert!(manager.get(3).await.expect("get failed").is_none());
    }
728
    /// When unwrapping fails, both concurrent `get_keys` calls for the same object return an
    /// error.
    #[fuchsia::test(allow_stalls = false)]
    async fn test_error() {
        TestExecutor::advance_to(MonotonicInstant::from_nanos(0)).await;

        // Starting at `ERROR_COUNTER` makes the first unwrap fail.
        let crypt = TestCrypt::new(ERROR_COUNTER);
        let manager1 = Arc::new(KeyManager::new());
        let manager2 = manager1.clone();
        let crypt1 = crypt.clone();
        let crypt2 = crypt.clone();

        let task1 = fasync::Task::spawn(async move {
            assert!(manager1
                .get_keys(1, crypt1.as_ref(), &mut Some(async || Ok(wrapped_keys())), false, false,)
                .await
                .is_err());
        });
        let task2 = fasync::Task::spawn(async move {
            assert!(manager2
                .get_keys(1, crypt2.as_ref(), &mut Some(async || Ok(wrapped_keys())), false, false,)
                .await
                .is_err());
        });

        // Advance fake time by the one second unwrap delay.
        TestExecutor::advance_to(MonotonicInstant::after(zx::MonotonicDuration::from_seconds(1)))
            .await;

        task1.await;
        task2.await;
    }
758
    /// Awaiting the future returned by `remove` should not complete until the in-flight key
    /// fetch has finished, and the cancelled `get_keys` call should return an error.
    #[fuchsia::test(allow_stalls = false)]
    async fn test_wait_after_remove() {
        let manager = Arc::new(KeyManager::new());
        let crypt = TestCrypt::with_unwrap_delay(0, std::time::Duration::ZERO);
        let (sender, receiver) = oneshot::channel();
        // Set once the closure that fetches the wrapped keys has run to completion (or has been
        // dropped part-way through).
        let dropped = AtomicBool::new(false);

        assert!(join!(
            async {
                let mut unwrap_keys = Some(async || {
                    struct OnDrop<'a>(&'a AtomicBool);
                    impl Drop for OnDrop<'_> {
                        fn drop(&mut self) {
                            self.0.store(true, Ordering::Relaxed);
                        }
                    }
                    let _on_drop = OnDrop(&dropped);
                    // Tell the other branch that the fetch is now in flight.
                    sender.send(()).unwrap();
                    // This should wait until both the remove calls below are waiting.
                    let _ = TestExecutor::poll_until_stalled(pending::<()>()).await;
                    Ok(wrapped_keys())
                });
                manager.get_keys(1, crypt.as_ref(), &mut unwrap_keys, false, false).await
            },
            async {
                let _ = receiver.await;
                join!(
                    async {
                        manager.remove(1).await;
                        assert!(dropped.load(Ordering::Relaxed));
                    },
                    async {
                        manager.remove(1).await;
                        assert!(dropped.load(Ordering::Relaxed));
                    }
                );
            },
        )
        .0
        .is_err());
    }
800}