settings/ingress/request.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! Single request handling.
//!
//! The request mod defines the components necessary for executing a single, isolated command within
//! the [Job] ecosystem. The most common work type in the setting service that fall in this category
//! are set commands where the requested action can be succinctly captured in a single request in
//! the service.
//!
//! One must define two concrete trait implementations in order to use the components of this mod.
//! The first trait is the [From] trait for [Response]. While [Response] could be broken
//! down into its contained [SettingInfo](crate::base::SettingInfo) and
//! [Error](crate::handler::base::Error) types, callers often only care about the success of a call.
//! For example, set calls typically return an empty value upon success and therefore do not
//! have a value to convert. The second trait is [Responder], which takes the first trait
//! implementation as a parameter. Ths trait allows callers to customize how the response is handled
//! with their own type as defined in the [From<Response>] trait. One should note that the
//! responder itself is passed in on the callback. This allows for the consumption of any resources
//! in the one-time use callback.
use crate::base::SettingType;
use crate::handler::base::{Payload, Request, Response};
use crate::job::work::{Independent, Load};
use crate::job::Job;
use crate::message::base::Audience;
use crate::service::{message, Address};
use crate::trace;
use async_trait::async_trait;
use fuchsia_trace as ftrace;
use std::marker::PhantomData;
/// A [Responder] is passed into [Work] as a handler for responses generated by the work.
pub(crate) trait Responder<R: From<Response>> {
/// Invoked when a response to the request is ready.
fn respond(self, response: R);
}
/// [Work] executes a single request and passes the results back to a specified responder. Consumers
/// of [Work] specify a [SettingType] along with [Request] for proper routing.
pub(crate) struct Work<R, T>
where
R: From<Response>,
T: Responder<R>,
{
request: Request,
setting_type: SettingType,
responder: T,
_data: PhantomData<R>,
}
impl<R: From<Response>, T: Responder<R>> Work<R, T> {
pub(crate) fn new(setting_type: SettingType, request: Request, responder: T) -> Self {
Self { setting_type, request, responder, _data: PhantomData }
}
}
/// [Work] implements the [Independent] trait as each request execution should be done in isolation
/// and executes in the order it was received, not waiting on any existing [Job] of the same group
/// to be executed (as is the case for [crate::job::work::Sequential]).
#[async_trait(?Send)]
impl<R, T> Independent for Work<R, T>
where
R: From<Response>,
T: Responder<R>,
{
async fn execute(self: Box<Self>, messenger: message::Messenger, id: ftrace::Id) {
trace!(id, c"Independent Work execute");
// Send request through MessageHub.
let mut response_listener = messenger.message(
Payload::Request(self.request.clone()).into(),
Audience::Address(Address::Handler(self.setting_type)),
);
// On success, invoke the responder with the converted response.
self.responder.respond(R::from(match response_listener.next_of::<Payload>().await {
Ok((payload, _)) => match payload {
Payload::Response(response) => response,
_ => {
// While it's possible for the request to fail, this will be communicated
// through the response for logic related errors or the return value of
// receptor::next_of. Work should never encounter a different type of payload
// and therefore treat this scenario as fatal.
panic!("should not have received a different payload type:{payload:?}");
}
},
_ => {
log::warn!(
"An error occurred while independent job was executing for request:{:?}",
self.request.clone()
);
Err(crate::handler::base::Error::CommunicationError)
}
}));
}
}
/// The [From] implementation here is for conveniently converting a [Work] definition into a [Job].
/// Since [Work] is a singleshot request, it is automatically converted into a [Load::Independent]
/// workload.
impl<R, T> From<Work<R, T>> for Job
where
R: From<Response> + 'static,
T: Responder<R> + 'static,
{
fn from(work: Work<R, T>) -> Job {
Job::new(Load::Independent(Box::new(work)))
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::message::base::MessengerType;
use crate::service::MessageHub;
use assert_matches::assert_matches;
use fuchsia_async as fasync;
use futures::channel::oneshot::Sender;
struct TestResponder {
sender: Sender<Response>,
}
impl TestResponder {
pub(crate) fn new(sender: Sender<Response>) -> Self {
Self { sender }
}
}
impl Responder<Response> for TestResponder {
fn respond(self, response: Response) {
self.sender.send(response).expect("send of response should succeed");
}
}
#[fuchsia::test(allow_stalls = false)]
async fn test_request_basic_functionality() {
// Create MessageHub for communication between components.
let message_hub_delegate = MessageHub::create_hub();
// Create mock handler endpoint to receive request.
let mut handler_receiver = message_hub_delegate
.create(MessengerType::Addressable(Address::Handler(SettingType::Unknown)))
.await
.expect("handler messenger should be created")
.1;
// Create job to send request.
let request = Request::Restore;
let (response_tx, response_rx) = futures::channel::oneshot::channel::<Response>();
let work = Box::new(Work::new(
SettingType::Unknown,
request.clone(),
TestResponder::new(response_tx),
));
let work_messenger = message_hub_delegate
.create(MessengerType::Unbound)
.await
.expect("messenger should be created")
.0;
// Retrieve signature before passing in messenger to work for verifying the sender of any
// requests.
let work_messenger_signature = work_messenger.get_signature();
// Execute work asynchronously.
fasync::Task::local(work.execute(work_messenger, 0.into())).detach();
// Ensure the request is sent from the right sender.
let (received_request, client) =
handler_receiver.next_of::<Payload>().await.expect("should successfully get request");
assert_matches!(received_request, Payload::Request(x) if x == request);
assert!(client.get_author() == work_messenger_signature);
// Ensure the response is received and forwarded by the work.
let reply = Ok(None);
let _ = client.reply(Payload::Response(reply.clone()).into());
assert!(response_rx.await.expect("should receive successful response") == reply);
}
#[fuchsia::test(allow_stalls = false)]
async fn test_error_propagation() {
// Create MessageHub for communication between components. Do not create any handler for the
// test SettingType address.
let message_hub_delegate = MessageHub::create_hub();
let (response_tx, response_rx) = futures::channel::oneshot::channel::<Response>();
// Create job to send request.
let request = Request::Restore;
let work = Box::new(Work::new(
SettingType::Unknown,
request.clone(),
TestResponder::new(response_tx),
));
// Execute work on async task.
fasync::Task::local(
work.execute(
message_hub_delegate
.create(MessengerType::Unbound)
.await
.expect("messenger should be created")
.0,
0.into(),
),
)
.detach();
// Ensure an error was returned, which should match that generated by the request work load.
assert_matches!(response_rx.await.expect("should receive successful response"),
Err(x) if x == crate::handler::base::Error::CommunicationError);
}
}