1use crate::audio::types::{AudioError, AudioStream};
6use crate::audio::utils::round_volume_level;
7#[cfg(test)]
8use crate::clock;
9use fidl::endpoints::create_proxy;
10use fidl_fuchsia_media::Usage2;
11use fidl_fuchsia_media_audio::VolumeControlProxy;
12use futures::TryStreamExt;
13#[cfg(test)]
14use futures::channel::mpsc::UnboundedSender;
15use futures::channel::oneshot::Sender;
16use settings_common::inspect::event::ExternalEventPublisher;
17#[cfg(test)]
18use settings_common::service_context::ExternalServiceEvent;
19use settings_common::service_context::ExternalServiceProxy;
20use settings_common::{call, trace, trace_guard};
21use std::rc::Rc;
22use {fuchsia_async as fasync, fuchsia_trace as ftrace};
23
24#[cfg(test)]
25const PUBLISHER_EVENT_NAME: &str = "volume_control_events";
26const CONTROLLER_ERROR_DEPENDENCY: &str = "fuchsia.media.audio";
27#[cfg(test)]
28const UNKNOWN_INSPECT_STRING: &str = "unknown";
29
30pub(crate) type ExitAction = Rc<dyn Fn()>;
32
33pub struct StreamVolumeControl {
38 pub stored_stream: AudioStream,
39 proxy: Option<VolumeControlProxy>,
40 audio_service: ExternalServiceProxy<fidl_fuchsia_media::AudioCoreProxy, ExternalEventPublisher>,
41 early_exit_action: Option<ExitAction>,
42 #[cfg(test)]
43 publisher: Option<UnboundedSender<ExternalServiceEvent>>,
44 listen_exit_tx: Option<Sender<()>>,
45}
46
47impl Drop for StreamVolumeControl {
48 fn drop(&mut self) {
49 if let Some(exit_tx) = self.listen_exit_tx.take() {
50 if exit_tx.is_canceled() {
52 return;
53 }
54
55 exit_tx.send(()).unwrap_or_else(|_| {
58 log::warn!("StreamVolumeControl::drop, exit_tx failed to send exit signal")
59 });
60 }
61 }
62}
63
64impl StreamVolumeControl {
65 pub(crate) async fn create(
66 id: ftrace::Id,
67 audio_service: ExternalServiceProxy<
68 fidl_fuchsia_media::AudioCoreProxy,
69 ExternalEventPublisher,
70 >,
71 stream: AudioStream,
72 early_exit_action: Option<ExitAction>,
73 #[cfg(test)] publisher: Option<UnboundedSender<ExternalServiceEvent>>,
74 ) -> Result<Self, AudioError> {
75 assert!(stream.has_valid_volume_level());
78
79 trace!(id, c"StreamVolumeControl ctor");
80 let mut control = StreamVolumeControl {
81 stored_stream: stream,
82 proxy: None,
83 audio_service: audio_service,
84 listen_exit_tx: None,
85 early_exit_action,
86 #[cfg(test)]
87 publisher,
88 };
89
90 control.bind_volume_control(id).await?;
91 Ok(control)
92 }
93
94 pub(crate) async fn set_volume(
95 &mut self,
96 id: ftrace::Id,
97 stream: AudioStream,
98 ) -> Result<(), AudioError> {
99 assert_eq!(self.stored_stream.stream_type, stream.stream_type);
100 assert!(stream.has_valid_volume_level());
103
104 if self.proxy.is_none() {
106 self.bind_volume_control(id).await?;
107 }
108
109 let mut new_stream_value = stream;
111 new_stream_value.user_volume_level = round_volume_level(stream.user_volume_level);
112
113 let proxy = self.proxy.as_ref().expect("no volume control proxy");
114
115 if (self.stored_stream.user_volume_level - new_stream_value.user_volume_level).abs()
116 > f32::EPSILON
117 {
118 log::info!("PAUL: Setting volume to {new_stream_value:?}");
119 if let Err(e) = proxy.set_volume(new_stream_value.user_volume_level) {
120 self.stored_stream = new_stream_value;
121 return Err(AudioError::ExternalFailure(
122 CONTROLLER_ERROR_DEPENDENCY,
123 "set volume".into(),
124 format!("{e:?}"),
125 ));
126 }
127 }
128
129 if self.stored_stream.user_volume_muted != new_stream_value.user_volume_muted {
130 if let Err(e) = proxy.set_mute(stream.user_volume_muted) {
131 self.stored_stream = new_stream_value;
132 return Err(AudioError::ExternalFailure(
133 CONTROLLER_ERROR_DEPENDENCY,
134 "set mute".into(),
135 format!("{e:?}"),
136 ));
137 }
138 }
139
140 self.stored_stream = new_stream_value;
141 Ok(())
142 }
143
144 async fn bind_volume_control(&mut self, id: ftrace::Id) -> Result<(), AudioError> {
145 trace!(id, c"bind volume control");
146 if self.proxy.is_some() {
147 return Ok(());
148 }
149
150 let (vol_control_proxy, server_end) = create_proxy();
151 let stream_type = self.stored_stream.stream_type;
152 let usage = Usage2::RenderUsage(stream_type.into());
153
154 let guard = trace_guard!(id, c"bind usage volume control");
155 if let Err(e) = call!(self.audio_service => bind_usage_volume_control2(&usage, server_end))
156 {
157 return Err(AudioError::ExternalFailure(
158 CONTROLLER_ERROR_DEPENDENCY,
159 format!("bind_usage_volume_control2 for audio_core {usage:?}").into(),
160 format!("{e:?}"),
161 ));
162 }
163 drop(guard);
164
165 let guard = trace_guard!(id, c"set values");
166 if let Err(e) = vol_control_proxy.set_volume(self.stored_stream.user_volume_level) {
168 return Err(AudioError::ExternalFailure(
169 CONTROLLER_ERROR_DEPENDENCY,
170 format!("set_volume for vol_control {stream_type:?}").into(),
171 format!("{e:?}"),
172 ));
173 }
174
175 if let Err(e) = vol_control_proxy.set_mute(self.stored_stream.user_volume_muted) {
176 return Err(AudioError::ExternalFailure(
177 CONTROLLER_ERROR_DEPENDENCY,
178 "set_mute for vol_control".into(),
179 format!("{e:?}"),
180 ));
181 }
182 drop(guard);
183
184 if let Some(exit_tx) = self.listen_exit_tx.take() {
185 exit_tx.send(()).expect(
187 "StreamVolumeControl::bind_volume_control, listen_exit_tx failed to send exit \
188 signal",
189 );
190 }
191
192 trace!(id, c"setup listener");
193
194 let (exit_tx, mut exit_rx) = futures::channel::oneshot::channel::<()>();
195 let mut volume_events = vol_control_proxy.take_event_stream();
196 let early_exit_action = self.early_exit_action.clone();
197 fasync::Task::local({
198 #[cfg(test)]
199 let publisher = self.publisher.clone();
200 async move {
201 let id = ftrace::Id::new();
202 trace!(id, c"bind volume handler");
203 loop {
204 futures::select! {
205 _ = exit_rx => {
206 trace!(id, c"exit");
207 #[cfg(test)]
208 {
209 if let Some(publisher) = publisher {
212 let _ = publisher.unbounded_send(
213 ExternalServiceEvent::Closed(
214 PUBLISHER_EVENT_NAME,
215 UNKNOWN_INSPECT_STRING.into(),
216 UNKNOWN_INSPECT_STRING.into(),
217 clock::inspect_format_now().into(),
218 )
219 );
220 }
221 }
222 return;
223 }
224 volume_event = volume_events.try_next() => {
225 trace!(id, c"volume_event");
226 if let Err(_) | Ok(None) = volume_event {
227 if let Some(action) = early_exit_action {
228 (action)();
229 }
230 return;
231 }
232 }
233 }
234 }
235 }
236 })
237 .detach();
238
239 self.listen_exit_tx = Some(exit_tx);
240 self.proxy = Some(vol_control_proxy);
241 Ok(())
242 }
243}
244
245#[cfg(test)]
246mod tests {
247 use super::*;
248 use crate::audio::test_fakes::audio_core_service;
249 use crate::audio::types::{AudioInfo, AudioStreamType};
250 use crate::audio::{
251 StreamVolumeControl, build_audio_default_settings, create_default_audio_stream,
252 };
253 use crate::clock;
254 use fuchsia_inspect::component;
255 use futures::StreamExt;
256 use futures::channel::mpsc;
257 use futures::lock::Mutex;
258 use settings_common::inspect::config_logger::InspectConfigLogger;
259 use settings_common::service_context::ServiceContext;
260 use settings_test_common::fakes::service::ServiceRegistry;
261
262 fn default_audio_info() -> AudioInfo {
263 let config_logger =
264 Rc::new(std::sync::Mutex::new(InspectConfigLogger::new(component::inspector().root())));
265 let mut audio_configuration = build_audio_default_settings(config_logger);
266 audio_configuration
267 .load_default_value()
268 .expect("config should exist and parse for test")
269 .unwrap()
270 }
271
272 async fn create_service() -> Rc<Mutex<ServiceRegistry>> {
274 let service_registry = ServiceRegistry::create();
275 let audio_core_service_handle = audio_core_service::Builder::new(default_audio_info())
276 .set_suppress_client_errors(true)
277 .build();
278 service_registry.lock().await.register_service(audio_core_service_handle.clone());
279 service_registry
280 }
281
282 #[fuchsia::test(allow_stalls = false)]
284 async fn test_drop_thread() {
285 let service_context =
286 ServiceContext::new(Some(ServiceRegistry::serve(create_service().await)));
287 let (event_tx, _) = mpsc::unbounded();
288 let external_publisher = ExternalEventPublisher::new(event_tx);
289
290 let audio_proxy = service_context
291 .connect_with_publisher::<fidl_fuchsia_media::AudioCoreMarker, _>(external_publisher)
292 .await
293 .expect("service should be present");
294
295 let (event_tx, mut event_rx) = mpsc::unbounded();
296 let _ = StreamVolumeControl::create(
297 0.into(),
298 audio_proxy,
299 create_default_audio_stream(AudioStreamType::Media),
300 None,
301 Some(event_tx),
302 )
303 .await;
304 let req = "unknown";
305 let req_timestamp = "unknown";
306 let resp_timestamp = clock::inspect_format_now();
307
308 assert_eq!(
309 event_rx.next().await.expect("First message should have been the closed event"),
310 ExternalServiceEvent::Closed(
311 "volume_control_events",
312 req.into(),
313 req_timestamp.into(),
314 resp_timestamp.into(),
315 )
316 );
317 }
318
319 #[fuchsia::test(allow_stalls = false)]
322 async fn test_detect_early_exit() {
323 let service_registry = ServiceRegistry::create();
324 let audio_core_service_handle = audio_core_service::Builder::new(default_audio_info())
325 .set_suppress_client_errors(true)
326 .build();
327 service_registry.lock().await.register_service(audio_core_service_handle.clone());
328
329 let service_context = ServiceContext::new(Some(ServiceRegistry::serve(service_registry)));
330 let (event_tx, _) = mpsc::unbounded();
331 let external_publisher = ExternalEventPublisher::new(event_tx);
332
333 let audio_proxy = service_context
334 .connect_with_publisher::<fidl_fuchsia_media::AudioCoreMarker, _>(external_publisher)
335 .await
336 .expect("proxy should be present");
337 let (tx, mut rx) = futures::channel::mpsc::unbounded::<()>();
338
339 let _stream_volume_control = StreamVolumeControl::create(
344 0.into(),
345 audio_proxy,
346 create_default_audio_stream(AudioStreamType::Media),
347 Some(Rc::new(move || {
348 tx.unbounded_send(()).unwrap();
349 })),
350 None,
351 )
352 .await
353 .expect("should successfully build");
354
355 audio_core_service_handle.lock().await.exit();
357
358 assert!(matches!(rx.next().await, Some(..)));
360 }
361}