archivist_lib/logs/servers/
log.rs1use crate::logs::error::LogsError;
6use crate::logs::listener::Listener;
7use crate::logs::repository::LogsRepository;
8use fidl::endpoints::DiscoverableProtocolMarker;
9use fidl_fuchsia_diagnostics::StreamMode;
10use futures::StreamExt;
11use log::warn;
12use std::pin::pin;
13use std::sync::Arc;
14use {fidl_fuchsia_logger as flogger, fuchsia_async as fasync};
15
16pub struct LogServer {
17 logs_repo: Arc<LogsRepository>,
19
20 scope: fasync::Scope,
22}
23
24impl LogServer {
25 pub fn new(logs_repo: Arc<LogsRepository>, scope: fasync::Scope) -> Self {
26 Self { logs_repo, scope }
27 }
28
29 pub fn spawn(&self, stream: flogger::LogRequestStream) {
31 let logs_repo = Arc::clone(&self.logs_repo);
32 let scope = self.scope.to_handle();
33 self.scope.spawn(async move {
34 if let Err(e) = Self::handle_requests(logs_repo, stream, scope).await {
35 warn!("error handling Log requests: {}", e);
36 }
37 });
38 }
39
40 async fn handle_requests(
43 logs_repo: Arc<LogsRepository>,
44 mut stream: flogger::LogRequestStream,
45 scope: fasync::ScopeHandle,
46 ) -> Result<(), LogsError> {
47 let connection_id = logs_repo.new_interest_connection();
48 while let Some(request) = stream.next().await {
49 let request = request.map_err(|source| LogsError::HandlingRequests {
50 protocol: flogger::LogMarker::PROTOCOL_NAME,
51 source,
52 })?;
53 let listener = match request {
54 flogger::LogRequest::ListenSafe { log_listener, options, .. } => {
55 Listener::new(log_listener, options)?
56 }
57 };
58 let logs =
59 logs_repo.logs_cursor(StreamMode::SnapshotThenSubscribe, Vec::new()).map(Arc::new);
60 scope.spawn(async move {
61 listener.run(pin!(logs)).await;
62 });
63 }
64 logs_repo.finish_interest_connection(connection_id);
65 Ok(())
66 }
67}