// settings/message/messenger.rs
use crate::message::action_fuse::ActionFuseHandle;
use crate::message::base::{
ActionSender, Attribution, Audience, Fingerprint, Message, MessageAction, MessageType,
MessengerId, Signature,
};
use crate::message::beacon::{Beacon, BeaconBuilder};
use crate::message::receptor::Receptor;
use zx::MonotonicDuration;
/// Crate-facing handle for sending messages through a [`Messenger`].
///
/// Wraps a `Messenger` together with an `ActionFuseHandle`. Cloning the client
/// clones both members.
#[derive(Clone, Debug)]
pub struct MessengerClient {
/// The messenger that every send and signature lookup is delegated to.
messenger: Messenger,
/// Never read in this file (hence the leading underscore); it is only stored.
/// NOTE(review): presumably held so the fuse stays alive for as long as any
/// clone of this client exists — confirm against `action_fuse`.
_fuse: ActionFuseHandle, }
impl MessengerClient {
    /// Builds a client around `messenger`, storing `fuse` alongside it.
    pub(super) fn new(messenger: Messenger, fuse: ActionFuseHandle) -> MessengerClient {
        Self { messenger, _fuse: fuse }
    }

    /// Sends `payload` to `audience` without specifying a timeout.
    ///
    /// Equivalent to calling [`MessengerClient::message_with_timeout`] with
    /// `None` as the duration. Returns the [`Receptor`] produced for this
    /// message.
    pub(crate) fn message(&self, payload: crate::Payload, audience: Audience) -> Receptor {
        let no_timeout: Option<MonotonicDuration> = None;
        self.message_with_timeout(payload, audience, no_timeout)
    }

    /// Sends `payload` to `audience`, handing `duration` to the beacon builder
    /// as its timeout setting.
    ///
    /// A beacon/receptor pair is built for this messenger; the beacon is
    /// transmitted together with a `Send` action attributed as an origin
    /// message for `audience`, and the receptor is returned to the caller.
    pub(crate) fn message_with_timeout(
        &self,
        payload: crate::Payload,
        audience: Audience,
        duration: Option<MonotonicDuration>,
    ) -> Receptor {
        let (beacon, receptor) =
            BeaconBuilder::new(self.messenger.clone()).set_timeout(duration).build();

        // Describe the message as originating here and addressed to `audience`.
        let attribution = Attribution::Source(MessageType::Origin(audience));
        let action = MessageAction::Send(payload, attribution);
        self.messenger.transmit(action, Some(beacon));

        receptor
    }

    /// Returns the [`Signature`] of the wrapped messenger.
    pub fn get_signature(&self) -> Signature {
        let messenger = &self.messenger;
        messenger.get_signature()
    }
}
/// Module-internal sender that pairs a [`Fingerprint`] with the channel used
/// to submit message actions.
#[derive(Clone, Debug)]
pub struct Messenger {
/// Identity of this messenger; supplies the `id` and `signature` exposed by
/// the accessors and is attached to every transmitted action.
fingerprint: Fingerprint,
/// Channel on which `(fingerprint, action, beacon)` tuples are sent.
action_tx: ActionSender,
}
impl Messenger {
pub(super) fn new(fingerprint: Fingerprint, action_tx: ActionSender) -> Messenger {
Messenger { fingerprint, action_tx }
}
pub(super) fn get_id(&self) -> MessengerId {
self.fingerprint.id
}
pub(super) fn forward(&self, message: Message, beacon: Option<Beacon>) {
self.transmit(MessageAction::Forward(message), beacon);
}
pub(super) fn transmit(&self, action: MessageAction, beacon: Option<Beacon>) {
if self.action_tx.is_closed() {
return;
}
self.action_tx.unbounded_send((self.fingerprint, action, beacon)).unwrap_or_else(|_| {
tracing::warn!("Messenger::transmit, action_tx failed to send message")
});
}
pub(super) fn get_signature(&self) -> Signature {
self.fingerprint.signature
}
}