Skip to main content

ota_lib/
ota_lib.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5pub mod config;
6pub mod ota;
7pub mod setup;
8pub mod storage;
9
10use anyhow::{format_err, Error};
11use async_trait::async_trait;
12use diagnostics_data::logs_legacy::{filter_by_tags, format_log_message};
13use diagnostics_reader::{ArchiveReader, Data, Logs};
14use fidl_fuchsia_component::{BinderMarker, CreateChildArgs, RealmMarker, RealmProxy};
15use fidl_fuchsia_component_decl::{Child, ChildRef, CollectionRef, StartupMode};
16use fidl_fuchsia_diagnostics::{ArchiveAccessorMarker, ArchiveAccessorProxy};
17use fidl_fuchsia_logger as flog;
18use fuchsia_component::client;
19use futures::channel::oneshot;
20use futures::lock::Mutex;
21use futures::{Future, FutureExt, StreamExt};
22use std::collections::HashSet;
23use std::pin::Pin;
24
25const COLLECTION_NAME: &str = "ota";
26const CHILD_NAME: &str = "system_recovery_ota";
27const OTA_COMPONENT_URL: &str = "#meta/system_recovery_ota.cm";
28
29type ChildLauncherRet = Pin<Box<dyn Future<Output = Result<(), Error>> + Send>>;
30pub type ChildLauncherFn = Box<dyn Fn() -> ChildLauncherRet + Send + Sync>;
31
32pub async fn child_launcher() -> Result<(), Error> {
33    client::connect_to_childs_protocol::<BinderMarker>(
34        String::from(CHILD_NAME),
35        Some(String::from(COLLECTION_NAME)),
36    )
37    .await?;
38    Ok(())
39}
40
41#[derive(Debug, PartialEq, Clone)]
42pub enum OtaStatus {
43    Succeeded,
44    Failed,
45    Cancelled,
46}
47
48#[async_trait]
49pub trait OtaManager {
50    async fn start_and_wait_for_result(&self) -> Result<(), Error>;
51    async fn stop(&self) -> Result<(), Error>;
52    async fn complete_ota(&self, status: OtaStatus);
53}
54
55pub struct OtaComponent {
56    realm: RealmProxy,
57    completers: Mutex<Vec<oneshot::Sender<OtaStatus>>>,
58    child_launcher: ChildLauncherFn,
59}
60impl OtaComponent {
61    pub fn new() -> Result<Self, Error> {
62        let realm = client::connect_to_protocol::<RealmMarker>()
63            .map_err(|e| format_err!("failed to connect to fuchsia.component.Realm: {:?}", e))?;
64        Ok(Self::new_with_realm_and_launcher(realm, Box::new(|| child_launcher().boxed())))
65    }
66
67    pub fn new_with_realm_and_launcher(realm: RealmProxy, child_launcher: ChildLauncherFn) -> Self {
68        Self { realm, completers: Mutex::new(Vec::new()), child_launcher }
69    }
70}
71
72#[async_trait]
73impl OtaManager for OtaComponent {
74    async fn start_and_wait_for_result(&self) -> Result<(), Error> {
75        // Store the completer even if launching the child may fail. It will be cleaned up
76        // when `complete_ota` is called.
77        let (sender, receiver) = oneshot::channel::<OtaStatus>();
78        self.completers.lock().await.push(sender);
79
80        let collection_ref = CollectionRef { name: String::from(COLLECTION_NAME) };
81        let child_decl = Child {
82            name: Some(String::from(CHILD_NAME)),
83            url: Some(String::from(OTA_COMPONENT_URL)),
84            startup: Some(StartupMode::Lazy),
85            ..Default::default()
86        };
87
88        self.realm
89            .create_child(&collection_ref, &child_decl, CreateChildArgs::default())
90            .await
91            .expect("create_child failed")
92            .map_err(|e| format_err!("failed to start OTA child: {:?}", e))?;
93
94        (self.child_launcher)().await?;
95
96        match receiver.await {
97            Ok(status) => match status {
98                OtaStatus::Succeeded => Ok(()),
99                OtaStatus::Failed => Err(format_err!("OTA failed")),
100                OtaStatus::Cancelled => Err(format_err!("OTA cancelled")),
101            },
102            Err(_) => Err(format_err!("sender dropped")),
103        }
104    }
105
106    async fn stop(&self) -> Result<(), Error> {
107        let child_ref = ChildRef {
108            name: String::from(CHILD_NAME),
109            collection: Some(String::from(COLLECTION_NAME)),
110        };
111
112        self.realm
113            .destroy_child(&child_ref)
114            .await
115            .expect("destroy_child failed")
116            .map_err(|e| format_err!("failed to destroy OTA child: {:?}", e))?;
117
118        _ = self.complete_ota(OtaStatus::Cancelled);
119        Ok(())
120    }
121
122    async fn complete_ota(&self, status: OtaStatus) {
123        while let Some(completer) = self.completers.lock().await.pop() {
124            // If receiver was dropped, ignore. Status is sent in `start_and_wait_for_result`.
125            _ = completer.send(status.clone());
126        }
127    }
128}
129
130fn get_log_level(level: i32) -> String {
131    // note levels align with syslog logger.h definitions
132    match level {
133        l if (l == flog::LogLevelFilter::Trace as i32) => "TRACE".to_string(),
134        l if (l == flog::LogLevelFilter::Debug as i32) => "DEBUG".to_string(),
135        l if (l < flog::LogLevelFilter::Info as i32 && l > flog::LogLevelFilter::Debug as i32) => {
136            format!("VLOG({})", (flog::LogLevelFilter::Info as i32) - l)
137        }
138        l if (l == flog::LogLevelFilter::Info as i32) => "INFO".to_string(),
139        l if (l == flog::LogLevelFilter::Warn as i32) => "WARNING".to_string(),
140        l if (l == flog::LogLevelFilter::Error as i32) => "ERROR".to_string(),
141        l if (l == flog::LogLevelFilter::Fatal as i32) => "FATAL".to_string(),
142        l => format!("INVALID({})", l),
143    }
144}
145
146// Assume monotonic time is sufficient for debug logs in recovery.
147fn format_time(timestamp: zx::BootInstant) -> String {
148    let nanos = timestamp.into_nanos();
149    format!("{:05}.{:06}", nanos / 1000000000, (nanos / 1000) % 1000000)
150}
151
152pub type LogHandlerFnPtr = Box<dyn FnMut(String)>;
153
154#[async_trait(?Send)]
155pub trait OtaLogListener {
156    async fn listen(&self, handler: LogHandlerFnPtr) -> Result<(), Error>;
157}
158
159pub struct OtaLogListenerImpl {
160    log_proxy: ArchiveAccessorProxy,
161}
162
163impl OtaLogListenerImpl {
164    pub fn new() -> Result<Self, Error> {
165        let log_proxy = client::connect_to_protocol::<ArchiveAccessorMarker>().map_err(|e| {
166            format_err!("failed to connect to fuchsia.diagnostics.ArchiveAccessor: {:?}", e)
167        })?;
168        Ok(Self::new_with_proxy(log_proxy))
169    }
170
171    pub fn new_with_proxy(log_proxy: ArchiveAccessorProxy) -> Self {
172        Self { log_proxy }
173    }
174}
175
176#[async_trait(?Send)]
177impl OtaLogListener for OtaLogListenerImpl {
178    async fn listen(&self, handler: LogHandlerFnPtr) -> Result<(), Error> {
179        let mut tags = HashSet::new();
180        tags.insert(format!("{}:{}", COLLECTION_NAME, CHILD_NAME));
181        LogProcessorFn(handler).run(tags, self.log_proxy.clone()).await
182    }
183}
184
185// We cannot directly implement LogProcessor for FnMut(String). See rustc error E0210 for more info.
186// To work around this, the FnMut(String) must be wrapped in a local type that implements the trait.
187struct LogProcessorFn(LogHandlerFnPtr);
188
189impl LogProcessorFn {
190    async fn run(
191        &mut self,
192        tags: HashSet<String>,
193        archive: ArchiveAccessorProxy,
194    ) -> Result<(), Error> {
195        let mut reader = ArchiveReader::logs();
196        reader.with_archive(archive);
197        let mut stream = reader.snapshot_then_subscribe().unwrap();
198
199        while let Some(Ok(log)) = stream.next().await {
200            if !filter_by_tags(&log, &tags) {
201                self.log(&log);
202            }
203        }
204        Ok(())
205    }
206    fn log(&mut self, message: &Data<Logs>) {
207        let tags = message.tags().unwrap_or(&vec![]).join(", ");
208
209        let line = format!(
210            "[{}][{}] {}: {}",
211            format_time(message.metadata.timestamp),
212            tags,
213            get_log_level(message.severity() as i32),
214            format_log_message(message)
215        );
216
217        (self.0)(line);
218    }
219}
220
221#[cfg(test)]
222mod tests {
223    use super::*;
224    use assert_matches::assert_matches;
225    use diagnostics_data::{BuilderArgs, LogsDataBuilder};
226    use fidl::endpoints::{create_proxy_and_stream, ServerEnd};
227    use fidl_fuchsia_component::{Error, RealmRequest};
228    use fidl_fuchsia_diagnostics::{
229        ArchiveAccessorRequest, BatchIteratorMarker, BatchIteratorRequest, FormattedContent,
230    };
231    use fuchsia_async as fasync;
232    use futures::{StreamExt, TryStreamExt};
233    use std::sync::atomic::{AtomicU8, Ordering};
234    use std::sync::Arc;
235
236    fn create_child_launcher(call_count: Arc<AtomicU8>) -> ChildLauncherFn {
237        Box::new(move || {
238            let call_count = call_count.clone();
239            async move {
240                call_count.fetch_add(1, Ordering::SeqCst);
241                Ok(())
242            }
243            .boxed()
244        })
245    }
246
247    fn create_failing_child_launcher(call_count: Arc<AtomicU8>) -> ChildLauncherFn {
248        Box::new(move || {
249            let call_count = call_count.clone();
250            async move {
251                call_count.fetch_add(1, Ordering::SeqCst);
252                Err(format_err!("failed to launch child"))
253            }
254            .boxed()
255        })
256    }
257
258    #[fuchsia::test]
259    async fn test_complete_ota_sends_no_requests() {
260        let (client, mut stream) = create_proxy_and_stream::<RealmMarker>();
261        let launch_count = Arc::new(AtomicU8::new(0));
262
263        let ota_manager = OtaComponent::new_with_realm_and_launcher(
264            client,
265            create_child_launcher(launch_count.clone()),
266        );
267        ota_manager.complete_ota(OtaStatus::Succeeded).await;
268        ota_manager.complete_ota(OtaStatus::Failed).await;
269
270        // Drop ota_manager so `stream` closes.
271        drop(ota_manager);
272
273        // No requests should be sent.
274        assert!(stream.next().await.is_none());
275        assert_eq!(0, launch_count.load(Ordering::Relaxed));
276    }
277
278    #[fuchsia::test]
279    async fn test_start_propagates_success_on_ota_success() {
280        let (client, mut stream) = create_proxy_and_stream::<RealmMarker>();
281        let launch_count = Arc::new(AtomicU8::new(0));
282        let ota_manager = Arc::new(OtaComponent::new_with_realm_and_launcher(
283            client,
284            create_child_launcher(launch_count.clone()),
285        ));
286        let ota_manager2 = ota_manager.clone();
287
288        fasync::Task::local(async move {
289            assert_matches!(stream.next().await.unwrap().unwrap(), RealmRequest::CreateChild {
290                collection,
291                decl,
292                args,
293                responder
294            } => {
295                assert_eq!(COLLECTION_NAME.to_string(), collection.name);
296                assert_eq!(Some(CHILD_NAME.to_string()), decl.name);
297                assert_eq!(Some(OTA_COMPONENT_URL.to_string()), decl.url);
298                assert_eq!(Some(StartupMode::Lazy), decl.startup);
299                assert_eq!(CreateChildArgs::default(), args);
300                responder.send(Ok(())).unwrap();
301            });
302
303            ota_manager2.complete_ota(OtaStatus::Succeeded).await;
304        })
305        .detach();
306
307        ota_manager.start_and_wait_for_result().await.unwrap();
308        assert_eq!(1, launch_count.load(Ordering::Relaxed));
309    }
310
311    #[fuchsia::test]
312    async fn test_start_propagates_error_on_ota_failure() {
313        let (client, mut stream) = create_proxy_and_stream::<RealmMarker>();
314        let launch_count = Arc::new(AtomicU8::new(0));
315        let ota_manager = Arc::new(OtaComponent::new_with_realm_and_launcher(
316            client,
317            create_child_launcher(launch_count.clone()),
318        ));
319        let ota_manager2 = ota_manager.clone();
320
321        fasync::Task::local(async move {
322            assert_matches!(stream.next().await.unwrap().unwrap(), RealmRequest::CreateChild {
323                responder, ..
324            } => {
325                responder.send(Ok(())).unwrap();
326            });
327
328            ota_manager2.complete_ota(OtaStatus::Failed).await;
329        })
330        .detach();
331
332        ota_manager.start_and_wait_for_result().await.unwrap_err();
333        assert_eq!(1, launch_count.load(Ordering::Relaxed));
334    }
335
336    #[fuchsia::test]
337    async fn test_start_propagates_error_on_launch_child_failure() {
338        let (client, mut stream) = create_proxy_and_stream::<RealmMarker>();
339        let launch_count = Arc::new(AtomicU8::new(0));
340        let ota_manager = Arc::new(OtaComponent::new_with_realm_and_launcher(
341            client,
342            create_failing_child_launcher(launch_count.clone()),
343        ));
344        let ota_manager2 = ota_manager.clone();
345
346        fasync::Task::local(async move {
347            assert_matches!(stream.next().await.unwrap().unwrap(), RealmRequest::CreateChild {
348                responder, ..
349            } => {
350                responder.send(Ok(())).unwrap();
351            });
352
353            // complete_ota should have no effect on error state.
354            ota_manager2.complete_ota(OtaStatus::Succeeded).await;
355        })
356        .detach();
357
358        ota_manager.start_and_wait_for_result().await.unwrap_err();
359        assert_eq!(1, launch_count.load(Ordering::Relaxed));
360    }
361
362    #[fuchsia::test]
363    async fn test_stop_proxies_to_realm_returns_ok() {
364        let (client, mut stream) = create_proxy_and_stream::<RealmMarker>();
365        let launch_count = Arc::new(AtomicU8::new(0));
366        let ota_manager = OtaComponent::new_with_realm_and_launcher(
367            client,
368            create_child_launcher(launch_count.clone()),
369        );
370
371        fasync::Task::local(async move {
372            assert_matches!(stream.next().await.unwrap().unwrap(), RealmRequest::DestroyChild {
373                child,
374                responder
375            } => {
376                assert_eq!(CHILD_NAME.to_string(), child.name);
377                assert_eq!(Some(COLLECTION_NAME.to_string()), child.collection);
378                responder.send(Ok(())).unwrap();
379            });
380        })
381        .detach();
382
383        ota_manager.stop().await.unwrap();
384        assert_eq!(0, launch_count.load(Ordering::Relaxed));
385    }
386
387    #[fuchsia::test]
388    async fn test_stop_proxies_to_realm_returns_err() {
389        let (client, mut stream) = create_proxy_and_stream::<RealmMarker>();
390        let launch_count = Arc::new(AtomicU8::new(0));
391        let ota_manager = OtaComponent::new_with_realm_and_launcher(
392            client,
393            create_child_launcher(launch_count.clone()),
394        );
395
396        fasync::Task::local(async move {
397            assert_matches!(stream.next().await.unwrap().unwrap(), RealmRequest::DestroyChild {
398                child,
399                responder
400            } => {
401                assert_eq!(CHILD_NAME.to_string(), child.name);
402                assert_eq!(Some(COLLECTION_NAME.to_string()), child.collection);
403                responder.send(Err(Error::Internal)).unwrap();
404            });
405        })
406        .detach();
407
408        ota_manager.stop().await.unwrap_err();
409        assert_eq!(0, launch_count.load(Ordering::Relaxed));
410    }
411
412    #[fuchsia::test]
413    async fn test_stop_unblocks_start_with_err() {
414        let (client, mut stream) = create_proxy_and_stream::<RealmMarker>();
415        let launch_count = Arc::new(AtomicU8::new(0));
416        let ota_manager = Arc::new(OtaComponent::new_with_realm_and_launcher(
417            client,
418            create_child_launcher(launch_count.clone()),
419        ));
420        let ota_manager2 = ota_manager.clone();
421
422        fasync::Task::local(async move {
423            ota_manager.start_and_wait_for_result().await.unwrap_err();
424            assert_eq!(1, launch_count.load(Ordering::Relaxed));
425        })
426        .detach();
427
428        assert_matches!(stream.next().await.unwrap().unwrap(), RealmRequest::CreateChild {
429            collection,
430            decl,
431            args,
432            responder
433        } => {
434            assert_eq!(COLLECTION_NAME.to_string(), collection.name);
435            assert_eq!(Some(CHILD_NAME.to_string()), decl.name);
436            assert_eq!(Some(OTA_COMPONENT_URL.to_string()), decl.url);
437            assert_eq!(Some(StartupMode::Lazy), decl.startup);
438            assert_eq!(CreateChildArgs::default(), args);
439            responder.send(Ok(())).unwrap();
440        });
441
442        fasync::Task::local(async move {
443            ota_manager2.stop().await.unwrap_err();
444        })
445        .detach();
446
447        assert_matches!(stream.next().await.unwrap().unwrap(), RealmRequest::DestroyChild {
448            child,
449            responder
450        } => {
451            assert_eq!(CHILD_NAME.to_string(), child.name);
452            assert_eq!(Some(COLLECTION_NAME.to_string()), child.collection);
453            responder.send(Ok(())).unwrap();
454        });
455    }
456
457    async fn handle_batch_iterator(
458        data: serde_json::Value,
459        result_stream: ServerEnd<BatchIteratorMarker>,
460    ) {
461        let mut stream = result_stream.into_stream();
462        while let Some(req) = stream.try_next().await.expect("stream request") {
463            match req {
464                BatchIteratorRequest::WaitForReady { responder } => {
465                    let _ = responder.send();
466                }
467                BatchIteratorRequest::GetNext { responder } => {
468                    let content = serde_json::to_string_pretty(&data).expect("json pretty");
469                    let vmo_size = content.len() as u64;
470                    let vmo = zx::Vmo::create(vmo_size).expect("create vmo");
471                    vmo.write(content.as_bytes(), 0).expect("write vmo");
472                    let buffer = fidl_fuchsia_mem::Buffer { vmo, size: vmo_size };
473                    responder
474                        .send(Ok(vec![FormattedContent::Json(buffer)]))
475                        .expect("send response");
476                    break;
477                }
478                BatchIteratorRequest::_UnknownMethod { .. } => {
479                    unreachable!("Unexpected method call");
480                }
481            }
482        }
483    }
484
485    fn spawn_fake_archive(
486        data_to_send: serde_json::Value,
487    ) -> (ArchiveAccessorProxy, impl Future<Output = ()>) {
488        let (proxy, mut stream) =
489            fidl::endpoints::create_proxy_and_stream::<ArchiveAccessorMarker>();
490        let task = async move {
491            while let Some(request) = stream.try_next().await.expect("stream request") {
492                match request {
493                    ArchiveAccessorRequest::StreamDiagnostics { result_stream, .. } => {
494                        let data = data_to_send.clone();
495                        handle_batch_iterator(data, result_stream).await;
496                        break;
497                    }
498                    ArchiveAccessorRequest::WaitForReady { responder, .. } => {
499                        let _ = responder.send();
500                    }
501                    ArchiveAccessorRequest::_UnknownMethod { .. } => {
502                        unreachable!("Unexpected method call");
503                    }
504                }
505            }
506        };
507        (proxy, task)
508    }
509
510    #[fuchsia::test]
511    async fn test_log_listener_listens() -> Result<(), Error> {
512        let lines = Arc::new(Mutex::new(Vec::new()));
513        let lines2 = lines.clone();
514        let expected_msg = "this is a test message".to_string();
515        let (log_proxy, archive_accessor_task) = spawn_fake_archive(
516            serde_json::from_str(
517                &serde_json::to_string(
518                    &LogsDataBuilder::new(BuilderArgs {
519                        component_url: None,
520                        moniker: diagnostics_data::ExtendedMoniker::ComponentManager,
521                        severity: diagnostics_data::Severity::Trace,
522                        timestamp: zx::BootInstant::ZERO,
523                    })
524                    .set_raw_severity(0)
525                    .add_tag(format!("{}:{}", COLLECTION_NAME, CHILD_NAME))
526                    .set_message(expected_msg.clone())
527                    .build(),
528                )
529                .unwrap(),
530            )
531            .unwrap(),
532        );
533
534        let listener = OtaLogListenerImpl::new_with_proxy(log_proxy);
535        let reader = fasync::Task::local(async move {
536            let lines = lines2.clone();
537            listener
538                .listen(Box::new(move |line| {
539                    let lines = lines.clone();
540                    futures::executor::block_on(async move {
541                        lines.lock().await.push(line);
542                    });
543                }))
544                .await
545                .unwrap();
546        });
547        archive_accessor_task.await;
548        reader.await;
549        assert_eq!(1, lines.lock().await.len());
550        assert!(lines.lock().await[0].ends_with(&expected_msg));
551        Ok(())
552    }
553}