cm_util/
task_group.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use fuchsia_async as fasync;
use futures::Future;
use std::fmt;
use std::sync::{self, Arc, Weak};

/// A simple wrapper for `TaskGroup` that stores the `TaskGroup` in an `Arc` so it can be passed
/// between threads.
#[derive(Clone)]
pub struct TaskGroup {
    task_group: Arc<sync::Mutex<Option<fasync::TaskGroup>>>,
}

impl fmt::Debug for TaskGroup {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("TaskGroup").finish()
    }
}

impl TaskGroup {
    pub fn new() -> Self {
        Self { task_group: Arc::new(sync::Mutex::new(Some(fasync::TaskGroup::new()))) }
    }

    /// Creates a new WeakTaskGroup from this group.
    pub fn as_weak(&self) -> WeakTaskGroup {
        WeakTaskGroup { task_group: Arc::downgrade(&self.task_group) }
    }

    /// Spawns a new task in this TaskGroup.
    ///
    /// If `join` has been called on a clone of this TaskGroup, `spawn` will drop the task instead.
    ///
    /// # Panics
    ///
    /// `spawn` may panic if not called in the context of an executor (e.g.
    /// within a call to `run` or `run_singlethreaded`).
    pub fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) {
        let mut task_group = self.task_group.lock().unwrap();
        if let Some(task_group) = task_group.as_mut() {
            task_group.spawn(future);
        }
    }

    /// Waits for all Tasks in this TaskGroup to finish. Prevents future tasks from being spawned
    /// if there's another task that holds a clone of this TaskGroup.
    pub async fn join(self) {
        let task_group = {
            let mut task_group_lock = self.task_group.lock().unwrap();
            task_group_lock.take()
        };
        if let Some(task_group) = task_group {
            task_group.join().await;
        }
    }
}

/// Holds a weak reference to the internal `TaskGroup`, and can spawn futures on it as long as the
/// reference is still valid. If a task group is to hold a future that wants to spawn other tasks
/// on the same group, this future should hold a WeakTaskGroup so that there is no reference cycle
/// between the task group and tasks on the task group.
#[derive(Debug, Clone)]
pub struct WeakTaskGroup {
    task_group: Weak<sync::Mutex<Option<fasync::TaskGroup>>>,
}

impl WeakTaskGroup {
    /// Adds a task to the group this WeakTaskGroup was created from. The task is dropped if there
    /// are no more strong references to the original task group.
    pub fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) {
        if let Some(task_group) = self.task_group.upgrade() {
            let temp_task_group = TaskGroup { task_group };
            temp_task_group.spawn(future);
        }
    }
}