1pub mod config;
6pub mod ota;
7pub mod setup;
8pub mod storage;
9
10use anyhow::{format_err, Error};
11use async_trait::async_trait;
12use diagnostics_data::logs_legacy::{filter_by_tags, format_log_message};
13use diagnostics_reader::{ArchiveReader, Data, Logs};
14use fidl_fuchsia_component::{BinderMarker, CreateChildArgs, RealmMarker, RealmProxy};
15use fidl_fuchsia_component_decl::{Child, ChildRef, CollectionRef, StartupMode};
16use fidl_fuchsia_diagnostics::{ArchiveAccessorMarker, ArchiveAccessorProxy};
17use fidl_fuchsia_logger as flog;
18use fuchsia_component::client;
19use futures::channel::oneshot;
20use futures::lock::Mutex;
21use futures::{Future, FutureExt, StreamExt};
22use std::collections::HashSet;
23use std::pin::Pin;
24
/// Name of the component collection in which the OTA child is created.
const COLLECTION_NAME: &str = "ota";
/// Name given to the OTA child component inside the collection.
const CHILD_NAME: &str = "system_recovery_ota";
/// Component URL used in the OTA child's declaration.
const OTA_COMPONENT_URL: &str = "#meta/system_recovery_ota.cm";

/// Boxed, `Send` future returned by a child launcher; resolves to `Ok(())` or an error.
type ChildLauncherRet = Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;
/// Closure that produces a fresh launch future on every call. It is injected into
/// `OtaComponent` so that callers (e.g. tests) can substitute their own launcher.
pub type ChildLauncherFn = Box<dyn Fn() -> ChildLauncherRet + Send + Sync>;
31
32pub async fn child_launcher() -> Result<(), Error> {
33 client::connect_to_childs_protocol::<BinderMarker>(
34 String::from(CHILD_NAME),
35 Some(String::from(COLLECTION_NAME)),
36 )
37 .await?;
38 Ok(())
39}
40
/// Final outcome of an OTA attempt, delivered to waiters through `OtaManager::complete_ota`.
#[derive(Debug, PartialEq, Clone)]
pub enum OtaStatus {
    /// The OTA finished successfully; `OtaComponent` maps this to `Ok(())`.
    Succeeded,
    /// The OTA failed; `OtaComponent` maps this to an "OTA failed" error.
    Failed,
    /// The OTA was cancelled; `OtaComponent` maps this to an "OTA cancelled" error.
    Cancelled,
}
47
/// Controls the lifecycle of an OTA attempt.
#[async_trait]
pub trait OtaManager {
    /// Starts the OTA and waits for its outcome. The `OtaComponent` implementation returns
    /// `Ok(())` only when `OtaStatus::Succeeded` is reported.
    async fn start_and_wait_for_result(&self) -> Result<(), Error>;
    /// Stops the OTA. The `OtaComponent` implementation destroys the OTA child component.
    async fn stop(&self) -> Result<(), Error>;
    /// Reports the final `status` of the OTA to whoever is waiting for it.
    async fn complete_ota(&self, status: OtaStatus);
}
54
55pub struct OtaComponent {
56 realm: RealmProxy,
57 completers: Mutex<Vec<oneshot::Sender<OtaStatus>>>,
58 child_launcher: ChildLauncherFn,
59}
60impl OtaComponent {
61 pub fn new() -> Result<Self, Error> {
62 let realm = client::connect_to_protocol::<RealmMarker>()
63 .map_err(|e| format_err!("failed to connect to fuchsia.component.Realm: {:?}", e))?;
64 Ok(Self::new_with_realm_and_launcher(realm, Box::new(|| child_launcher().boxed())))
65 }
66
67 pub fn new_with_realm_and_launcher(realm: RealmProxy, child_launcher: ChildLauncherFn) -> Self {
68 Self { realm, completers: Mutex::new(Vec::new()), child_launcher }
69 }
70}
71
72#[async_trait]
73impl OtaManager for OtaComponent {
74 async fn start_and_wait_for_result(&self) -> Result<(), Error> {
75 let (sender, receiver) = oneshot::channel::<OtaStatus>();
78 self.completers.lock().await.push(sender);
79
80 let collection_ref = CollectionRef { name: String::from(COLLECTION_NAME) };
81 let child_decl = Child {
82 name: Some(String::from(CHILD_NAME)),
83 url: Some(String::from(OTA_COMPONENT_URL)),
84 startup: Some(StartupMode::Lazy),
85 ..Default::default()
86 };
87
88 self.realm
89 .create_child(&collection_ref, &child_decl, CreateChildArgs::default())
90 .await
91 .expect("create_child failed")
92 .map_err(|e| format_err!("failed to start OTA child: {:?}", e))?;
93
94 (self.child_launcher)().await?;
95
96 match receiver.await {
97 Ok(status) => match status {
98 OtaStatus::Succeeded => Ok(()),
99 OtaStatus::Failed => Err(format_err!("OTA failed")),
100 OtaStatus::Cancelled => Err(format_err!("OTA cancelled")),
101 },
102 Err(_) => Err(format_err!("sender dropped")),
103 }
104 }
105
106 async fn stop(&self) -> Result<(), Error> {
107 let child_ref = ChildRef {
108 name: String::from(CHILD_NAME),
109 collection: Some(String::from(COLLECTION_NAME)),
110 };
111
112 self.realm
113 .destroy_child(&child_ref)
114 .await
115 .expect("destroy_child failed")
116 .map_err(|e| format_err!("failed to destroy OTA child: {:?}", e))?;
117
118 _ = self.complete_ota(OtaStatus::Cancelled);
119 Ok(())
120 }
121
122 async fn complete_ota(&self, status: OtaStatus) {
123 while let Some(completer) = self.completers.lock().await.pop() {
124 _ = completer.send(status.clone());
126 }
127 }
128}
129
130fn get_log_level(level: i32) -> String {
131 match level {
133 l if (l == flog::LogLevelFilter::Trace as i32) => "TRACE".to_string(),
134 l if (l == flog::LogLevelFilter::Debug as i32) => "DEBUG".to_string(),
135 l if (l < flog::LogLevelFilter::Info as i32 && l > flog::LogLevelFilter::Debug as i32) => {
136 format!("VLOG({})", (flog::LogLevelFilter::Info as i32) - l)
137 }
138 l if (l == flog::LogLevelFilter::Info as i32) => "INFO".to_string(),
139 l if (l == flog::LogLevelFilter::Warn as i32) => "WARNING".to_string(),
140 l if (l == flog::LogLevelFilter::Error as i32) => "ERROR".to_string(),
141 l if (l == flog::LogLevelFilter::Fatal as i32) => "FATAL".to_string(),
142 l => format!("INVALID({})", l),
143 }
144}
145
146fn format_time(timestamp: zx::BootInstant) -> String {
148 let nanos = timestamp.into_nanos();
149 format!("{:05}.{:06}", nanos / 1000000000, (nanos / 1000) % 1000000)
150}
151
/// Callback invoked once per formatted log line. It is a plain `FnMut` box with no `Send`
/// bound.
pub type LogHandlerFnPtr = Box<dyn FnMut(String)>;

/// Source of OTA log lines.
#[async_trait(?Send)]
pub trait OtaLogListener {
    /// Feeds formatted log lines to `handler`. The `OtaLogListenerImpl` implementation reads
    /// them through its archive accessor proxy, using the tag
    /// `"{COLLECTION_NAME}:{CHILD_NAME}"` as the filter.
    async fn listen(&self, handler: LogHandlerFnPtr) -> Result<(), Error>;
}
158
159pub struct OtaLogListenerImpl {
160 log_proxy: ArchiveAccessorProxy,
161}
162
163impl OtaLogListenerImpl {
164 pub fn new() -> Result<Self, Error> {
165 let log_proxy = client::connect_to_protocol::<ArchiveAccessorMarker>().map_err(|e| {
166 format_err!("failed to connect to fuchsia.diagnostics.ArchiveAccessor: {:?}", e)
167 })?;
168 Ok(Self::new_with_proxy(log_proxy))
169 }
170
171 pub fn new_with_proxy(log_proxy: ArchiveAccessorProxy) -> Self {
172 Self { log_proxy }
173 }
174}
175
176#[async_trait(?Send)]
177impl OtaLogListener for OtaLogListenerImpl {
178 async fn listen(&self, handler: LogHandlerFnPtr) -> Result<(), Error> {
179 let mut tags = HashSet::new();
180 tags.insert(format!("{}:{}", COLLECTION_NAME, CHILD_NAME));
181 LogProcessorFn(handler).run(tags, self.log_proxy.clone()).await
182 }
183}
184
185struct LogProcessorFn(LogHandlerFnPtr);
188
189impl LogProcessorFn {
190 async fn run(
191 &mut self,
192 tags: HashSet<String>,
193 archive: ArchiveAccessorProxy,
194 ) -> Result<(), Error> {
195 let mut reader = ArchiveReader::logs();
196 reader.with_archive(archive);
197 let mut stream = reader.snapshot_then_subscribe().unwrap();
198
199 while let Some(Ok(log)) = stream.next().await {
200 if !filter_by_tags(&log, &tags) {
201 self.log(&log);
202 }
203 }
204 Ok(())
205 }
206 fn log(&mut self, message: &Data<Logs>) {
207 let tags = message.tags().unwrap_or(&vec![]).join(", ");
208
209 let line = format!(
210 "[{}][{}] {}: {}",
211 format_time(message.metadata.timestamp),
212 tags,
213 get_log_level(message.severity() as i32),
214 format_log_message(message)
215 );
216
217 (self.0)(line);
218 }
219}
220
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use diagnostics_data::{BuilderArgs, LogsDataBuilder};
    use fidl::endpoints::{create_proxy_and_stream, ServerEnd};
    use fidl_fuchsia_component::{Error, RealmRequest};
    use fidl_fuchsia_diagnostics::{
        ArchiveAccessorRequest, BatchIteratorMarker, BatchIteratorRequest, FormattedContent,
    };
    use fuchsia_async as fasync;
    use futures::{StreamExt, TryStreamExt};
    use std::sync::atomic::{AtomicU8, Ordering};
    use std::sync::Arc;

    /// Builds a launcher that always succeeds and counts its invocations in `call_count`.
    fn create_child_launcher(call_count: Arc<AtomicU8>) -> ChildLauncherFn {
        Box::new(move || {
            let call_count = call_count.clone();
            async move {
                call_count.fetch_add(1, Ordering::SeqCst);
                Ok(())
            }
            .boxed()
        })
    }

    /// Builds a launcher that always fails and counts its invocations in `call_count`.
    fn create_failing_child_launcher(call_count: Arc<AtomicU8>) -> ChildLauncherFn {
        Box::new(move || {
            let call_count = call_count.clone();
            async move {
                call_count.fetch_add(1, Ordering::SeqCst);
                Err(format_err!("failed to launch child"))
            }
            .boxed()
        })
    }

    /// `complete_ota` with no pending waiters must neither talk to the realm nor launch
    /// the child.
    #[fuchsia::test]
    async fn test_complete_ota_sends_no_requests() {
        let (client, mut stream) = create_proxy_and_stream::<RealmMarker>();
        let launch_count = Arc::new(AtomicU8::new(0));

        let ota_manager = OtaComponent::new_with_realm_and_launcher(
            client,
            create_child_launcher(launch_count.clone()),
        );
        ota_manager.complete_ota(OtaStatus::Succeeded).await;
        ota_manager.complete_ota(OtaStatus::Failed).await;

        // Dropping the manager drops the realm proxy, which lets the request stream end.
        drop(ota_manager);

        // The stream ending without yielding anything means no request was ever sent.
        assert!(stream.next().await.is_none());
        assert_eq!(0, launch_count.load(Ordering::Relaxed));
    }

    /// A `Succeeded` completion makes `start_and_wait_for_result` return `Ok(())`.
    #[fuchsia::test]
    async fn test_start_propagates_success_on_ota_success() {
        let (client, mut stream) = create_proxy_and_stream::<RealmMarker>();
        let launch_count = Arc::new(AtomicU8::new(0));
        let ota_manager = Arc::new(OtaComponent::new_with_realm_and_launcher(
            client,
            create_child_launcher(launch_count.clone()),
        ));
        let ota_manager2 = ota_manager.clone();

        // Fake realm: accept the CreateChild request, then report success.
        fasync::Task::local(async move {
            assert_matches!(stream.next().await.unwrap().unwrap(), RealmRequest::CreateChild {
                collection,
                decl,
                args,
                responder
            } => {
                assert_eq!(COLLECTION_NAME.to_string(), collection.name);
                assert_eq!(Some(CHILD_NAME.to_string()), decl.name);
                assert_eq!(Some(OTA_COMPONENT_URL.to_string()), decl.url);
                assert_eq!(Some(StartupMode::Lazy), decl.startup);
                assert_eq!(CreateChildArgs::default(), args);
                responder.send(Ok(())).unwrap();
            });

            ota_manager2.complete_ota(OtaStatus::Succeeded).await;
        })
        .detach();

        ota_manager.start_and_wait_for_result().await.unwrap();
        assert_eq!(1, launch_count.load(Ordering::Relaxed));
    }

    /// A `Failed` completion makes `start_and_wait_for_result` return an error.
    #[fuchsia::test]
    async fn test_start_propagates_error_on_ota_failure() {
        let (client, mut stream) = create_proxy_and_stream::<RealmMarker>();
        let launch_count = Arc::new(AtomicU8::new(0));
        let ota_manager = Arc::new(OtaComponent::new_with_realm_and_launcher(
            client,
            create_child_launcher(launch_count.clone()),
        ));
        let ota_manager2 = ota_manager.clone();

        fasync::Task::local(async move {
            assert_matches!(stream.next().await.unwrap().unwrap(), RealmRequest::CreateChild {
                responder, ..
            } => {
                responder.send(Ok(())).unwrap();
            });

            ota_manager2.complete_ota(OtaStatus::Failed).await;
        })
        .detach();

        ota_manager.start_and_wait_for_result().await.unwrap_err();
        assert_eq!(1, launch_count.load(Ordering::Relaxed));
    }

    /// A failing launcher makes `start_and_wait_for_result` return an error even though
    /// the child was created and a `Succeeded` status is reported.
    #[fuchsia::test]
    async fn test_start_propagates_error_on_launch_child_failure() {
        let (client, mut stream) = create_proxy_and_stream::<RealmMarker>();
        let launch_count = Arc::new(AtomicU8::new(0));
        let ota_manager = Arc::new(OtaComponent::new_with_realm_and_launcher(
            client,
            create_failing_child_launcher(launch_count.clone()),
        ));
        let ota_manager2 = ota_manager.clone();

        fasync::Task::local(async move {
            assert_matches!(stream.next().await.unwrap().unwrap(), RealmRequest::CreateChild {
                responder, ..
            } => {
                responder.send(Ok(())).unwrap();
            });

            ota_manager2.complete_ota(OtaStatus::Succeeded).await;
        })
        .detach();

        ota_manager.start_and_wait_for_result().await.unwrap_err();
        assert_eq!(1, launch_count.load(Ordering::Relaxed));
    }

    /// `stop` issues DestroyChild for the OTA child and returns `Ok(())` when the realm
    /// accepts it.
    #[fuchsia::test]
    async fn test_stop_proxies_to_realm_returns_ok() {
        let (client, mut stream) = create_proxy_and_stream::<RealmMarker>();
        let launch_count = Arc::new(AtomicU8::new(0));
        let ota_manager = OtaComponent::new_with_realm_and_launcher(
            client,
            create_child_launcher(launch_count.clone()),
        );

        fasync::Task::local(async move {
            assert_matches!(stream.next().await.unwrap().unwrap(), RealmRequest::DestroyChild {
                child,
                responder
            } => {
                assert_eq!(CHILD_NAME.to_string(), child.name);
                assert_eq!(Some(COLLECTION_NAME.to_string()), child.collection);
                responder.send(Ok(())).unwrap();
            });
        })
        .detach();

        ota_manager.stop().await.unwrap();
        assert_eq!(0, launch_count.load(Ordering::Relaxed));
    }

    /// `stop` returns an error when the realm rejects DestroyChild.
    #[fuchsia::test]
    async fn test_stop_proxies_to_realm_returns_err() {
        let (client, mut stream) = create_proxy_and_stream::<RealmMarker>();
        let launch_count = Arc::new(AtomicU8::new(0));
        let ota_manager = OtaComponent::new_with_realm_and_launcher(
            client,
            create_child_launcher(launch_count.clone()),
        );

        fasync::Task::local(async move {
            assert_matches!(stream.next().await.unwrap().unwrap(), RealmRequest::DestroyChild {
                child,
                responder
            } => {
                assert_eq!(CHILD_NAME.to_string(), child.name);
                assert_eq!(Some(COLLECTION_NAME.to_string()), child.collection);
                responder.send(Err(Error::Internal)).unwrap();
            });
        })
        .detach();

        ota_manager.stop().await.unwrap_err();
        assert_eq!(0, launch_count.load(Ordering::Relaxed));
    }

    /// Drives a start followed by a stop against a fake realm and checks both requests.
    #[fuchsia::test]
    async fn test_stop_unblocks_start_with_err() {
        let (client, mut stream) = create_proxy_and_stream::<RealmMarker>();
        let launch_count = Arc::new(AtomicU8::new(0));
        let ota_manager = Arc::new(OtaComponent::new_with_realm_and_launcher(
            client,
            create_child_launcher(launch_count.clone()),
        ));
        let ota_manager2 = ota_manager.clone();

        // Detached: start_and_wait_for_result() only returns once a status has been
        // delivered to its oneshot receiver.
        fasync::Task::local(async move {
            ota_manager.start_and_wait_for_result().await.unwrap_err();
            assert_eq!(1, launch_count.load(Ordering::Relaxed));
        })
        .detach();

        assert_matches!(stream.next().await.unwrap().unwrap(), RealmRequest::CreateChild {
            collection,
            decl,
            args,
            responder
        } => {
            assert_eq!(COLLECTION_NAME.to_string(), collection.name);
            assert_eq!(Some(CHILD_NAME.to_string()), decl.name);
            assert_eq!(Some(OTA_COMPONENT_URL.to_string()), decl.url);
            assert_eq!(Some(StartupMode::Lazy), decl.startup);
            assert_eq!(CreateChildArgs::default(), args);
            responder.send(Ok(())).unwrap();
        });

        // The fake realm below answers DestroyChild with Ok(()), so stop() must succeed.
        // The handle is kept so the assertion can be awaited instead of being abandoned.
        let stop_task = fasync::Task::local(async move {
            ota_manager2.stop().await.unwrap();
        });

        assert_matches!(stream.next().await.unwrap().unwrap(), RealmRequest::DestroyChild {
            child,
            responder
        } => {
            assert_eq!(CHILD_NAME.to_string(), child.name);
            assert_eq!(Some(COLLECTION_NAME.to_string()), child.collection);
            responder.send(Ok(())).unwrap();
        });

        // Make sure stop() ran to completion before the test returns.
        stop_task.await;
    }

    /// Serves a single batch iterator: acknowledges `WaitForReady`, answers the first
    /// `GetNext` with `data` serialized as one JSON buffer, then stops serving.
    async fn handle_batch_iterator(
        data: serde_json::Value,
        result_stream: ServerEnd<BatchIteratorMarker>,
    ) {
        let mut stream = result_stream.into_stream();
        while let Some(req) = stream.try_next().await.expect("stream request") {
            match req {
                BatchIteratorRequest::WaitForReady { responder } => {
                    let _ = responder.send();
                }
                BatchIteratorRequest::GetNext { responder } => {
                    let content = serde_json::to_string_pretty(&data).expect("json pretty");
                    let vmo_size = content.len() as u64;
                    let vmo = zx::Vmo::create(vmo_size).expect("create vmo");
                    vmo.write(content.as_bytes(), 0).expect("write vmo");
                    let buffer = fidl_fuchsia_mem::Buffer { vmo, size: vmo_size };
                    responder
                        .send(Ok(vec![FormattedContent::Json(buffer)]))
                        .expect("send response");
                    break;
                }
                BatchIteratorRequest::_UnknownMethod { .. } => {
                    unreachable!("Unexpected method call");
                }
            }
        }
    }

    /// Creates a fake archive accessor. The returned future serves requests until the
    /// first `StreamDiagnostics` call has been handled with `data_to_send`.
    fn spawn_fake_archive(
        data_to_send: serde_json::Value,
    ) -> (ArchiveAccessorProxy, impl Future<Output = ()>) {
        let (proxy, mut stream) =
            fidl::endpoints::create_proxy_and_stream::<ArchiveAccessorMarker>();
        let task = async move {
            while let Some(request) = stream.try_next().await.expect("stream request") {
                match request {
                    ArchiveAccessorRequest::StreamDiagnostics { result_stream, .. } => {
                        let data = data_to_send.clone();
                        handle_batch_iterator(data, result_stream).await;
                        break;
                    }
                    ArchiveAccessorRequest::WaitForReady { responder, .. } => {
                        let _ = responder.send();
                    }
                    ArchiveAccessorRequest::_UnknownMethod { .. } => {
                        unreachable!("Unexpected method call");
                    }
                }
            }
        };
        (proxy, task)
    }

    /// A log entry tagged with `"{COLLECTION_NAME}:{CHILD_NAME}"` reaches the handler as
    /// exactly one line ending in the original message.
    #[fuchsia::test]
    async fn test_log_listener_listens() -> Result<(), Error> {
        let lines = Arc::new(Mutex::new(Vec::new()));
        let lines2 = lines.clone();
        let expected_msg = "this is a test message".to_string();
        // Round-trip the built log entry through JSON to obtain a `serde_json::Value`.
        let (log_proxy, archive_accessor_task) = spawn_fake_archive(
            serde_json::from_str(
                &serde_json::to_string(
                    &LogsDataBuilder::new(BuilderArgs {
                        component_url: None,
                        moniker: diagnostics_data::ExtendedMoniker::ComponentManager,
                        severity: diagnostics_data::Severity::Trace,
                        timestamp: zx::BootInstant::ZERO,
                    })
                    .set_raw_severity(0)
                    .add_tag(format!("{}:{}", COLLECTION_NAME, CHILD_NAME))
                    .set_message(expected_msg.clone())
                    .build(),
                )
                .unwrap(),
            )
            .unwrap(),
        );

        let listener = OtaLogListenerImpl::new_with_proxy(log_proxy);
        let reader = fasync::Task::local(async move {
            let lines = lines2.clone();
            listener
                .listen(Box::new(move |line| {
                    let lines = lines.clone();
                    // The handler is a synchronous callback, so the async mutex is
                    // locked through a blocking executor.
                    futures::executor::block_on(async move {
                        lines.lock().await.push(line);
                    });
                }))
                .await
                .unwrap();
        });
        archive_accessor_task.await;
        reader.await;
        assert_eq!(1, lines.lock().await.len());
        assert!(lines.lock().await[0].ends_with(&expected_msg));
        Ok(())
    }
}