persistence/
persist_server.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::scheduler;
6use anyhow::{Error, format_err};
7use fidl::endpoints;
8use fidl_fuchsia_diagnostics_persist::{
9    DataPersistenceMarker, DataPersistenceRequest, DataPersistenceRequestStream, PersistResult,
10};
11use futures::{StreamExt, TryStreamExt};
12use log::*;
13use persistence_config::ServiceName;
14use std::sync::Arc;
15use {fidl_fuchsia_component_sandbox as fsandbox, fuchsia_async as fasync};
16
17pub struct PersistServerData {
18    // Service name that this persist server is hosting.
19    service_name: ServiceName,
20    // Scheduler that will handle the persist requests
21    scheduler: scheduler::Scheduler,
22}
23
24/// PersistServer handles all requests for a single persistence service.
25pub(crate) struct PersistServer;
26
27impl PersistServer {
28    /// Spawn a task to handle requests from components through a dynamic dictionary.
29    pub fn spawn(
30        service_name: ServiceName,
31        scheduler: scheduler::Scheduler,
32        scope: fasync::ScopeHandle,
33        requests: fsandbox::ReceiverRequestStream,
34    ) {
35        let data = Arc::new(PersistServerData { service_name, scheduler });
36        scope.spawn(Self::accept_connections(data, requests, scope.clone()));
37    }
38
39    async fn accept_connections(
40        data: Arc<PersistServerData>,
41        mut stream: fsandbox::ReceiverRequestStream,
42        scope: fasync::ScopeHandle,
43    ) {
44        while let Some(request) = stream.try_next().await.unwrap() {
45            match request {
46                fsandbox::ReceiverRequest::Receive { channel, control_handle: _ } => {
47                    let data = data.clone();
48                    scope.spawn(async move {
49                        let server_end =
50                            endpoints::ServerEnd::<DataPersistenceMarker>::new(channel);
51                        let stream: DataPersistenceRequestStream = server_end.into_stream();
52                        if let Err(e) = Self::handle_requests(data, stream).await {
53                            warn!("error handling persistence request: {e}");
54                        }
55                    });
56                }
57                fsandbox::ReceiverRequest::_UnknownMethod { ordinal, .. } => {
58                    warn!(ordinal:%; "Unknown Receiver request");
59                }
60            }
61        }
62    }
63
64    async fn handle_requests(
65        data: Arc<PersistServerData>,
66        mut stream: DataPersistenceRequestStream,
67    ) -> Result<(), Error> {
68        while let Some(request) = stream.next().await {
69            let request =
70                request.map_err(|e| format_err!("error handling persistence request: {e:?}"))?;
71
72            debug!("Received {request:?}");
73
74            match request {
75                DataPersistenceRequest::Persist { tag, responder, .. } => {
76                    let response = match data.scheduler.schedule(&data.service_name, [tag]).pop() {
77                        Some(Ok(())) => PersistResult::Queued,
78                        Some(Err(e)) => e.into(),
79                        None => {
80                            error!("Failed to retrieve a response from scheduler");
81                            PersistResult::InternalError
82                        }
83                    };
84                    responder.send(response).map_err(|err| {
85                        format_err!("Failed to respond {:?} to client: {}", response, err)
86                    })?;
87                }
88                DataPersistenceRequest::PersistTags { tags, responder, .. } => {
89                    let response: Vec<PersistResult> = data
90                        .scheduler
91                        .schedule(&data.service_name, tags)
92                        .into_iter()
93                        .map(|res| match res {
94                            Ok(()) => PersistResult::Queued,
95                            Err(e) => e.into(),
96                        })
97                        .collect();
98                    responder.send(&response).map_err(|err| {
99                        format_err!("Failed to respond {:?} to client: {}", response, err)
100                    })?;
101                }
102            }
103        }
104        Ok(())
105    }
106}