1use super::AudioInfoLoader;
6use super::audio_fidl_handler::{Publisher, Publisher2};
7use super::types::AudioError;
8use crate::audio::types::{
9 AUDIO_STREAM_TYPE_COUNT, AudioInfo, AudioStream, AudioStreamType, SetAudioStream,
10};
11use crate::audio::{ModifiedCounters, StreamVolumeControl, create_default_modified_counters};
12use futures::channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
13
14use futures::StreamExt;
15use futures::channel::oneshot::Sender;
16use settings_common::inspect::event::{ExternalEventPublisher, SettingValuePublisher};
17use settings_common::service_context::ServiceContext;
18use settings_common::{trace, trace_guard};
19use settings_storage::device_storage::{DeviceStorage, DeviceStorageCompatible};
20use settings_storage::storage_factory::{DefaultLoader, StorageAccess, StorageFactory};
21use std::collections::HashMap;
22use std::rc::Rc;
23use {fuchsia_async as fasync, fuchsia_trace as ftrace};
24
/// Messages accepted by the controller's request loop (see `AudioController::handle`).
///
/// The `ftrace::Id` carried by `Get` and `Set` is the trace id used for the trace events emitted
/// while the request is processed.
pub enum Request {
    /// Fetch the current audio info; the result is sent back on the oneshot sender.
    Get(ftrace::Id, Sender<AudioInfo>),
    /// Register a channel that receives every `AudioInfo` the controller publishes.
    Listen(UnboundedSender<AudioInfo>),
    /// Apply the given stream changes; `Ok(())` or the failure reason is sent back on the sender.
    Set(Vec<SetAudioStream>, ftrace::Id, Sender<Result<(), AudioError>>),
}
30
/// Marker message sent over the controller's restart channel by the callback handed to each
/// `StreamVolumeControl`. Receiving it makes the controller drop and rebind its volume controls.
struct Restart;
32
/// Declares how the audio controller's data is persisted: `AudioInfo` values kept in
/// `DeviceStorage` under the key `AudioInfo::KEY`.
impl StorageAccess for AudioController {
    type Storage = DeviceStorage;
    type Data = AudioInfo;
    const STORAGE_KEY: &'static str = AudioInfo::KEY;
}
38
/// Owns the audio setting state: reads and writes `AudioInfo` in device storage, binds one
/// `StreamVolumeControl` per stream type through the audio core service, and publishes changes.
pub(crate) struct AudioController {
    /// Used to connect to `fidl_fuchsia_media::AudioCoreMarker`.
    service_context: Rc<ServiceContext>,
    /// Persistent storage the `AudioInfo` is read from and written to.
    store: Rc<DeviceStorage>,
    /// True once the volume controls have been bound; cleared again when a restart is handled.
    audio_service_connected: bool,
    /// Volume controls keyed by stream type, filled in by `check_and_bind_volume_controls`.
    stream_volume_controls: HashMap<AudioStreamType, StreamVolumeControl>,
    /// Per-stream counters bumped on every set request that names the stream.
    modified_counters: ModifiedCounters,
    /// Source of the default `AudioInfo` (and therefore the default stream list).
    audio_info_loader: AudioInfoLoader,
    /// Optional publisher that is only updated when a legacy stream changed.
    publisher: Option<Publisher>,
    /// Optional publisher that is updated when any stream changed.
    publisher2: Option<Publisher2>,
    /// Channels registered through `Request::Listen`; each receives every published `AudioInfo`.
    listeners: Vec<UnboundedSender<AudioInfo>>,
    /// Receives every published `AudioInfo`.
    setting_value_publisher: SettingValuePublisher<AudioInfo>,
    /// Cloned and handed to the service context when connecting to the audio core service.
    external_publisher: ExternalEventPublisher,
    /// Sender cloned into each volume control's callback to request a restart.
    restart_tx: UnboundedSender<Restart>,
    /// Receiving half of the restart channel; taken (left as `None`) by `handle`.
    restart_rx: Option<UnboundedReceiver<Restart>>,
}
54
55impl AudioController {
56 pub(crate) async fn new<F>(
57 service_context: Rc<ServiceContext>,
58 audio_info_loader: AudioInfoLoader,
59 storage_factory: Rc<F>,
60 setting_value_publisher: SettingValuePublisher<AudioInfo>,
61 external_publisher: ExternalEventPublisher,
62 ) -> AudioController
63 where
64 F: StorageFactory<Storage = DeviceStorage>,
65 {
66 let store = storage_factory.get_store().await;
67 let (restart_tx, restart_rx) = mpsc::unbounded();
68 Self {
69 service_context,
70 store,
71 stream_volume_controls: HashMap::new(),
72 audio_service_connected: false,
73 modified_counters: create_default_modified_counters(),
74 audio_info_loader,
75 publisher: None,
76 publisher2: None,
77 listeners: vec![],
78 setting_value_publisher,
79 external_publisher,
80 restart_tx,
81 restart_rx: Some(restart_rx),
82 }
83 }
84
85 pub(crate) async fn restore(&mut self) -> AudioInfo {
88 let id = ftrace::Id::new();
89 trace!(id, c"restore");
90 self.restore_volume_state(id, true).await
91 }
92
93 pub(crate) async fn restore_volume_state(
96 &mut self,
97 id: ftrace::Id,
98 push_to_audio_core: bool,
99 ) -> AudioInfo {
100 let audio_info = self.store.get::<AudioInfo>().await;
101
102 trace!(id, c"update volume streams from info");
103 let new_streams = audio_info.streams.iter();
104 let _guard = trace_guard!(id, c"check and bind");
105 if let Err(e) = self.update_streams(push_to_audio_core, new_streams, id).await {
106 log::error!("Failed to update streams: {e:?}");
107 }
108 audio_info
109 }
110
111 pub(crate) async fn get_info(&self) -> AudioInfo {
112 let mut info = self.store.get::<AudioInfo>().await;
113 info.modified_counters = Some(self.modified_counters.clone());
114 info
115 }
116
117 pub(crate) fn register_publishers(&mut self, publisher: Publisher, publisher2: Publisher2) {
118 self.publisher = Some(publisher);
119 self.publisher2 = Some(publisher2);
120 }
121
122 fn register_listener(&mut self, tx: UnboundedSender<AudioInfo>) {
123 self.listeners.push(tx);
124 }
125
126 fn publish(&self, new_info: AudioInfo) {
127 let _ = self.setting_value_publisher.publish(&new_info);
128 for listener in &self.listeners {
130 let _ = listener.unbounded_send(new_info.clone());
131 }
132 if let Some(publisher) = self.publisher.as_ref() {
134 publisher.update(|info| {
135 let info = info.as_mut().unwrap();
137 let mut old_streams = info.streams.iter();
138 let new_streams = new_info.streams.iter();
139 for new_stream in new_streams {
140 let old_stream = old_streams
141 .find(|stream| stream.stream_type == new_stream.stream_type)
142 .expect("stream type should be found in existing streams");
143 if (old_stream != new_stream) && new_stream.stream_type.is_legacy() {
145 *info = new_info.clone();
146 return true;
147 }
148 }
149 false
150 });
151 }
152 if let Some(publisher2) = self.publisher2.as_ref() {
153 publisher2.update(|info| {
154 let info = info.as_mut().unwrap();
156 let mut old_streams = info.streams.iter();
157 let new_streams = new_info.streams.iter();
158 for new_stream in new_streams {
159 let old_stream = old_streams
160 .find(|stream| stream.stream_type == new_stream.stream_type)
161 .expect("stream type should be found in existing streams");
162 if old_stream != new_stream {
164 *info = new_info.clone();
165 return true;
166 }
167 }
168 false
169 });
170 }
171 }
172
173 async fn set_volume(
174 &mut self,
175 volume: Vec<SetAudioStream>,
176 id: ftrace::Id,
177 ) -> Result<AudioInfo, AudioError> {
178 let guard = trace_guard!(id, c"set volume updating counters");
179 for stream in &volume {
181 let counter = self.modified_counters.entry(stream.stream_type).or_insert(0);
184 *counter = counter.wrapping_add(1);
185 }
186 drop(guard);
187
188 self.update_volume_streams_from_new_streams(volume, true, id).await
189 }
190
191 async fn get_streams_array_from_map(
192 &self,
193 stream_map: &HashMap<AudioStreamType, StreamVolumeControl>,
194 ) -> [AudioStream; AUDIO_STREAM_TYPE_COUNT] {
195 let mut streams: [AudioStream; AUDIO_STREAM_TYPE_COUNT] =
196 self.audio_info_loader.default_value().streams;
197 for stream in &mut streams {
198 if let Some(volume_control) = stream_map.get(&stream.stream_type) {
199 *stream = volume_control.stored_stream;
200 }
201 }
202
203 streams
204 }
205
206 async fn update_streams(
207 &mut self,
208 push_to_audio_core: bool,
209 new_streams: impl Iterator<Item = &AudioStream>,
210 id: ftrace::Id,
211 ) -> Result<(), AudioError> {
212 if push_to_audio_core {
213 let guard = trace_guard!(id, c"push to core");
214 self.check_and_bind_volume_controls(
215 id,
216 self.audio_info_loader.default_value().streams.iter(),
217 )
218 .await?;
219 drop(guard);
220
221 trace!(id, c"setting core");
222 for stream in new_streams {
223 if let Some(volume_control) =
224 self.stream_volume_controls.get_mut(&stream.stream_type)
225 {
226 let _ = volume_control.set_volume(id, *stream).await?;
227 }
228 }
229 } else {
230 trace!(id, c"without push to core");
231 self.check_and_bind_volume_controls(id, new_streams).await?;
232 }
233
234 Ok(())
235 }
236
237 async fn update_volume_streams_from_new_streams(
238 &mut self,
239 streams: Vec<SetAudioStream>,
240 push_to_audio_core: bool,
241 id: ftrace::Id,
242 ) -> Result<AudioInfo, AudioError> {
243 let mut new_vec = vec![];
244 trace!(id, c"update volume streams from new streams");
245 let calculating_guard = trace_guard!(id, c"check and bind");
246 trace!(id, c"reading setting");
247 let mut stored_value = self.store.get::<AudioInfo>().await;
248 for set_stream in streams.iter() {
249 let stored_stream = stored_value
250 .streams
251 .iter()
252 .find(|stream| stream.stream_type == set_stream.stream_type)
253 .ok_or_else(|| AudioError::InvalidArgument("stream", format!("{set_stream:?}")))?;
254 new_vec.push(AudioStream {
255 stream_type: stored_stream.stream_type,
256 source: set_stream.source,
257 user_volume_level: set_stream
258 .user_volume_level
259 .unwrap_or(stored_stream.user_volume_level),
260 user_volume_muted: set_stream
261 .user_volume_muted
262 .unwrap_or(stored_stream.user_volume_muted),
263 });
264 }
265 let new_streams = new_vec.iter();
266
267 self.update_streams(push_to_audio_core, new_streams, id).await?;
268 drop(calculating_guard);
269
270 let guard = trace_guard!(id, c"updating streams and counters");
271 stored_value.streams = self.get_streams_array_from_map(&self.stream_volume_controls).await;
272 stored_value.modified_counters = Some(self.modified_counters.clone());
273 drop(guard);
274
275 let guard = trace_guard!(id, c"writing setting");
276 let write_result = self.store.write(&stored_value).await;
277 drop(guard);
278 write_result.map(|_| stored_value).map_err(AudioError::WriteFailure)
280 }
281
282 async fn check_and_bind_volume_controls(
284 &mut self,
285 id: ftrace::Id,
286 streams: impl Iterator<Item = &AudioStream>,
287 ) -> Result<(), AudioError> {
288 trace!(id, c"check and bind fn");
289 if self.audio_service_connected {
290 return Ok(());
291 }
292
293 let guard = trace_guard!(id, c"connecting to service");
294 let service_result = self
295 .service_context
296 .connect_with_publisher::<fidl_fuchsia_media::AudioCoreMarker, _>(
297 self.external_publisher.clone(),
298 )
299 .await;
300
301 let audio_service = service_result.map_err(|e| {
302 AudioError::ExternalFailure(
303 "fuchsia.media.audio",
304 "connect for audio_core".into(),
305 format!("{e:?}"),
306 )
307 })?;
308
309 drop(guard);
313 let mut stream_tuples = Vec::new();
314 for stream in streams {
315 trace!(id, c"create stream volume control");
316 let restart_tx = self.restart_tx.clone();
317
318 stream_tuples.push((
320 stream.stream_type,
321 StreamVolumeControl::create(
322 id,
323 audio_service.clone(),
324 *stream,
325 Some(Rc::new(move || {
326 if let Err(e) = restart_tx.unbounded_send(Restart) {
327 log::error!("Failed to send restart signal: {e:?}");
328 }
329 })),
330 #[cfg(test)]
331 None,
332 )
333 .await?,
334 ));
335 }
336
337 stream_tuples.into_iter().for_each(|(stream_type, stream_volume_control)| {
338 let _ = self.stream_volume_controls.insert(stream_type, stream_volume_control);
340 });
341 self.audio_service_connected = true;
342
343 Ok(())
344 }
345
346 pub(crate) async fn handle(
347 mut self,
348 mut request_rx: UnboundedReceiver<Request>,
349 ) -> fasync::Task<()> {
350 let mut restart_rx: UnboundedReceiver<Restart> = self.restart_rx.take().unwrap();
351 fasync::Task::local(async move {
352 let mut next_request = request_rx.next();
353 let mut next_restart = restart_rx.next();
354 loop {
355 futures::select! {
356 request = next_request => {
357 if let Some(request) = request {
358 self.handle_request(request).await;
359 next_request = request_rx.next();
360 }
361 }
362 restart = next_restart => {
363 if let Some(_) = restart {
364 self.handle_restart().await;
365 next_restart = restart_rx.next();
366 }
367 }
368 }
369 }
370 })
371 }
372
373 async fn handle_request(&mut self, request: Request) {
374 match request {
375 Request::Get(id, tx) => {
376 trace!(id, c"controller get");
377 let res = self.get_info().await;
378 let _ = tx.send(res);
379 }
380 Request::Listen(tx) => {
381 self.register_listener(tx);
382 }
383 Request::Set(streams, id, tx) => {
384 trace!(id, c"controller set");
385 for audio_stream in &streams {
387 if !audio_stream.has_valid_volume_level() {
388 let _ = tx.send(Err(AudioError::InvalidArgument(
389 "stream",
390 format!("{audio_stream:?}"),
391 )));
392 return;
393 }
394 }
395 let res = self.set_volume(streams, id).await.map(|mut info| {
396 info.modified_counters = Some(self.modified_counters.clone());
397 self.publish(info)
398 });
399 let _ = tx.send(res);
400 }
401 }
402 }
403
404 async fn handle_restart(&mut self) {
405 let id = ftrace::Id::new();
406 trace!(id, c"restart");
407 self.audio_service_connected = false;
408 self.stream_volume_controls.clear();
409 let _ = self.restore_volume_state(id, false).await;
410 }
411}
412
413#[cfg(test)]
414mod tests {
415 use super::*;
416 use crate::audio::build_audio_default_settings;
417 use crate::audio::test_fakes::audio_core_service::{self, AudioCoreService};
418 use crate::audio::types::AudioSettingSource;
419 use assert_matches::assert_matches;
420 use fidl_fuchsia_media::AudioRenderUsage2;
421 use fuchsia_inspect::component;
422 use futures::lock::Mutex;
423 use settings_common::config::default_settings::DefaultSetting;
424 use settings_common::inspect::config_logger::InspectConfigLogger;
425 use settings_test_common::fakes::service::ServiceRegistry;
426 use settings_test_common::storage::InMemoryStorageFactory;
427
/// Volume level requested by `changed_media_audio_stream`.
const CHANGED_VOLUME_LEVEL: f32 = 0.7;
/// Mute state requested by `changed_media_audio_stream`.
const CHANGED_VOLUME_MUTED: bool = true;
430
431 fn changed_media_audio_stream() -> SetAudioStream {
432 SetAudioStream {
433 stream_type: AudioStreamType::Media,
434 source: AudioSettingSource::User,
435 user_volume_level: Some(CHANGED_VOLUME_LEVEL),
436 user_volume_muted: Some(CHANGED_VOLUME_MUTED),
437 }
438 }
439
440 fn default_audio_info() -> DefaultSetting<AudioInfo, &'static str> {
441 let config_logger =
442 Rc::new(std::sync::Mutex::new(InspectConfigLogger::new(component::inspector().root())));
443 build_audio_default_settings(config_logger)
444 }
445
446 fn load_default_audio_info(
447 default_settings: &mut DefaultSetting<AudioInfo, &'static str>,
448 ) -> AudioInfo {
449 default_settings
450 .load_default_value()
451 .expect("config should exist and parse for test")
452 .unwrap()
453 }
454
/// Handles to the fake services a test can inspect after exercising the controller.
struct FakeServices {
    /// Fake audio core service registered with the test's service registry.
    audio_core: Rc<Mutex<AudioCoreService>>,
}
461
462 fn get_default_stream(stream_type: AudioStreamType, info: AudioInfo) -> AudioStream {
463 info.streams.into_iter().find(|x| x.stream_type == stream_type).expect("contains stream")
464 }
465
466 fn verify_audio_info_stream(settings: &AudioInfo, stream: AudioStream) {
467 let _ = settings.streams.iter().find(|x| **x == stream).expect("contains stream");
468 }
469
470 async fn create_services(
472 default_settings: AudioInfo,
473 ) -> (Rc<Mutex<ServiceRegistry>>, FakeServices) {
474 let service_registry = ServiceRegistry::create();
475 let audio_core_service_handle = audio_core_service::Builder::new(default_settings).build();
476 service_registry.lock().await.register_service(audio_core_service_handle.clone());
477
478 (service_registry, FakeServices { audio_core: audio_core_service_handle })
479 }
480
481 async fn create_environment(
482 service_registry: Rc<Mutex<ServiceRegistry>>,
483 mut default_settings: DefaultSetting<AudioInfo, &'static str>,
484 ) -> (AudioController, Rc<DeviceStorage>) {
485 let storage_factory = Rc::new(InMemoryStorageFactory::with_initial_data(
486 &load_default_audio_info(&mut default_settings),
487 ));
488 let audio_controller = create_environment_from_storage(
489 service_registry,
490 Rc::clone(&storage_factory),
491 default_settings,
492 )
493 .await;
494 let store = storage_factory.get_device_storage().await;
495 (audio_controller, store)
496 }
497
498 async fn create_environment_from_storage(
499 service_registry: Rc<Mutex<ServiceRegistry>>,
500 storage_factory: Rc<InMemoryStorageFactory>,
501 default_settings: DefaultSetting<AudioInfo, &'static str>,
502 ) -> AudioController {
503 let audio_info_loader = AudioInfoLoader::new(default_settings);
504 storage_factory
505 .initialize_with_loader::<AudioController, _>(audio_info_loader.clone())
506 .await
507 .expect("initializing audio info storage");
508
509 let (tx, _) = mpsc::unbounded();
510 let setting_value_publisher = SettingValuePublisher::new(tx);
511 let (tx, _) = mpsc::unbounded();
512 let external_publisher = ExternalEventPublisher::new(tx);
513
514 let audio_controller = AudioController::new(
515 Rc::new(ServiceContext::new(Some(Box::new(ServiceRegistry::serve(service_registry))))),
516 audio_info_loader,
517 storage_factory,
518 setting_value_publisher,
519 external_publisher,
520 )
521 .await;
522 audio_controller
523 }
524
525 #[fuchsia::test(allow_stalls = false)]
527 async fn test_volume_restore() {
528 let mut default_settings = default_audio_info();
529 let mut stored_info = load_default_audio_info(&mut default_settings);
530 let (service_registry, fake_services) = create_services(stored_info.clone()).await;
531 let expected_info = (0.9, false);
532 for stream in stored_info.streams.iter_mut() {
533 if stream.stream_type == AudioStreamType::Media {
534 stream.user_volume_level = expected_info.0;
535 stream.user_volume_muted = expected_info.1;
536 }
537 }
538
539 let (tx, rx) = mpsc::unbounded();
540 fake_services.audio_core.lock().await.set_event_tx(tx);
541
542 let storage_factory = Rc::new(InMemoryStorageFactory::with_initial_data(&stored_info));
543 let mut audio_controller =
544 create_environment_from_storage(service_registry, storage_factory, default_settings)
545 .await;
546 let _ = audio_controller.restore().await;
547
548 let _ = rx.skip(AUDIO_STREAM_TYPE_COUNT - 1).next().await;
550
551 let stored_info = fake_services
552 .audio_core
553 .lock()
554 .await
555 .get_level_and_mute(AudioRenderUsage2::Media)
556 .unwrap();
557 assert_eq!(stored_info, expected_info);
558 }
559
560 #[fuchsia::test(allow_stalls = false)]
561 async fn test_persisted_values_applied_at_start() {
562 let mut default_settings = default_audio_info();
563 let (service_registry, fake_services) =
564 create_services(load_default_audio_info(&mut default_settings)).await;
565
566 let test_audio_info = AudioInfo {
567 streams: [
568 AudioStream {
569 stream_type: AudioStreamType::Background,
570 source: AudioSettingSource::User,
571 user_volume_level: 0.5,
572 user_volume_muted: true,
573 },
574 AudioStream {
575 stream_type: AudioStreamType::Media,
576 source: AudioSettingSource::User,
577 user_volume_level: 0.6,
578 user_volume_muted: true,
579 },
580 AudioStream {
581 stream_type: AudioStreamType::Interruption,
582 source: AudioSettingSource::System,
583 user_volume_level: 0.3,
584 user_volume_muted: false,
585 },
586 AudioStream {
587 stream_type: AudioStreamType::SystemAgent,
588 source: AudioSettingSource::User,
589 user_volume_level: 0.7,
590 user_volume_muted: true,
591 },
592 AudioStream {
593 stream_type: AudioStreamType::Communication,
594 source: AudioSettingSource::User,
595 user_volume_level: 0.8,
596 user_volume_muted: false,
597 },
598 AudioStream {
599 stream_type: AudioStreamType::Accessibility,
600 source: AudioSettingSource::User,
601 user_volume_level: 0.9,
602 user_volume_muted: false,
603 },
604 ],
605 modified_counters: Some(create_default_modified_counters()),
606 };
607
608 let (tx, rx) = mpsc::unbounded();
609 fake_services.audio_core.lock().await.set_event_tx(tx);
610
611 let storage_factory = Rc::new(InMemoryStorageFactory::with_initial_data(&test_audio_info));
612 let mut audio_controller =
613 create_environment_from_storage(service_registry, storage_factory, default_settings)
614 .await;
615 let info = audio_controller.restore().await;
616
617 let _ = rx.skip(AUDIO_STREAM_TYPE_COUNT * 2 - 1).next().await;
619
620 for stream in test_audio_info.streams.iter() {
622 verify_audio_info_stream(&info, *stream);
623 assert_eq!(
624 (stream.user_volume_level, stream.user_volume_muted),
625 fake_services
626 .audio_core
627 .lock()
628 .await
629 .get_level_and_mute(AudioRenderUsage2::from(stream.stream_type))
630 .unwrap()
631 );
632 }
633 }
634
635 #[fuchsia::test(allow_stalls = false)]
637 async fn test_get_without_audio_core() {
638 let mut default_settings = default_audio_info();
639 let default_info = load_default_audio_info(&mut default_settings);
640 let service_registry = ServiceRegistry::create();
641
642 let (mut controller, _) = create_environment(service_registry, default_settings).await;
643 let restore_info = controller.restore().await;
645 let get_info = controller.get_info().await;
646 assert_eq!(restore_info.streams, get_info.streams);
647 verify_audio_info_stream(
648 &get_info,
649 get_default_stream(AudioStreamType::Media, default_info),
650 );
651 }
652
653 #[fuchsia::test(allow_stalls = false)]
654 async fn test_invalid_stream_fails() {
655 let mut default_settings = default_audio_info();
656 let service_registry = ServiceRegistry::create();
659 let audio_core_service_handle =
660 audio_core_service::Builder::new(load_default_audio_info(&mut default_settings))
661 .set_suppress_client_errors(true)
662 .build();
663 service_registry.lock().await.register_service(audio_core_service_handle.clone());
664
665 let counters: HashMap<_, _> = [
668 (AudioStreamType::Background, 0),
669 (AudioStreamType::Interruption, 0),
670 (AudioStreamType::SystemAgent, 0),
671 (AudioStreamType::Communication, 0),
672 (AudioStreamType::Accessibility, 0),
673 ]
674 .into();
675
676 let test_audio_info = AudioInfo {
677 streams: [
678 AudioStream {
679 stream_type: AudioStreamType::Background,
680 source: AudioSettingSource::User,
681 user_volume_level: 0.5,
682 user_volume_muted: true,
683 },
684 AudioStream {
685 stream_type: AudioStreamType::Background,
686 source: AudioSettingSource::User,
687 user_volume_level: 0.5,
688 user_volume_muted: true,
689 },
690 AudioStream {
691 stream_type: AudioStreamType::Interruption,
692 source: AudioSettingSource::User,
693 user_volume_level: 0.5,
694 user_volume_muted: true,
695 },
696 AudioStream {
697 stream_type: AudioStreamType::SystemAgent,
698 source: AudioSettingSource::User,
699 user_volume_level: 0.5,
700 user_volume_muted: true,
701 },
702 AudioStream {
703 stream_type: AudioStreamType::Communication,
704 source: AudioSettingSource::User,
705 user_volume_level: 0.5,
706 user_volume_muted: true,
707 },
708 AudioStream {
709 stream_type: AudioStreamType::Accessibility,
710 source: AudioSettingSource::User,
711 user_volume_level: 0.5,
712 user_volume_muted: true,
713 },
714 ],
715 modified_counters: Some(counters),
716 };
717
718 let storage_factory = Rc::new(InMemoryStorageFactory::with_initial_data(&test_audio_info));
720 let mut audio_controller =
721 create_environment_from_storage(service_registry, storage_factory, default_settings)
722 .await;
723 let _ = audio_controller.restore().await;
724
725 let err = audio_controller
727 .set_volume(vec![changed_media_audio_stream()], fuchsia_trace::Id::new())
728 .await
729 .expect_err("set should fail");
730 assert_matches!(err, AudioError::InvalidArgument(..));
731 }
732}