1use crate::types::{AudioError, AudioStream};
6use crate::utils::round_volume_level;
7use fidl::endpoints::create_proxy;
8use fidl_fuchsia_media::Usage2;
9use fidl_fuchsia_media_audio::VolumeControlProxy;
10use futures::TryStreamExt;
11#[cfg(test)]
12use futures::channel::mpsc::UnboundedSender;
13use futures::channel::oneshot::Sender;
14#[cfg(test)]
15use settings_common::clock::mock as clock;
16use settings_common::inspect::event::ExternalEventPublisher;
17#[cfg(test)]
18use settings_common::service_context::ExternalServiceEvent;
19use settings_common::service_context::ExternalServiceProxy;
20use settings_common::{call, trace, trace_guard};
21use std::rc::Rc;
22use {fuchsia_async as fasync, fuchsia_trace as ftrace};
23
/// Event name carried by the `ExternalServiceEvent::Closed` event that the
/// volume listener task publishes (test builds only) when it is asked to exit.
#[cfg(test)]
const PUBLISHER_EVENT_NAME: &str = "volume_control_events";
/// Dependency name passed as the first field of every
/// `AudioError::ExternalFailure` constructed in this file.
const CONTROLLER_ERROR_DEPENDENCY: &str = "fuchsia.media.audio";
/// Placeholder used for the second and third fields of the test-only `Closed`
/// event (the tests in this file name them `req` and `req_timestamp`).
#[cfg(test)]
const UNKNOWN_INSPECT_STRING: &str = "unknown";

/// Callback run by the volume listener task when the volume control event
/// stream errors or ends before an exit was requested.
pub(crate) type ExitAction = Rc<dyn Fn()>;
32
/// Tracks the stored state of a single audio stream together with the
/// `VolumeControl` connection used to push that state to the audio service.
///
/// Dropping the value asks the background listener task, if one is running,
/// to stop (see the `Drop` impl).
pub struct StreamVolumeControl {
    /// Stream state most recently recorded by `create` / `set_volume`.
    pub stored_stream: AudioStream,
    /// Volume control connection; `None` until `bind_volume_control` succeeds.
    proxy: Option<VolumeControlProxy>,
    /// AudioCore connection used to bind the usage volume control.
    audio_service: ExternalServiceProxy<fidl_fuchsia_media::AudioCoreProxy, ExternalEventPublisher>,
    /// Optional callback the listener task runs when the volume event stream
    /// errors or closes without an exit having been requested.
    early_exit_action: Option<ExitAction>,
    /// Test-only sink that receives a `Closed` event when the listener task
    /// exits on request.
    #[cfg(test)]
    publisher: Option<UnboundedSender<ExternalServiceEvent>>,
    /// Sender half of the oneshot channel used to tell the listener task to
    /// stop; `None` while no listener has been spawned.
    listen_exit_tx: Option<Sender<()>>,
}
46
47impl Drop for StreamVolumeControl {
48 fn drop(&mut self) {
49 if let Some(exit_tx) = self.listen_exit_tx.take() {
50 if exit_tx.is_canceled() {
52 return;
53 }
54
55 exit_tx.send(()).unwrap_or_else(|_| {
58 log::warn!("StreamVolumeControl::drop, exit_tx failed to send exit signal")
59 });
60 }
61 }
62}
63
64impl StreamVolumeControl {
65 pub(crate) async fn create(
66 id: ftrace::Id,
67 audio_service: ExternalServiceProxy<
68 fidl_fuchsia_media::AudioCoreProxy,
69 ExternalEventPublisher,
70 >,
71 stream: AudioStream,
72 early_exit_action: Option<ExitAction>,
73 #[cfg(test)] publisher: Option<UnboundedSender<ExternalServiceEvent>>,
74 ) -> Result<Self, AudioError> {
75 assert!(stream.has_valid_volume_level());
78
79 trace!(id, c"StreamVolumeControl ctor");
80 let mut control = StreamVolumeControl {
81 stored_stream: stream,
82 proxy: None,
83 audio_service,
84 listen_exit_tx: None,
85 early_exit_action,
86 #[cfg(test)]
87 publisher,
88 };
89
90 control.bind_volume_control(id).await?;
91 Ok(control)
92 }
93
94 pub(crate) async fn set_volume(
95 &mut self,
96 id: ftrace::Id,
97 stream: AudioStream,
98 ) -> Result<(), AudioError> {
99 assert_eq!(self.stored_stream.stream_type, stream.stream_type);
100 assert!(stream.has_valid_volume_level());
103
104 if self.proxy.is_none() {
106 self.bind_volume_control(id).await?;
107 }
108
109 let mut new_stream_value = stream;
111 new_stream_value.user_volume_level = round_volume_level(stream.user_volume_level);
112
113 let proxy = self.proxy.as_ref().expect("no volume control proxy");
114
115 if (self.stored_stream.user_volume_level - new_stream_value.user_volume_level).abs()
116 > f32::EPSILON
117 && let Err(e) = proxy.set_volume(new_stream_value.user_volume_level)
118 {
119 self.stored_stream = new_stream_value;
120 return Err(AudioError::ExternalFailure(
121 CONTROLLER_ERROR_DEPENDENCY,
122 "set volume".into(),
123 format!("{e:?}"),
124 ));
125 }
126
127 if self.stored_stream.user_volume_muted != new_stream_value.user_volume_muted
128 && let Err(e) = proxy.set_mute(stream.user_volume_muted)
129 {
130 self.stored_stream = new_stream_value;
131 return Err(AudioError::ExternalFailure(
132 CONTROLLER_ERROR_DEPENDENCY,
133 "set mute".into(),
134 format!("{e:?}"),
135 ));
136 }
137
138 self.stored_stream = new_stream_value;
139 Ok(())
140 }
141
142 async fn bind_volume_control(&mut self, id: ftrace::Id) -> Result<(), AudioError> {
143 trace!(id, c"bind volume control");
144 if self.proxy.is_some() {
145 return Ok(());
146 }
147
148 let (vol_control_proxy, server_end) = create_proxy();
149 let stream_type = self.stored_stream.stream_type;
150 let usage = Usage2::RenderUsage(stream_type.into());
151
152 let guard = trace_guard!(id, c"bind usage volume control");
153 if let Err(e) = call!(self.audio_service => bind_usage_volume_control2(&usage, server_end))
154 {
155 return Err(AudioError::ExternalFailure(
156 CONTROLLER_ERROR_DEPENDENCY,
157 format!("bind_usage_volume_control2 for audio_core {usage:?}").into(),
158 format!("{e:?}"),
159 ));
160 }
161 drop(guard);
162
163 let guard = trace_guard!(id, c"set values");
164 if let Err(e) = vol_control_proxy.set_volume(self.stored_stream.user_volume_level) {
166 return Err(AudioError::ExternalFailure(
167 CONTROLLER_ERROR_DEPENDENCY,
168 format!("set_volume for vol_control {stream_type:?}").into(),
169 format!("{e:?}"),
170 ));
171 }
172
173 if let Err(e) = vol_control_proxy.set_mute(self.stored_stream.user_volume_muted) {
174 return Err(AudioError::ExternalFailure(
175 CONTROLLER_ERROR_DEPENDENCY,
176 "set_mute for vol_control".into(),
177 format!("{e:?}"),
178 ));
179 }
180 drop(guard);
181
182 if let Some(exit_tx) = self.listen_exit_tx.take() {
183 exit_tx.send(()).expect(
185 "StreamVolumeControl::bind_volume_control, listen_exit_tx failed to send exit \
186 signal",
187 );
188 }
189
190 trace!(id, c"setup listener");
191
192 let (exit_tx, mut exit_rx) = futures::channel::oneshot::channel::<()>();
193 let mut volume_events = vol_control_proxy.take_event_stream();
194 let early_exit_action = self.early_exit_action.clone();
195 fasync::Task::local({
196 #[cfg(test)]
197 let publisher = self.publisher.clone();
198 async move {
199 let id = ftrace::Id::new();
200 trace!(id, c"bind volume handler");
201 loop {
202 futures::select! {
203 _ = exit_rx => {
204 trace!(id, c"exit");
205 #[cfg(test)]
206 {
207 if let Some(publisher) = publisher {
210 let _ = publisher.unbounded_send(
211 ExternalServiceEvent::Closed(
212 PUBLISHER_EVENT_NAME,
213 UNKNOWN_INSPECT_STRING.into(),
214 UNKNOWN_INSPECT_STRING.into(),
215 clock::inspect_format_now().into(),
216 )
217 );
218 }
219 }
220 return;
221 }
222 volume_event = volume_events.try_next() => {
223 trace!(id, c"volume_event");
224 if let Err(_) | Ok(None) = volume_event {
225 if let Some(action) = early_exit_action {
226 (action)();
227 }
228 return;
229 }
230 }
231 }
232 }
233 }
234 })
235 .detach();
236
237 self.listen_exit_tx = Some(exit_tx);
238 self.proxy = Some(vol_control_proxy);
239 Ok(())
240 }
241}
242
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_fakes::audio_core_service;
    use crate::types::{AudioInfo, AudioStreamType};
    use crate::{StreamVolumeControl, build_audio_default_settings, create_default_audio_stream};
    use fuchsia_inspect::component;
    use futures::StreamExt;
    use futures::channel::mpsc;
    use futures::lock::Mutex;
    use settings_common::inspect::config_logger::InspectConfigLogger;
    use settings_common::service_context::ServiceContext;
    use settings_test_common::fakes::service::ServiceRegistry;

    /// Loads the default audio settings used to seed the fake audio core.
    fn default_audio_info() -> AudioInfo {
        let logger = InspectConfigLogger::new(component::inspector().root());
        let config_logger = Rc::new(std::sync::Mutex::new(logger));
        let mut audio_configuration = build_audio_default_settings(config_logger);
        audio_configuration
            .load_default_value()
            .expect("config should exist and parse for test")
            .unwrap()
    }

    /// Returns a registry with a fake audio core service registered.
    async fn create_service() -> Rc<Mutex<ServiceRegistry>> {
        let registry = ServiceRegistry::create();
        let fake_audio_core = audio_core_service::Builder::new(default_audio_info())
            .set_suppress_client_errors(true)
            .build();
        registry.lock().await.register_service(fake_audio_core.clone());
        registry
    }

    /// Dropping the controller must make the listener task publish a `Closed`
    /// event.
    #[fuchsia::test(allow_stalls = false)]
    async fn test_drop_thread() {
        let registry = create_service().await;
        let service_context = ServiceContext::new(Some(ServiceRegistry::serve(registry)));
        let (event_tx, _) = mpsc::unbounded();
        let external_publisher = ExternalEventPublisher::new(event_tx);

        let audio_proxy = service_context
            .connect_with_publisher::<fidl_fuchsia_media::AudioCoreMarker, _>(external_publisher)
            .await
            .expect("service should be present");

        let (event_tx, mut event_rx) = mpsc::unbounded();
        // The created controller is dropped right away, which triggers `Drop`.
        drop(
            StreamVolumeControl::create(
                0.into(),
                audio_proxy,
                create_default_audio_stream(AudioStreamType::Media),
                None,
                Some(event_tx),
            )
            .await,
        );

        let req = "unknown";
        let req_timestamp = "unknown";
        let resp_timestamp = clock::inspect_format_now();
        let expected = ExternalServiceEvent::Closed(
            "volume_control_events",
            req.into(),
            req_timestamp.into(),
            resp_timestamp.into(),
        );

        let received =
            event_rx.next().await.expect("First message should have been the closed event");
        assert_eq!(received, expected);
    }

    /// The early-exit callback must fire when the fake audio core exits.
    #[fuchsia::test(allow_stalls = false)]
    async fn test_detect_early_exit() {
        let registry = ServiceRegistry::create();
        let fake_audio_core = audio_core_service::Builder::new(default_audio_info())
            .set_suppress_client_errors(true)
            .build();
        registry.lock().await.register_service(fake_audio_core.clone());

        let service_context = ServiceContext::new(Some(ServiceRegistry::serve(registry)));
        let (event_tx, _) = mpsc::unbounded();
        let external_publisher = ExternalEventPublisher::new(event_tx);

        let audio_proxy = service_context
            .connect_with_publisher::<fidl_fuchsia_media::AudioCoreMarker, _>(external_publisher)
            .await
            .expect("proxy should be present");
        let (tx, mut rx) = futures::channel::mpsc::unbounded::<()>();

        // Forward the early-exit notification into the channel checked below.
        let on_early_exit: ExitAction = Rc::new(move || {
            tx.unbounded_send(()).unwrap();
        });

        // Keep the controller alive so its listener keeps running.
        let _stream_volume_control = StreamVolumeControl::create(
            0.into(),
            audio_proxy,
            create_default_audio_stream(AudioStreamType::Media),
            Some(on_early_exit),
            None,
        )
        .await
        .expect("should successfully build");

        // Shut the fake service down; the listener should notice.
        fake_audio_core.lock().await.exit();

        assert!(rx.next().await.is_some());
    }
}