use anyhow::{format_err, Error};
use fuchsia_sync::Mutex;
use futures::channel::oneshot;
use futures::future::FusedFuture;
use futures::task::{Context, Poll};
use futures::{ready, Future, FutureExt};
use slab::Slab;
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};
type BoxRevokeFn = Box<dyn FnOnce() -> Permit + Send>;
struct RevokeFnHolder {
f: Mutex<Option<BoxRevokeFn>>,
label: Mutex<String>,
}
impl RevokeFnHolder {
fn new(f: Option<BoxRevokeFn>) -> Arc<Self> {
Arc::new(Self { f: Mutex::new(f), label: Mutex::new(String::default()) })
}
fn replace(&self, f: BoxRevokeFn) -> Option<BoxRevokeFn> {
self.f.lock().replace(f)
}
fn relabel(&self, label: String) {
*(self.label.lock()) = label;
}
fn label(&self) -> String {
self.label.lock().clone()
}
fn take(&self) -> Option<BoxRevokeFn> {
self.f.lock().take()
}
fn is_revokable(&self) -> bool {
self.f.lock().is_some()
}
fn extract(weak: &Weak<Self>) -> BoxRevokeFn {
weak.upgrade().expect("should be resolvable").take().expect("revokable fn missing")
}
}
impl std::fmt::Debug for RevokeFnHolder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RevokeFnHolder")
.field("revokable", &self.is_revokable())
.field("label", &self.label())
.finish()
}
}
struct WaitingReservation {
sender: futures::channel::oneshot::Sender<Permit>,
}
struct PermitsInner {
limit: usize,
out: Slab<Weak<RevokeFnHolder>>,
waiting: VecDeque<WaitingReservation>,
revocations: VecDeque<usize>,
weak: Weak<Mutex<Self>>,
}
impl std::fmt::Debug for PermitsInner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut debug = f.debug_struct("PermitsInner");
let _ = debug.field("limit", &self.limit).field("waiting", &self.waiting.len());
for (k, holder) in &self.out {
let h = holder.upgrade().unwrap();
let holder_str = format!("{}: {}", if h.is_revokable() { "R" } else { "I" }, h.label());
let _ = debug.field(format!("permit{k}").as_str(), &holder_str);
}
debug.finish()
}
}
impl PermitsInner {
fn new(limit: usize) -> Arc<Mutex<Self>> {
Arc::new_cyclic(|weak| {
Mutex::new(Self {
limit,
out: Slab::with_capacity(limit),
waiting: VecDeque::new(),
revocations: VecDeque::new(),
weak: weak.clone(),
})
})
}
fn try_get(&mut self, revoke_fn: Option<BoxRevokeFn>) -> Result<Permit, Error> {
if self.out.len() == self.out.capacity() {
return Err(format_err!("No permits left"));
}
let is_revokable = revoke_fn.is_some();
let fn_holder = RevokeFnHolder::new(revoke_fn);
let key = self.out.insert(Arc::downgrade(&fn_holder));
if is_revokable {
self.revocations.push_back(key);
}
Ok(Permit {
inner: Some(self.weak.upgrade().unwrap()),
committed: Arc::new(AtomicBool::new(true)),
fn_holder,
key,
})
}
fn release(&mut self, key: usize) {
self.revocations.retain(|k| *k != key);
let holder = self.out.get(key).expect("reservation present").upgrade().unwrap();
drop(holder.take());
let this = self.weak.upgrade().unwrap();
while let Some(sender) = self.waiting.pop_front() {
if let Ok(()) = Permit::handoff(sender, this.clone(), holder.clone(), key) {
return;
}
}
drop(self.out.remove(key));
}
fn reservation(&mut self, revoke_fn: Option<BoxRevokeFn>) -> Reservation {
let (sender, receiver) = oneshot::channel();
match self.try_get(None).ok() {
Some(permit) => sender.send(permit).ok().unwrap(),
None => self.waiting.push_back(WaitingReservation { sender }),
}
Reservation { receiver, revoke_fn, inner: self.weak.clone() }
}
fn make_revokable(&mut self, key: usize, revoke_fn: BoxRevokeFn) {
let prev = self
.out
.get(key)
.expect("reservation should be out")
.upgrade()
.expect("holder should resolve")
.replace(revoke_fn);
assert!(prev.is_none(), "shouldn't be replacing a previous revocation function");
self.revocations.push_back(key);
}
fn pop_revoke(&mut self) -> Option<BoxRevokeFn> {
self.revocations.pop_front().map(|idx| RevokeFnHolder::extract(&self.out[idx]))
}
fn revoke_all(&mut self) -> Vec<BoxRevokeFn> {
let mut indices = std::mem::take(&mut self.revocations);
indices.drain(..).map(|idx| RevokeFnHolder::extract(&self.out[idx])).collect()
}
}
pub struct Reservation {
receiver: oneshot::Receiver<Permit>,
revoke_fn: Option<BoxRevokeFn>,
inner: Weak<Mutex<PermitsInner>>,
}
impl Future for Reservation {
type Output = Permit;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let res = ready!(self.receiver.poll_unpin(cx));
let permit = res.expect("sender shouldn't be dropped, polled after termination?");
if let (Some(f), Some(inner)) = (self.revoke_fn.take(), self.inner.upgrade()) {
inner.lock().make_revokable(permit.key, f);
}
Poll::Ready(permit)
}
}
impl FusedFuture for Reservation {
fn is_terminated(&self) -> bool {
self.receiver.is_terminated()
}
}
#[derive(Debug, Clone)]
pub struct Permits {
inner: Arc<Mutex<PermitsInner>>,
limit: usize,
}
impl Permits {
pub fn new(limit: usize) -> Self {
Self { inner: PermitsInner::new(limit), limit }
}
pub fn limit(&self) -> usize {
self.limit
}
pub fn get(&self) -> Option<Permit> {
Permit::try_issue(self.inner.clone(), None)
}
pub fn get_revokable(
&self,
revoked_fn: impl FnOnce() -> Permit + 'static + Send,
) -> Option<Permit> {
Permit::try_issue(self.inner.clone(), Some(Box::new(revoked_fn)))
}
pub fn take(&self) -> Option<Permit> {
if let Some(permit) = self.get() {
return Some(permit);
}
let revoke_fn = self.inner.lock().pop_revoke();
revoke_fn.map(|f| f())
}
pub fn seize(&self) -> Vec<Permit> {
let mut bunch = Vec::new();
let mut revoke_fns = {
let mut lock = self.inner.lock();
loop {
match lock.try_get(None).ok() {
Some(permit) => bunch.push(permit),
None => break,
}
}
lock.revoke_all()
};
for f in revoke_fns.drain(..) {
bunch.push(f())
}
bunch
}
pub fn reserve(&self) -> Reservation {
self.inner.lock().reservation(None)
}
pub fn reserve_revokable(
&self,
revoked_fn: impl FnOnce() -> Permit + 'static + Send,
) -> Reservation {
self.inner.lock().reservation(Some(Box::new(revoked_fn)))
}
}
#[derive(Debug)]
pub struct Permit {
inner: Option<Arc<Mutex<PermitsInner>>>,
committed: Arc<AtomicBool>,
fn_holder: Arc<RevokeFnHolder>,
key: usize,
}
impl Permit {
pub fn relabel(&self, new_label: String) {
self.fn_holder.relabel(new_label);
}
fn try_issue(inner: Arc<Mutex<PermitsInner>>, revoke_fn: Option<BoxRevokeFn>) -> Option<Self> {
inner.lock().try_get(revoke_fn).ok()
}
fn handoff(
waiting: WaitingReservation,
inner: Arc<Mutex<PermitsInner>>,
fn_holder: Arc<RevokeFnHolder>,
key: usize,
) -> Result<(), Error> {
let committed = Arc::new(AtomicBool::new(false));
let commit_clone = committed.clone();
let potential = Self { inner: Some(inner), committed, key, fn_holder };
match waiting.sender.send(potential) {
Ok(()) => {
commit_clone.store(true, Ordering::Relaxed);
Ok(())
}
Err(_) => Err(format_err!("failed to handoff")),
}
}
}
impl Drop for Permit {
fn drop(&mut self) {
let inner = match self.inner.take() {
None => return, Some(inner) => inner,
};
let committed = self.committed.load(Ordering::Relaxed);
if committed {
inner.lock().release(self.key);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use async_utils::PollExt;
use fuchsia_async as fasync;
#[track_caller]
fn expect_none<T>(opt: Option<T>, msg: &str) {
if let Some(_) = opt {
panic!("{}", msg);
}
}
#[track_caller]
fn expect_no_permits(exec: &mut fasync::TestExecutor, reservation: &mut Reservation) {
exec.run_until_stalled(reservation)
.expect_pending("expected reservation to have no permits");
}
#[track_caller]
fn expect_permit_available(
exec: &mut fasync::TestExecutor,
reservation: &mut Reservation,
) -> Permit {
exec.run_until_stalled(reservation).expect("reservation to have available permit")
}
#[test]
fn no_permits_available() {
let permits = Permits::new(0);
expect_none(permits.get(), "shouldn't be able to get a permit");
let _reservation = permits.reserve();
}
#[test]
fn permit_dropping() {
let permits = Permits::new(2);
assert_eq!(2, permits.limit());
let one = permits.get().expect("first permit");
let two = permits.get().expect("second permit");
expect_none(permits.get(), "shouln't get a third permit");
drop(two);
let three = permits.get().expect("third permit");
drop(one);
let four = permits.get().expect("fourth permit");
drop(three);
drop(four);
let _five = permits.get().expect("fifth permit");
}
#[test]
fn permit_reservations() {
let mut exec = fasync::TestExecutor::new();
let permits = Permits::new(2);
let one = permits.get().expect("permit one should be available");
let two = permits.get().expect("second permit is also okay");
expect_none(permits.get(), "can't get a third item");
let mut first = permits.reserve();
let second = permits.reserve();
let mut third = permits.reserve();
let mut fourth = permits.reserve();
drop(second);
expect_no_permits(&mut exec, &mut first);
expect_no_permits(&mut exec, &mut third);
expect_no_permits(&mut exec, &mut fourth);
drop(one);
let first_out = expect_permit_available(&mut exec, &mut first);
expect_no_permits(&mut exec, &mut third);
expect_no_permits(&mut exec, &mut fourth);
drop(first_out);
let third_out = expect_permit_available(&mut exec, &mut third);
expect_no_permits(&mut exec, &mut fourth);
drop(fourth);
drop(two);
let mut fifth = permits.reserve();
expect_none(permits.get(), "no items should be available");
let sixth = permits.reserve();
let mut seventh = permits.reserve();
let _eighth = permits.reserve();
drop(permits);
let _fifth_out = expect_permit_available(&mut exec, &mut fifth);
drop(third_out);
drop(sixth);
let _seventh_out = expect_permit_available(&mut exec, &mut seventh);
}
#[test]
fn revoke_permits() {
const TOTAL_PERMITS: usize = 2;
let permits = Permits::new(TOTAL_PERMITS);
let permit_holder = Arc::new(Mutex::new(None));
let revoke_from_holder_fn = {
let holder = permit_holder.clone();
move || holder.lock().take().expect("should be holding Permit")
};
let revokable_permit =
permits.get_revokable(revoke_from_holder_fn.clone()).expect("permit available");
*permit_holder.lock() = Some(revokable_permit);
let seized_permits = permits.seize();
assert_eq!(TOTAL_PERMITS, seized_permits.len());
assert!(permit_holder.lock().is_none());
drop(seized_permits);
let _nonrevokable_permit = permits.take().expect("permit available");
let revokable_permit =
permits.get_revokable(revoke_from_holder_fn.clone()).expect("two permits");
*permit_holder.lock() = Some(revokable_permit);
let seized_permits = permits.seize();
assert_eq!(1, seized_permits.len());
assert!(permit_holder.lock().is_none());
drop(seized_permits);
let revokable_permit = permits.get_revokable(revoke_from_holder_fn).expect("permit");
*permit_holder.lock() = Some(revokable_permit);
let _taken = permits.take().expect("can take the permit");
assert!(permit_holder.lock().is_none());
assert!(permits.take().is_none());
}
#[test]
fn revokable_dropped_before_revokation() {
const TOTAL_PERMITS: usize = 2;
let permits = Permits::new(TOTAL_PERMITS);
let permit_holder = Arc::new(Mutex::new(None));
let revoke_from_holder_fn = {
let holder = permit_holder.clone();
move || holder.lock().take().expect("should be holding Permit when revoked")
};
let revokable_permit =
permits.get_revokable(revoke_from_holder_fn).expect("permit available");
drop(revokable_permit);
let seized_permits = permits.seize();
assert_eq!(TOTAL_PERMITS, seized_permits.len());
}
fn revoke_then_reserve_again(
permits: Permits,
holder: Arc<Mutex<Vec<Permit>>>,
reservations: Arc<Mutex<Vec<Reservation>>>,
) -> Permit {
let permit = holder.lock().pop().expect("should have a permit");
let recurse_fn = {
let permits = permits.clone();
let reservations = reservations.clone();
move || revoke_then_reserve_again(permits, holder, reservations)
};
let reservation = permits.reserve_revokable(recurse_fn);
reservations.lock().push(reservation);
permit
}
#[fuchsia::test]
fn revokable_reservations() {
let mut exec = fasync::TestExecutor::new();
const TOTAL_PERMITS: usize = 2;
let permits = Permits::new(TOTAL_PERMITS);
let permits_holder = Arc::new(Mutex::new(Vec::new()));
let reservations_holder = Arc::new(Mutex::new(Vec::new()));
let revoke_from_holder_fn = {
let holder = permits_holder.clone();
move || holder.lock().pop().expect("should have a Permit")
};
let revokable =
permits.get_revokable(revoke_from_holder_fn.clone()).expect("got revokable");
permits_holder.lock().push(revokable);
let revoke_then_reserve_fn = {
let permits = permits.clone();
let holder = permits_holder.clone();
let reservations = reservations_holder.clone();
move || revoke_then_reserve_again(permits, holder, reservations)
};
let mut revokable_reservation = permits.reserve_revokable(revoke_then_reserve_fn);
let revokable_permit = expect_permit_available(&mut exec, &mut revokable_reservation);
permits_holder.lock().push(revokable_permit);
let seized_permits = permits.seize();
assert_eq!(TOTAL_PERMITS, seized_permits.len());
assert_eq!(0, permits_holder.lock().len());
let mut another_reservation = reservations_holder.lock().pop().expect("reservation");
let mut revokable_reservation_two = permits.reserve_revokable(revoke_from_holder_fn);
drop(seized_permits);
let revokable_permit = expect_permit_available(&mut exec, &mut another_reservation);
permits_holder.lock().push(revokable_permit);
let revokable_permit = expect_permit_available(&mut exec, &mut revokable_reservation_two);
permits_holder.lock().push(revokable_permit);
let seized_permits = permits.seize();
assert_eq!(TOTAL_PERMITS, seized_permits.len());
assert_eq!(0, permits_holder.lock().len());
let mut yet_another = reservations_holder.lock().pop().expect("reservation");
expect_no_permits(&mut exec, &mut yet_another);
drop(seized_permits);
let one = permits.get().expect("one is available");
expect_none(permits.get(), "none should be available");
let revokable_permit = expect_permit_available(&mut exec, &mut yet_another);
permits_holder.lock().push(revokable_permit);
let seized_permits = permits.seize();
assert_eq!(1, seized_permits.len());
assert_eq!(0, permits_holder.lock().len());
let mut yet_another = reservations_holder.lock().pop().expect("reservation");
expect_no_permits(&mut exec, &mut yet_another);
drop(one);
let revokable_permit = expect_permit_available(&mut exec, &mut yet_another);
permits_holder.lock().push(revokable_permit);
let taken_permit = permits.take().expect("should be able to take one");
drop(taken_permit);
permits_holder.lock().clear();
drop(permits_holder);
reservations_holder.lock().clear();
drop(reservations_holder);
drop(permits);
}
}