persistence/
persist_server.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::{constants, Scheduler};
use fidl_fuchsia_diagnostics_persist::{
    DataPersistenceRequest, DataPersistenceRequestStream, PersistResult,
};
use fuchsia_async::TaskGroup;
use fuchsia_component::server::{ServiceFs, ServiceObj};
use fuchsia_sync::Mutex;
use futures::StreamExt;
use log::*;
use persistence_config::{ServiceName, Tag};
use std::collections::HashSet;
use std::sync::Arc;

// State behind one persistence service: the service's name, the tags it
// accepts, and the scheduler that accepted requests are handed to.
pub struct PersistServerData {
    // Service name that this persist server is hosting.
    service_name: ServiceName,
    // Set of tags this server accepts. A requested tag that is not in this
    // set is answered with `PersistResult::BadName` and is not scheduled.
    tags: HashSet<Tag>,
    // Scheduler that will handle the persist requests
    scheduler: Scheduler,
}

// Cloneable handle to a `PersistServerData`. The data sits behind
// `Arc<Mutex<..>>`, so every clone refers to the same underlying state.
#[derive(Clone)]
pub(crate) struct PersistServer(Arc<Mutex<PersistServerData>>);

impl PersistServer {
    // Build a server for `service_name` that accepts only the given `tags`
    // (duplicates collapse into a set) and forwards accepted requests to
    // `scheduler`.
    pub fn create(
        service_name: ServiceName,
        tags: Vec<Tag>,
        scheduler: Scheduler,
    ) -> PersistServer {
        let tags = HashSet::from_iter(tags);
        PersistServer(Arc::new(Mutex::new(PersistServerData { service_name, tags, scheduler })))
    }

    // Serve the Persist FIDL protocol.
    //
    // Registers the protocol under `svc/` in `fs`, using a name built from
    // `constants::PERSIST_SERVICE_NAME_PREFIX` and this server's service name.
    // Every incoming connection is handled by its own task spawned on
    // `task_holder`.
    pub fn launch_server(
        self,
        task_holder: Arc<Mutex<TaskGroup>>,
        fs: &mut ServiceFs<ServiceObj<'static, ()>>,
    ) {
        let unique_service_name =
            format!("{}-{}", constants::PERSIST_SERVICE_NAME_PREFIX, self.0.lock().service_name);

        let this = self;
        fs.dir("svc").add_fidl_service_at(
            unique_service_name,
            move |mut stream: DataPersistenceRequestStream| {
                let this = this.clone();
                task_holder.lock().spawn(async move {
                    while let Some(request) = stream.next().await {
                        // A stream error ends this connection; log it so the
                        // failure is not silently swallowed.
                        let request = match request {
                            Ok(request) => request,
                            Err(err) => {
                                warn!("Failed to read persist request from client: {}", err);
                                break;
                            }
                        };
                        // The guard is taken after the await above and dropped at
                        // the end of this iteration, so the lock is never held
                        // across an await point.
                        let this = this.0.lock();
                        match request {
                            DataPersistenceRequest::Persist { tag, responder, .. } => {
                                // Reuse the multi-tag validation so that a single
                                // rejected tag is logged the same way as a
                                // rejected entry of `PersistTags`.
                                let (results, tags) =
                                    this.validate_tags(std::slice::from_ref(&tag));
                                if !tags.is_empty() {
                                    this.scheduler.schedule(&this.service_name, tags);
                                }
                                // `validate_tags` yields one result per input tag,
                                // so exactly one entry is expected here.
                                let response =
                                    results.first().copied().unwrap_or(PersistResult::BadName);
                                responder.send(response).unwrap_or_else(|err| {
                                    warn!("Failed to respond {:?} to client: {}", response, err)
                                });
                            }
                            DataPersistenceRequest::PersistTags { tags, responder, .. } => {
                                let (response, tags) = this.validate_tags(&tags);
                                // Only bother the scheduler when at least one tag
                                // was accepted.
                                if !tags.is_empty() {
                                    this.scheduler.schedule(&this.service_name, tags);
                                }
                                responder.send(&response).unwrap_or_else(|err| {
                                    warn!("Failed to respond {:?} to client: {}", response, err)
                                });
                            }
                        }
                    }
                });
            },
        );
    }
}

impl PersistServerData {
    // Check each requested tag against this server's configured tags.
    //
    // Returns a pair:
    //   * one `PersistResult` per entry of `tags`, in the same order:
    //     `Queued` for a configured tag, `BadName` otherwise;
    //   * the parsed `Tag`s that were accepted, in request order.
    //
    // Every rejected tag is reported with a warning.
    fn validate_tags(&self, tags: &[String]) -> (Vec<PersistResult>, Vec<Tag>) {
        let mut results = Vec::new();
        let mut accepted = Vec::new();
        for name in tags {
            let verdict = match Tag::new(name.to_string()) {
                // Well-formed and configured: accept it.
                Ok(parsed) if self.tags.contains(&parsed) => {
                    accepted.push(parsed);
                    PersistResult::Queued
                }
                // Well-formed, but this server was not configured with it.
                Ok(parsed) => {
                    warn!("Tag '{}' was requested but is not configured", parsed);
                    PersistResult::BadName
                }
                // Not even a syntactically valid tag.
                Err(_) => {
                    warn!("Tag '{}' was requested but is not a valid tag string", name);
                    PersistResult::BadName
                }
            };
            results.push(verdict);
        }
        (results, accepted)
    }
}