1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
45use crate::message::action_fuse::{ActionFuse, ActionFuseHandle};
6use crate::message::base::{Message, MessageClientId, MessageEvent, MessengerId, Status};
7use crate::message::message_client::MessageClient;
8use crate::message::messenger::Messenger;
9use crate::message::receptor::Receptor;
10use anyhow::{format_err, Error};
11use fuchsia_async::{self as fasync, DurationExt};
12use futures::channel::mpsc::UnboundedSender;
13use futures::future::{AbortHandle, Abortable, TryFutureExt};
14use futures::lock::Mutex;
15use std::rc::Rc;
16use zx::MonotonicDuration;
/// Helper for creating a [`Beacon`] together with its paired [`Receptor`]. The
/// builder allows chaining an additional fuse and configuring an optional
/// timeout before the pair is built.
pub struct BeaconBuilder {
    /// Messenger handed to `Beacon::create` when the pair is built.
    messenger: Messenger,
    /// Optional fuse forwarded to `Beacon::create`; `None` until `add_fuse` is
    /// called.
    chained_fuses: Option<ActionFuseHandle>,
    /// Optional timeout forwarded to `Beacon::create`; with `None` no timeout
    /// task is spawned.
    timeout: Option<MonotonicDuration>,
}
2425impl BeaconBuilder {
26pub(super) fn new(messenger: Messenger) -> Self {
27Self { messenger, chained_fuses: None, timeout: None }
28 }
2930pub(super) fn add_fuse(mut self, fuse: ActionFuseHandle) -> Self {
31self.chained_fuses = Some(fuse);
32self
33}
3435pub(super) fn set_timeout(mut self, duration: Option<MonotonicDuration>) -> Self {
36self.timeout = duration;
37self
38}
3940pub(super) fn build(self) -> (Beacon, Receptor) {
41 Beacon::create(self.messenger, self.chained_fuses, self.timeout)
42 }
43}
/// A Beacon is the conduit for sending messages to a particular Receptor. An
/// instance may be cloned and passed around to other components. All clones of
/// a particular Beacon share the same event channel, the same [`Sentinel`]
/// (through an `Rc`), and a handle to the same timeout abort registration.
///
/// It is important to note that Beacons spawn from sending a Message. Status
/// and other context sent through the Beacon are in relation to this original
/// Message (either an origin or reply).
#[derive(Clone, Debug)]
pub struct Beacon {
    /// A reference to the associated Messenger. This is only used when delivering
    /// a new message to a beacon, where a MessageClient (which references both
    /// the recipient's Messenger and the message) must be created.
    messenger: Messenger,
    /// The sender half of an internal channel established between the Beacon and
    /// Receptor. Both status updates and delivered messages travel over it as
    /// `MessageEvent`s.
    event_sender: UnboundedSender<MessageEvent>,
    /// Sentinel holding secondary ActionFuses added through `add_fuse`. It is
    /// triggered by the fuse handed to the Receptor in `Beacon::create`.
    sentinel: Rc<Mutex<Sentinel>>,
    /// Handle used to abort the timeout task (if one was spawned) so that no
    /// `Status::Timeout` is sent once a message has been delivered or the
    /// Receptor's fuse has fired.
    timeout_abort_client: AbortHandle,
}
6768impl Beacon {
69/// Creates a Beacon, Receptor tuple. The Messenger provided as an argument
70 /// will be associated with any delivered Message for reply purposes.
71fn create(
72 messenger: Messenger,
73 fuses: Option<ActionFuseHandle>,
74 timeout: Option<MonotonicDuration>,
75 ) -> (Beacon, Receptor) {
76let sentinel = Rc::new(Mutex::new(Sentinel::new()));
77let (event_tx, event_rx) = futures::channel::mpsc::unbounded::<MessageEvent>();
78let (timeout_abort_client, timeout_abort_server) = AbortHandle::new_pair();
79let signature = messenger.get_signature();
80let beacon = Beacon {
81 messenger,
82 event_sender: event_tx.clone(),
83 sentinel: sentinel.clone(),
84 timeout_abort_client: timeout_abort_client.clone(),
85 };
8687// pass fuse to receptor to hold and set when it goes out of scope.
88let receptor = Receptor::new(
89 signature,
90 event_rx,
91 ActionFuse::create(Box::new(move || {
92let sentinel = sentinel.clone();
93 fasync::Task::local(async move {
94 timeout_abort_client.abort();
95 sentinel.lock().await.trigger().await;
96 })
97 .detach();
98 })),
99 fuses,
100 );
101102if let Some(duration) = timeout {
103let abortable_timeout = Abortable::new(
104async move {
105 fuchsia_async::Timer::new(duration.after_now()).await;
106// Panic if send failed, otherwise the client cannot abort processes.
107event_tx
108 .unbounded_send(MessageEvent::Status(Status::Timeout))
109 .expect("Beacon::create, event_tx failed to send Timeout status message");
110 },
111 timeout_abort_server,
112 );
113114 fasync::Task::local(abortable_timeout.unwrap_or_else(|_| ())).detach();
115 }
116 (beacon, receptor)
117 }
118119/// Sends the Status associated with the original message that spawned
120 /// this beacon.
121pub(super) async fn status(&self, status: Status) -> Result<(), Error> {
122if self.event_sender.unbounded_send(MessageEvent::Status(status)).is_err() {
123return Err(format_err!("failed to deliver status"));
124 }
125126Ok(())
127 }
128129/// Delivers a response to the original message that spawned this Beacon.
130pub(super) async fn deliver(
131&self,
132 message: Message,
133 client_id: MessageClientId,
134 ) -> Result<(), Error> {
135self.timeout_abort_client.abort();
136if self
137.event_sender
138 .unbounded_send(MessageEvent::Message(
139 message.payload().clone(),
140 MessageClient::new(client_id, message, self.messenger.clone()),
141 ))
142 .is_err()
143 {
144return Err(format_err!("failed to deliver message"));
145 }
146147Ok(())
148 }
149150/// Adds the specified fuse to the beacon's sentinel.
151pub(super) async fn add_fuse(&mut self, fuse: ActionFuseHandle) {
152self.sentinel.lock().await.add_fuse(fuse);
153 }
154155/// Returns the identifier for the associated Messenger.
156pub(super) fn get_messenger_id(&self) -> MessengerId {
157self.messenger.get_id()
158 }
159}
/// Sentinel gathers action fuses from other sources and releases them
/// on-demand.
struct Sentinel {
    /// `true` until `trigger` is called; once `false`, `add_fuse` no longer
    /// stores new fuses.
    active: bool,
    /// Fuse handles held until `trigger` clears the list.
    fuses: Vec<ActionFuseHandle>,
}
167168impl Sentinel {
169/// Generates a new Sentinel.
170fn new() -> Self {
171Self { active: true, fuses: vec![] }
172 }
173174/// Adds a fuse if still active.
175fn add_fuse(&mut self, fuse: ActionFuseHandle) {
176// In the case we're not active anymore, do not add fuse.
177if !self.active {
178return;
179 }
180181self.fuses.push(fuse);
182 }
183184/// Removes all pending fuses.
185async fn trigger(&mut self) {
186self.active = false;
187// Clear fuses, triggering them.
188self.fuses.clear();
189 }
190}