1use super::audio_fidl_handler::{Publisher, Publisher2};
6use crate::audio_default_settings::AudioInfoLoader;
7use crate::types::{
8 AUDIO_STREAM_TYPE_COUNT, AudioError, AudioInfo, AudioStream, AudioStreamType, SetAudioStream,
9};
10use crate::{ModifiedCounters, StreamVolumeControl, create_default_modified_counters};
11use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
12
13use futures::StreamExt;
14use futures::channel::oneshot::Sender;
15use settings_common::inspect::event::{ExternalEventPublisher, SettingValuePublisher};
16use settings_common::service_context::ServiceContext;
17use settings_common::{trace, trace_guard};
18use settings_storage::device_storage::{DeviceStorage, DeviceStorageCompatible};
19use settings_storage::storage_factory::{DefaultLoader, StorageAccess, StorageFactory};
20use std::collections::HashMap;
21use std::rc::Rc;
22use {fuchsia_async as fasync, fuchsia_trace as ftrace};
23
24pub enum Request {
25 Get(ftrace::Id, Sender<AudioInfo>),
26 Listen(UnboundedSender<AudioInfo>),
27 Set(Vec<SetAudioStream>, ftrace::Id, Sender<Result<(), AudioError>>),
28}
29
/// Signal sent over the controller's restart channel by the callback handed to each
/// `StreamVolumeControl`; receiving it makes the controller drop and rebind its volume controls.
struct Restart;
31
32impl StorageAccess for AudioController {
33 type Storage = DeviceStorage;
34 type Data = AudioInfo;
35 const STORAGE_KEY: &'static str = AudioInfo::KEY;
36}
37
38pub struct AudioController {
39 service_context: Rc<ServiceContext>,
40 store: Rc<DeviceStorage>,
41 audio_service_connected: bool,
42 stream_volume_controls: HashMap<AudioStreamType, StreamVolumeControl>,
43 modified_counters: ModifiedCounters,
44 audio_info_loader: AudioInfoLoader,
45 publisher: Option<Publisher>,
46 publisher2: Option<Publisher2>,
47 listeners: Vec<UnboundedSender<AudioInfo>>,
48 setting_value_publisher: SettingValuePublisher<AudioInfo>,
49 external_publisher: ExternalEventPublisher,
50 restart_tx: UnboundedSender<Restart>,
51 restart_rx: Option<UnboundedReceiver<Restart>>,
52}
53
54impl AudioController {
55 pub(crate) async fn new<F>(
56 service_context: Rc<ServiceContext>,
57 audio_info_loader: AudioInfoLoader,
58 storage_factory: Rc<F>,
59 setting_value_publisher: SettingValuePublisher<AudioInfo>,
60 external_publisher: ExternalEventPublisher,
61 ) -> AudioController
62 where
63 F: StorageFactory<Storage = DeviceStorage>,
64 {
65 let store = storage_factory.get_store().await;
66 let (restart_tx, restart_rx) = mpsc::unbounded();
67 Self {
68 service_context,
69 store,
70 stream_volume_controls: HashMap::new(),
71 audio_service_connected: false,
72 modified_counters: create_default_modified_counters(),
73 audio_info_loader,
74 publisher: None,
75 publisher2: None,
76 listeners: vec![],
77 setting_value_publisher,
78 external_publisher,
79 restart_tx,
80 restart_rx: Some(restart_rx),
81 }
82 }
83
84 pub(crate) async fn restore(&mut self) -> AudioInfo {
87 let id = ftrace::Id::new();
88 trace!(id, c"restore");
89 self.restore_volume_state(id, true).await
90 }
91
92 pub(crate) async fn restore_volume_state(
95 &mut self,
96 id: ftrace::Id,
97 push_to_audio_core: bool,
98 ) -> AudioInfo {
99 let audio_info = self.store.get::<AudioInfo>().await;
100
101 trace!(id, c"update volume streams from info");
102 let new_streams = audio_info.streams.iter();
103 let _guard = trace_guard!(id, c"check and bind");
104 if let Err(e) = self.update_streams(push_to_audio_core, new_streams, id).await {
105 log::error!("Failed to update streams: {e:?}");
106 }
107 audio_info
108 }
109
110 pub(crate) async fn get_info(&self) -> AudioInfo {
111 let mut info = self.store.get::<AudioInfo>().await;
112 info.modified_counters = Some(self.modified_counters.clone());
113 info
114 }
115
116 pub(crate) fn register_publishers(&mut self, publisher: Publisher, publisher2: Publisher2) {
117 self.publisher = Some(publisher);
118 self.publisher2 = Some(publisher2);
119 }
120
121 fn register_listener(&mut self, tx: UnboundedSender<AudioInfo>) {
122 self.listeners.push(tx);
123 }
124
125 fn publish(&self, new_info: AudioInfo) {
126 let _ = self.setting_value_publisher.publish(&new_info);
127 for listener in &self.listeners {
129 let _ = listener.unbounded_send(new_info.clone());
130 }
131 if let Some(publisher) = self.publisher.as_ref() {
133 publisher.update(|info| {
134 let info = info.as_mut().unwrap();
136 let mut old_streams = info.streams.iter();
137 let new_streams = new_info.streams.iter();
138 for new_stream in new_streams {
139 let old_stream = old_streams
140 .find(|stream| stream.stream_type == new_stream.stream_type)
141 .expect("stream type should be found in existing streams");
142 if (old_stream != new_stream) && new_stream.stream_type.is_legacy() {
144 *info = new_info.clone();
145 return true;
146 }
147 }
148 false
149 });
150 }
151 if let Some(publisher2) = self.publisher2.as_ref() {
152 publisher2.update(|info| {
153 let info = info.as_mut().unwrap();
155 let mut old_streams = info.streams.iter();
156 let new_streams = new_info.streams.iter();
157 for new_stream in new_streams {
158 let old_stream = old_streams
159 .find(|stream| stream.stream_type == new_stream.stream_type)
160 .expect("stream type should be found in existing streams");
161 if old_stream != new_stream {
163 *info = new_info.clone();
164 return true;
165 }
166 }
167 false
168 });
169 }
170 }
171
172 async fn set_volume(
173 &mut self,
174 volume: Vec<SetAudioStream>,
175 id: ftrace::Id,
176 ) -> Result<AudioInfo, AudioError> {
177 let guard = trace_guard!(id, c"set volume updating counters");
178 for stream in &volume {
180 let counter = self.modified_counters.entry(stream.stream_type).or_insert(0);
183 *counter = counter.wrapping_add(1);
184 }
185 drop(guard);
186
187 self.update_volume_streams_from_new_streams(volume, true, id).await
188 }
189
190 async fn get_streams_array_from_map(
191 &self,
192 stream_map: &HashMap<AudioStreamType, StreamVolumeControl>,
193 ) -> [AudioStream; AUDIO_STREAM_TYPE_COUNT] {
194 let mut streams: [AudioStream; AUDIO_STREAM_TYPE_COUNT] =
195 self.audio_info_loader.default_value().streams;
196 for stream in &mut streams {
197 if let Some(volume_control) = stream_map.get(&stream.stream_type) {
198 *stream = volume_control.stored_stream;
199 }
200 }
201
202 streams
203 }
204
205 async fn update_streams(
206 &mut self,
207 push_to_audio_core: bool,
208 new_streams: impl Iterator<Item = &AudioStream>,
209 id: ftrace::Id,
210 ) -> Result<(), AudioError> {
211 if push_to_audio_core {
212 let guard = trace_guard!(id, c"push to core");
213 self.check_and_bind_volume_controls(
214 id,
215 self.audio_info_loader.default_value().streams.iter(),
216 )
217 .await?;
218 drop(guard);
219
220 trace!(id, c"setting core");
221 for stream in new_streams {
222 if let Some(volume_control) =
223 self.stream_volume_controls.get_mut(&stream.stream_type)
224 {
225 volume_control.set_volume(id, *stream).await?;
226 }
227 }
228 } else {
229 trace!(id, c"without push to core");
230 self.check_and_bind_volume_controls(id, new_streams).await?;
231 }
232
233 Ok(())
234 }
235
236 async fn update_volume_streams_from_new_streams(
237 &mut self,
238 streams: Vec<SetAudioStream>,
239 push_to_audio_core: bool,
240 id: ftrace::Id,
241 ) -> Result<AudioInfo, AudioError> {
242 let mut new_vec = vec![];
243 trace!(id, c"update volume streams from new streams");
244 let calculating_guard = trace_guard!(id, c"check and bind");
245 trace!(id, c"reading setting");
246 let mut stored_value = self.store.get::<AudioInfo>().await;
247 for set_stream in streams.iter() {
248 let stored_stream = stored_value
249 .streams
250 .iter()
251 .find(|stream| stream.stream_type == set_stream.stream_type)
252 .ok_or_else(|| AudioError::InvalidArgument("stream", format!("{set_stream:?}")))?;
253 new_vec.push(AudioStream {
254 stream_type: stored_stream.stream_type,
255 source: set_stream.source,
256 user_volume_level: set_stream
257 .user_volume_level
258 .unwrap_or(stored_stream.user_volume_level),
259 user_volume_muted: set_stream
260 .user_volume_muted
261 .unwrap_or(stored_stream.user_volume_muted),
262 });
263 }
264 let new_streams = new_vec.iter();
265
266 self.update_streams(push_to_audio_core, new_streams, id).await?;
267 drop(calculating_guard);
268
269 let guard = trace_guard!(id, c"updating streams and counters");
270 stored_value.streams = self.get_streams_array_from_map(&self.stream_volume_controls).await;
271 stored_value.modified_counters = Some(self.modified_counters.clone());
272 drop(guard);
273
274 let guard = trace_guard!(id, c"writing setting");
275 let write_result = self.store.write(&stored_value).await;
276 drop(guard);
277 write_result.map(|_| stored_value).map_err(AudioError::WriteFailure)
279 }
280
281 async fn check_and_bind_volume_controls(
283 &mut self,
284 id: ftrace::Id,
285 streams: impl Iterator<Item = &AudioStream>,
286 ) -> Result<(), AudioError> {
287 trace!(id, c"check and bind fn");
288 if self.audio_service_connected {
289 return Ok(());
290 }
291
292 let guard = trace_guard!(id, c"connecting to service");
293 let service_result = self
294 .service_context
295 .connect_with_publisher::<fidl_fuchsia_media::AudioCoreMarker, _>(
296 self.external_publisher.clone(),
297 )
298 .await;
299
300 let audio_service = service_result.map_err(|e| {
301 AudioError::ExternalFailure(
302 "fuchsia.media.audio",
303 "connect for audio_core".into(),
304 format!("{e:?}"),
305 )
306 })?;
307
308 drop(guard);
312 let mut stream_tuples = Vec::new();
313 for stream in streams {
314 trace!(id, c"create stream volume control");
315 let restart_tx = self.restart_tx.clone();
316
317 stream_tuples.push((
319 stream.stream_type,
320 StreamVolumeControl::create(
321 id,
322 audio_service.clone(),
323 *stream,
324 Some(Rc::new(move || {
325 if let Err(e) = restart_tx.unbounded_send(Restart) {
326 log::error!("Failed to send restart signal: {e:?}");
327 }
328 })),
329 #[cfg(test)]
330 None,
331 )
332 .await?,
333 ));
334 }
335
336 stream_tuples.into_iter().for_each(|(stream_type, stream_volume_control)| {
337 let _ = self.stream_volume_controls.insert(stream_type, stream_volume_control);
339 });
340 self.audio_service_connected = true;
341
342 Ok(())
343 }
344
345 pub(crate) async fn handle(
346 mut self,
347 mut request_rx: UnboundedReceiver<Request>,
348 ) -> fasync::Task<()> {
349 let mut restart_rx: UnboundedReceiver<Restart> = self.restart_rx.take().unwrap();
350 fasync::Task::local(async move {
351 let mut next_request = request_rx.next();
352 let mut next_restart = restart_rx.next();
353 loop {
354 futures::select! {
355 request = next_request => {
356 if let Some(request) = request {
357 self.handle_request(request).await;
358 next_request = request_rx.next();
359 }
360 }
361 restart = next_restart => {
362 if restart.is_some() {
363 self.handle_restart().await;
364 next_restart = restart_rx.next();
365 }
366 }
367 }
368 }
369 })
370 }
371
372 async fn handle_request(&mut self, request: Request) {
373 match request {
374 Request::Get(id, tx) => {
375 trace!(id, c"controller get");
376 let res = self.get_info().await;
377 let _ = tx.send(res);
378 }
379 Request::Listen(tx) => {
380 self.register_listener(tx);
381 }
382 Request::Set(streams, id, tx) => {
383 trace!(id, c"controller set");
384 for audio_stream in &streams {
386 if !audio_stream.has_valid_volume_level() {
387 let _ = tx.send(Err(AudioError::InvalidArgument(
388 "stream",
389 format!("{audio_stream:?}"),
390 )));
391 return;
392 }
393 }
394 let res = self.set_volume(streams, id).await.map(|mut info| {
395 info.modified_counters = Some(self.modified_counters.clone());
396 self.publish(info)
397 });
398 let _ = tx.send(res);
399 }
400 }
401 }
402
403 async fn handle_restart(&mut self) {
404 let id = ftrace::Id::new();
405 trace!(id, c"restart");
406 self.audio_service_connected = false;
407 self.stream_volume_controls.clear();
408 let _ = self.restore_volume_state(id, false).await;
409 }
410}
411
#[cfg(test)]
mod tests {
    use super::*;
    use crate::build_audio_default_settings;
    use crate::test_fakes::audio_core_service::{self, AudioCoreService};
    use crate::types::AudioSettingSource;
    use assert_matches::assert_matches;
    use fidl_fuchsia_media::AudioRenderUsage2;
    use fuchsia_inspect::component;
    use futures::lock::Mutex;
    use settings_common::config::default_settings::DefaultSetting;
    use settings_common::inspect::config_logger::InspectConfigLogger;
    use settings_test_common::fakes::service::ServiceRegistry;
    use settings_test_common::storage::InMemoryStorageFactory;

    const CHANGED_VOLUME_LEVEL: f32 = 0.7;
    const CHANGED_VOLUME_MUTED: bool = true;

    /// Shorthand for spelling out an `AudioStream` literal.
    fn audio_stream(
        stream_type: AudioStreamType,
        source: AudioSettingSource,
        user_volume_level: f32,
        user_volume_muted: bool,
    ) -> AudioStream {
        AudioStream { stream_type, source, user_volume_level, user_volume_muted }
    }

    /// A user-sourced set request that changes both level and mute of the media stream.
    fn changed_media_audio_stream() -> SetAudioStream {
        SetAudioStream {
            stream_type: AudioStreamType::Media,
            source: AudioSettingSource::User,
            user_volume_level: Some(CHANGED_VOLUME_LEVEL),
            user_volume_muted: Some(CHANGED_VOLUME_MUTED),
        }
    }

    /// Builds the default-settings loader, logging config loads to the component inspector.
    fn default_audio_info() -> DefaultSetting<AudioInfo, &'static str> {
        let inspect_root = component::inspector().root();
        let config_logger = Rc::new(std::sync::Mutex::new(InspectConfigLogger::new(inspect_root)));
        build_audio_default_settings(config_logger)
    }

    /// Loads the default `AudioInfo`, panicking if the config is missing or unparsable.
    fn load_default_audio_info(
        default_settings: &mut DefaultSetting<AudioInfo, &'static str>,
    ) -> AudioInfo {
        let loaded =
            default_settings.load_default_value().expect("config should exist and parse for test");
        loaded.unwrap()
    }

    /// Handles to the fake services registered for a test.
    struct FakeServices {
        audio_core: Rc<Mutex<AudioCoreService>>,
    }

    /// Returns the stream of `stream_type` from `info`, panicking if there is none.
    fn get_default_stream(stream_type: AudioStreamType, info: AudioInfo) -> AudioStream {
        for candidate in info.streams {
            if candidate.stream_type == stream_type {
                return candidate;
            }
        }
        panic!("contains stream")
    }

    /// Panics unless `settings` contains a stream equal to `stream`.
    fn verify_audio_info_stream(settings: &AudioInfo, stream: AudioStream) {
        assert!(settings.streams.iter().any(|candidate| *candidate == stream), "contains stream");
    }

    /// Creates a service registry with a fake audio core built from `default_settings`.
    async fn create_services(
        default_settings: AudioInfo,
    ) -> (Rc<Mutex<ServiceRegistry>>, FakeServices) {
        let registry = ServiceRegistry::create();
        let audio_core = audio_core_service::Builder::new(default_settings).build();
        registry.lock().await.register_service(audio_core.clone());

        (registry, FakeServices { audio_core })
    }

    /// Creates a controller whose storage is pre-populated with the default audio info, and
    /// returns it together with the backing device storage.
    async fn create_environment(
        service_registry: Rc<Mutex<ServiceRegistry>>,
        mut default_settings: DefaultSetting<AudioInfo, &'static str>,
    ) -> (AudioController, Rc<DeviceStorage>) {
        let initial_info = load_default_audio_info(&mut default_settings);
        let storage_factory = Rc::new(InMemoryStorageFactory::with_initial_data(&initial_info));
        let controller = create_environment_from_storage(
            service_registry,
            Rc::clone(&storage_factory),
            default_settings,
        )
        .await;
        (controller, storage_factory.get_device_storage().await)
    }

    /// Creates a controller on top of an existing storage factory.
    async fn create_environment_from_storage(
        service_registry: Rc<Mutex<ServiceRegistry>>,
        storage_factory: Rc<InMemoryStorageFactory>,
        default_settings: DefaultSetting<AudioInfo, &'static str>,
    ) -> AudioController {
        let audio_info_loader = AudioInfoLoader::new(default_settings);
        storage_factory
            .initialize_with_loader::<AudioController, _>(audio_info_loader.clone())
            .await
            .expect("initializing audio info storage");

        // The receiving halves are dropped right away; these tests never read from them.
        let (setting_tx, _) = mpsc::unbounded();
        let (external_tx, _) = mpsc::unbounded();
        let service_context =
            Rc::new(ServiceContext::new(Some(Box::new(ServiceRegistry::serve(service_registry)))));

        AudioController::new(
            service_context,
            audio_info_loader,
            storage_factory,
            SettingValuePublisher::new(setting_tx),
            ExternalEventPublisher::new(external_tx),
        )
        .await
    }

    /// A media volume persisted in storage is pushed to the fake audio core on restore.
    #[fuchsia::test(allow_stalls = false)]
    async fn test_volume_restore() {
        let mut default_settings = default_audio_info();
        let mut stored_info = load_default_audio_info(&mut default_settings);
        let (service_registry, fake_services) = create_services(stored_info.clone()).await;
        let (expected_level, expected_muted) = (0.9, false);
        let media_streams = stored_info
            .streams
            .iter_mut()
            .filter(|stream| stream.stream_type == AudioStreamType::Media);
        for stream in media_streams {
            stream.user_volume_level = expected_level;
            stream.user_volume_muted = expected_muted;
            log::info!("test");
        }

        let (event_tx, event_rx) = mpsc::unbounded();
        fake_services.audio_core.lock().await.set_event_tx(event_tx);

        let storage_factory = Rc::new(InMemoryStorageFactory::with_initial_data(&stored_info));
        let mut audio_controller =
            create_environment_from_storage(service_registry, storage_factory, default_settings)
                .await;
        let _ = audio_controller.restore().await;

        // Wait until the fake has reported `AUDIO_STREAM_TYPE_COUNT` events.
        let _ = event_rx.skip(AUDIO_STREAM_TYPE_COUNT - 1).next().await;

        let level_and_mute = fake_services
            .audio_core
            .lock()
            .await
            .get_level_and_mute(AudioRenderUsage2::Media)
            .unwrap();
        assert_eq!(level_and_mute, (expected_level, expected_muted));
    }

    /// Every persisted stream is returned by restore and applied to the fake audio core.
    #[fuchsia::test(allow_stalls = false)]
    async fn test_persisted_values_applied_at_start() {
        use AudioSettingSource::{System, User};

        let mut default_settings = default_audio_info();
        let (service_registry, fake_services) =
            create_services(load_default_audio_info(&mut default_settings)).await;

        let test_audio_info = AudioInfo {
            streams: [
                audio_stream(AudioStreamType::Background, User, 0.5, true),
                audio_stream(AudioStreamType::Media, User, 0.6, true),
                audio_stream(AudioStreamType::Interruption, System, 0.3, false),
                audio_stream(AudioStreamType::SystemAgent, User, 0.7, true),
                audio_stream(AudioStreamType::Communication, User, 0.8, false),
                audio_stream(AudioStreamType::Accessibility, User, 0.9, false),
            ],
            modified_counters: Some(create_default_modified_counters()),
        };

        let (event_tx, event_rx) = mpsc::unbounded();
        fake_services.audio_core.lock().await.set_event_tx(event_tx);

        let storage_factory = Rc::new(InMemoryStorageFactory::with_initial_data(&test_audio_info));
        let mut audio_controller =
            create_environment_from_storage(service_registry, storage_factory, default_settings)
                .await;
        let info = audio_controller.restore().await;

        // Wait until the fake has reported twice `AUDIO_STREAM_TYPE_COUNT` events.
        let _ = event_rx.skip(AUDIO_STREAM_TYPE_COUNT * 2 - 1).next().await;

        for stream in &test_audio_info.streams {
            verify_audio_info_stream(&info, *stream);
            let applied = fake_services
                .audio_core
                .lock()
                .await
                .get_level_and_mute(AudioRenderUsage2::from(stream.stream_type))
                .unwrap();
            assert_eq!((stream.user_volume_level, stream.user_volume_muted), applied);
        }
    }

    /// Restore and get still work when no audio core service is registered.
    #[fuchsia::test(allow_stalls = false)]
    async fn test_get_without_audio_core() {
        let mut default_settings = default_audio_info();
        let default_info = load_default_audio_info(&mut default_settings);
        let empty_registry = ServiceRegistry::create();

        let (mut controller, _) = create_environment(empty_registry, default_settings).await;
        let restored = controller.restore().await;
        let fetched = controller.get_info().await;
        assert_eq!(restored.streams, fetched.streams);
        let default_media = get_default_stream(AudioStreamType::Media, default_info);
        verify_audio_info_stream(&fetched, default_media);
    }

    /// Setting a stream type that is absent from the stored info fails with `InvalidArgument`.
    #[fuchsia::test(allow_stalls = false)]
    async fn test_invalid_stream_fails() {
        let mut default_settings = default_audio_info();
        let service_registry = ServiceRegistry::create();
        let audio_core_service_handle =
            audio_core_service::Builder::new(load_default_audio_info(&mut default_settings))
                .set_suppress_client_errors(true)
                .build();
        service_registry.lock().await.register_service(audio_core_service_handle.clone());

        // The stored data has no media entry: neither a counter nor a stream.
        let counters: HashMap<_, _> = [
            AudioStreamType::Background,
            AudioStreamType::Interruption,
            AudioStreamType::SystemAgent,
            AudioStreamType::Communication,
            AudioStreamType::Accessibility,
        ]
        .into_iter()
        .map(|stream_type| (stream_type, 0))
        .collect();

        // Background appears twice, taking the slot media would normally occupy.
        let streams = [
            AudioStreamType::Background,
            AudioStreamType::Background,
            AudioStreamType::Interruption,
            AudioStreamType::SystemAgent,
            AudioStreamType::Communication,
            AudioStreamType::Accessibility,
        ]
        .map(|stream_type| audio_stream(stream_type, AudioSettingSource::User, 0.5, true));

        let test_audio_info = AudioInfo { streams, modified_counters: Some(counters) };

        let storage_factory = Rc::new(InMemoryStorageFactory::with_initial_data(&test_audio_info));
        let mut audio_controller =
            create_environment_from_storage(service_registry, storage_factory, default_settings)
                .await;
        let _ = audio_controller.restore().await;

        let set_result = audio_controller
            .set_volume(vec![changed_media_audio_stream()], fuchsia_trace::Id::new())
            .await;
        let err = set_result.expect_err("set should fail");
        assert_matches!(err, AudioError::InvalidArgument(..));
    }
}