persistence/
persist_server.rs1use crate::scheduler;
6use anyhow::{Error, format_err};
7use fidl::endpoints;
8use fidl_fuchsia_diagnostics_persist::{
9 DataPersistenceMarker, DataPersistenceRequest, DataPersistenceRequestStream, PersistResult,
10};
11use futures::{StreamExt, TryStreamExt};
12use log::*;
13use persistence_config::ServiceName;
14use std::sync::Arc;
15use {fidl_fuchsia_component_sandbox as fsandbox, fuchsia_async as fasync};
16
17pub struct PersistServerData {
18 service_name: ServiceName,
20 scheduler: scheduler::Scheduler,
22}
23
24pub(crate) struct PersistServer;
26
27impl PersistServer {
28 pub fn spawn(
30 service_name: ServiceName,
31 scheduler: scheduler::Scheduler,
32 scope: fasync::ScopeHandle,
33 requests: fsandbox::ReceiverRequestStream,
34 ) {
35 let data = Arc::new(PersistServerData { service_name, scheduler });
36 scope.spawn(Self::accept_connections(data, requests, scope.clone()));
37 }
38
39 async fn accept_connections(
40 data: Arc<PersistServerData>,
41 mut stream: fsandbox::ReceiverRequestStream,
42 scope: fasync::ScopeHandle,
43 ) {
44 while let Some(request) = stream.try_next().await.unwrap() {
45 match request {
46 fsandbox::ReceiverRequest::Receive { channel, control_handle: _ } => {
47 let data = data.clone();
48 scope.spawn(async move {
49 let server_end =
50 endpoints::ServerEnd::<DataPersistenceMarker>::new(channel);
51 let stream: DataPersistenceRequestStream = server_end.into_stream();
52 if let Err(e) = Self::handle_requests(data, stream).await {
53 warn!("error handling persistence request: {e}");
54 }
55 });
56 }
57 fsandbox::ReceiverRequest::_UnknownMethod { ordinal, .. } => {
58 warn!(ordinal:%; "Unknown Receiver request");
59 }
60 }
61 }
62 }
63
64 async fn handle_requests(
65 data: Arc<PersistServerData>,
66 mut stream: DataPersistenceRequestStream,
67 ) -> Result<(), Error> {
68 while let Some(request) = stream.next().await {
69 let request =
70 request.map_err(|e| format_err!("error handling persistence request: {e:?}"))?;
71
72 debug!("Received {request:?}");
73
74 match request {
75 DataPersistenceRequest::Persist { tag, responder, .. } => {
76 let response = match data.scheduler.schedule(&data.service_name, [tag]).pop() {
77 Some(Ok(())) => PersistResult::Queued,
78 Some(Err(e)) => e.into(),
79 None => {
80 error!("Failed to retrieve a response from scheduler");
81 PersistResult::InternalError
82 }
83 };
84 responder.send(response).map_err(|err| {
85 format_err!("Failed to respond {:?} to client: {}", response, err)
86 })?;
87 }
88 DataPersistenceRequest::PersistTags { tags, responder, .. } => {
89 let response: Vec<PersistResult> = data
90 .scheduler
91 .schedule(&data.service_name, tags)
92 .into_iter()
93 .map(|res| match res {
94 Ok(()) => PersistResult::Queued,
95 Err(e) => e.into(),
96 })
97 .collect();
98 responder.send(&response).map_err(|err| {
99 format_err!("Failed to respond {:?} to client: {}", response, err)
100 })?;
101 }
102 }
103 }
104 Ok(())
105 }
106}