settings/message/base.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use crate::message::action_fuse::ActionFuseHandle;
use crate::message::beacon::Beacon;
use crate::message::message_client::MessageClient;
use crate::message::messenger::MessengerClient;
use crate::message::receptor::Receptor;
use futures::channel::mpsc::UnboundedSender;
use futures::channel::oneshot::Sender;
use std::fmt::{Debug, Formatter};
use std::hash::Hash;
use std::rc::Rc;
use thiserror::Error;
/// Trait alias for types of data that can be used as the payload in a
/// MessageHub.
pub trait Payload: Clone + Debug {}
impl<T: Clone + Debug> Payload for T {}
/// Trait alias for types of data that can be used as an address in a
/// MessageHub.
pub trait Address: Copy + Debug + Eq + Hash + Unpin {}
impl<T: Copy + Debug + Eq + Hash + Unpin> Address for T {}
/// `Filter` is used by the `MessageHub` to determine whether an incoming
/// message should be directed to associated broker.
pub type Filter = Rc<dyn Fn(&Message) -> bool>;
/// A mod for housing common definitions for messengers. Messengers are
/// MessageHub participants, which are capable of sending and receiving
/// messages.
pub(super) mod messenger {
use super::MessengerType;
/// `Descriptor` is a blueprint for creating a messenger. It is sent to the
/// MessageHub by clients, which interprets the information to build the
/// messenger.
#[derive(Clone)]
pub struct Descriptor {
/// The type of messenger to be created. This determines how messages
/// can be directed to a messenger created from this Descriptor.
/// Please reference [Audience](crate::message::base::Audience) to see how these types map
/// to audience targets.
pub messenger_type: MessengerType,
}
}
/// A MessageEvent defines the data that can be returned through a message
/// receptor.
#[derive(Debug, PartialEq)]
pub enum MessageEvent {
/// A message that has been delivered, either as a new message directed at to
/// the recipient's address or a reply to a previously sent message
/// (dependent on the receptor's context).
Message(crate::Payload, MessageClient),
/// A status update for the message that spawned the receptor delivering this
/// update.
Status(Status),
}
#[derive(Error, Debug, Clone)]
pub enum MessageError {
#[error("Address conflig:{address:?} already exists")]
AddressConflict { address: crate::Address },
#[error("Unexpected Error")]
Unexpected,
}
/// The types of results possible from sending or replying.
#[derive(Copy, Clone, PartialEq, Debug)]
pub enum Status {
// Sent to some audience, potentially no one.
Broadcasted,
// Received by the intended address.
Received,
// Could not be delivered to the specified target.
Undeliverable,
// Acknowledged by a recipient.
Acknowledged,
Timeout,
}
/// The intended recipients for a message.
#[derive(Clone, Debug, PartialEq, Hash, Eq)]
pub enum Audience {
// All non-broker messengers outside of the sender.
Broadcast,
// The messenger at the specified address.
Address(crate::Address),
// The messenger with the specified signature.
Messenger(Signature),
// A messenger that is an event sink.
EventSink,
}
/// An identifier that can be used to send messages directly to a Messenger.
/// Included with Message instances.
#[derive(Copy, Clone, Debug, PartialEq, Hash, Eq)]
pub enum Signature {
// Messenger at a given address.
Address(crate::Address),
// The messenger cannot be directly addressed.
Anonymous(MessengerId),
}
#[derive(Copy, Clone, Debug)]
pub struct Fingerprint {
pub id: MessengerId,
pub signature: Signature,
}
/// The messengers that can participate in messaging
#[derive(Clone)]
pub enum MessengerType {
/// An endpoint in the messenger graph. Can have messages specifically
/// addressed to it and can author new messages.
Addressable(crate::Address),
/// A intermediary messenger. Will receive every forwarded message. Brokers
/// are able to send and reply to messages, but the main purpose is to observe
/// messages. An optional filter may be specified, which limits the messages
/// directed to this broker.
Broker(Filter),
/// Used in tests to observe broadcast events.
#[cfg(test)]
EventSink,
/// A messenger that cannot be reached by an address.
Unbound,
}
/// We must implement Debug since the `Filter` does not provide
/// a `Debug` implementation.
impl Debug for MessengerType {
fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
match self {
MessengerType::Addressable(addr) => write!(f, "Addressable({addr:?})"),
MessengerType::Broker(_) => write!(f, "Broker"),
#[cfg(test)]
MessengerType::EventSink => write!(f, "EventSink"),
MessengerType::Unbound => write!(f, "Unbound"),
}
}
}
/// MessageType captures details about the Message's source.
#[derive(Clone, Debug)]
pub enum MessageType {
/// A completely new message that is intended for the specified audience.
Origin(Audience),
/// A response to a previously received message. Note that the value must
/// be boxed to mitigate recursive sizing issues as MessageType is held by
/// Message.
Reply(Box<Message>),
}
/// `Attribution` describes the relationship of the message path in relation
/// to the author.
#[derive(Clone, Debug)]
pub enum Attribution {
/// `Source` attributed messages are the original messages to be sent on a
/// path. For example, a source attribution for an origin message type will
/// be authored by the original sender. In a reply message type, a source
/// attribution means the reply was authored by the original message's
/// intended target.
Source(MessageType),
/// `Derived` attributed messages are messages that have been modified by
/// someone in the message path. They follow the same trajectory (audience
/// or return path), but their message has been altered. The supplied
/// signature is the messenger that modified the specified message.
Derived(Box<Message>, Signature),
}
/// The core messaging unit. A Message may be annotated by messengers, but is
/// not associated with a particular Messenger instance.
#[derive(Clone, Debug)]
pub struct Message {
author: Fingerprint,
payload: crate::Payload,
attribution: Attribution,
// The return path is generated while the message is passed from messenger
// to messenger on the way to the intended recipient. It indicates the
// messengers that would like to be informed of replies to this message.
// The message author is always the last element in this vector. New
// participants are pushed to the front.
return_path: Vec<Beacon>,
}
impl Message {
/// Returns a new Message instance. Only the MessageHub can mint new messages.
pub(super) fn new(
author: Fingerprint,
payload: crate::Payload,
attribution: Attribution,
) -> Message {
let mut return_path = vec![];
// A derived message adopts the return path of the original message.
if let Attribution::Derived(message, _) = &attribution {
return_path.extend(message.get_return_path().iter().cloned());
}
Message { author, payload, attribution, return_path }
}
/// Adds an entity to be notified on any replies.
pub(super) fn add_participant(&mut self, participant: Beacon) {
self.return_path.insert(0, participant);
}
/// Returns the Signatures of messengers who have modified this message
/// through propagation.
#[cfg(test)]
pub(super) fn get_modifiers(&self) -> Vec<Signature> {
let mut modifiers = vec![];
if let Attribution::Derived(origin, signature) = &self.attribution {
modifiers.push(*signature);
modifiers.extend(origin.get_modifiers());
}
modifiers
}
pub(crate) fn get_author(&self) -> Signature {
match &self.attribution {
Attribution::Source(_) => self.author.signature,
Attribution::Derived(message, _) => message.get_author(),
}
}
/// Binds the action fuse to the author's receptor. The fuse will fire
/// once that receptor is released.
pub(super) async fn bind_to_author(&mut self, fuse: ActionFuseHandle) {
if let Some(beacon) = self.return_path.last_mut() {
beacon.add_fuse(fuse).await;
}
}
/// Returns the list of participants for the reply return path.
pub(super) fn get_return_path(&self) -> &Vec<Beacon> {
&self.return_path
}
/// Returns the message's attribution, which identifies whether it has been modified by a source
/// other than the original author.
pub(crate) fn get_attribution(&self) -> &Attribution {
&self.attribution
}
/// Returns the message's type.
pub(crate) fn get_type(&self) -> &MessageType {
match &self.attribution {
Attribution::Source(message_type) => message_type,
Attribution::Derived(message, _) => message.get_type(),
}
}
/// Returns a reference to the message's payload.
pub(crate) fn payload(&self) -> &crate::Payload {
&self.payload
}
/// Delivers the supplied status to all participants in the return path.
pub(super) async fn report_status(&self, status: Status) {
for beacon in &self.return_path {
// It's ok if the other end is already closed and does not get an update, so we can
// safely ignore the result here.
let _ = beacon.status(status).await;
}
}
}
/// Type definition for a sender handed by the MessageHub to messengers to
/// send actions.
pub(super) type ActionSender = UnboundedSender<(Fingerprint, MessageAction, Option<Beacon>)>;
/// An internal identifier used by the MessageHub to identify messengers.
pub(super) type MessengerId = usize;
/// An internal identifier used by the `MessageHub` to identify `MessageClient`.
pub(super) type MessageClientId = usize;
pub(super) type CreateMessengerResult = Result<(MessengerClient, Receptor), MessageError>;
/// Callback for handing back a messenger
pub(super) type MessengerSender = Sender<CreateMessengerResult>;
/// Callback for checking on messenger presence
#[cfg(test)]
pub(super) type MessengerPresenceSender = Sender<MessengerPresenceResult>;
#[cfg(test)]
pub(super) type MessengerPresenceResult = Result<bool, MessageError>;
/// Type definition for a sender handed by the MessageHub to spawned components
/// (messenger factories and messengers) to control messengers.
pub(super) type MessengerActionSender = UnboundedSender<MessengerAction>;
/// Internal representation of possible actions around a messenger.
pub(super) enum MessengerAction {
/// Creates a top level messenger
Create(messenger::Descriptor, MessengerSender, MessengerActionSender),
#[cfg(test)]
/// Check whether a messenger exists for the given [`Signature`]
CheckPresence(Signature, MessengerPresenceSender),
/// Deletes a messenger by its [`Signature`]
DeleteBySignature(Signature),
}
/// Internal representation for possible actions on a message.
#[derive(Debug)]
pub(super) enum MessageAction {
// A new message sent to the specified audience.
Send(crate::Payload, Attribution),
// The message has been forwarded by the current holder.
Forward(Message),
}