1use bt_avdtp::{EndpointType, MediaStream};
6use dyn_clone::DynClone;
7use fidl_fuchsia_bluetooth_bredr::AudioOffloadExtProxy;
8use fuchsia_bluetooth::types::PeerId;
9use fuchsia_inspect::Node;
10use fuchsia_inspect_derive::AttachError;
11use futures::FutureExt;
12use futures::future::{BoxFuture, Shared};
13use std::time::Duration;
14use thiserror::Error;
15
16use crate::codec::MediaCodecConfig;
17
18#[derive(Debug, Error, Clone)]
19#[non_exhaustive]
20pub enum MediaTaskError {
21 #[error("Operation or configuration not supported")]
22 NotSupported,
23 #[error("Peer closed the media stream")]
24 PeerClosed,
25 #[error("Resources needed are already being used")]
26 ResourcesInUse,
27 #[error("Other Media Task Error: {}", _0)]
28 Other(String),
29}
30
31impl From<bt_avdtp::Error> for MediaTaskError {
32 fn from(error: bt_avdtp::Error) -> Self {
33 Self::Other(format!("AVDTP Error: {}", error))
34 }
35}
36
/// A factory that produces [`MediaTaskRunner`]s from a requested codec configuration.
///
/// Implementors must be `Send + Sync` and implement [`DynClone`], which is what allows a
/// `Box<dyn MediaTaskBuilder>` to be cloned (see the `clone_trait_object!` invocation that
/// follows the trait).
pub trait MediaTaskBuilder: Send + Sync + DynClone {
    /// Prepares to stream with `peer_id` using the parameters in `codec_config`.
    ///
    /// Returns a boxed [`MediaTaskRunner`] on success, or a [`MediaTaskError`] when a runner
    /// cannot be produced for this configuration.
    fn configure(
        &self,
        peer_id: &PeerId,
        codec_config: &MediaCodecConfig,
    ) -> Result<Box<dyn MediaTaskRunner>, MediaTaskError>;

    /// Returns the endpoint type (direction) of the tasks created through this builder.
    fn direction(&self) -> EndpointType;

    /// Returns a future that resolves to the codec configurations this builder can support
    /// for `peer_id`, or to a [`MediaTaskError`] if they could not be determined.
    ///
    /// `offload` optionally carries a proxy to the audio offload extension.
    /// NOTE(review): presumably used to query offload capabilities for this peer — confirm
    /// against the implementors.
    fn supported_configs(
        &self,
        peer_id: &PeerId,
        offload: Option<AudioOffloadExtProxy>,
    ) -> BoxFuture<'static, Result<Vec<MediaCodecConfig>, MediaTaskError>>;
}

// Generates the `Clone` implementation for `Box<dyn MediaTaskBuilder>`.
dyn_clone::clone_trait_object!(MediaTaskBuilder);
72
/// A configured runner that can start [`MediaTask`]s on a media stream.
///
/// Only [`MediaTaskRunner::start`] is required. The other methods have default
/// implementations that report the operation as unsupported / not implemented.
pub trait MediaTaskRunner: Send {
    /// Starts a media task on `stream`, optionally given an audio offload extension proxy.
    ///
    /// Returns the started task, or a [`MediaTaskError`] if it could not be started.
    fn start(
        &mut self,
        stream: MediaStream,
        offload: Option<AudioOffloadExtProxy>,
    ) -> Result<Box<dyn MediaTask>, MediaTaskError>;

    /// Attempts to switch the runner to a new codec configuration.
    ///
    /// The default implementation does not support reconfiguration and always returns
    /// [`MediaTaskError::NotSupported`].
    fn reconfigure(&mut self, _config: &MediaCodecConfig) -> Result<(), MediaTaskError> {
        Err(MediaTaskError::NotSupported)
    }

    /// Sets a delay on the runner.
    ///
    /// NOTE(review): the exact meaning of the delay is not visible in this file — confirm
    /// against the implementors.
    /// The default implementation always returns [`MediaTaskError::NotSupported`].
    fn set_delay(&mut self, _delay: Duration) -> Result<(), MediaTaskError> {
        Err(MediaTaskError::NotSupported)
    }

    /// Attaches this runner to the inspect tree under `_parent` using `_name`.
    ///
    /// The default implementation always fails with an "attach not implemented" error.
    fn iattach(&mut self, _parent: &Node, _name: &str) -> Result<(), AttachError> {
        Err("attach not implemented".into())
    }
}
111
/// A media task produced by [`MediaTaskRunner::start`].
pub trait MediaTask: Send {
    /// Returns a future that yields the task's final `Result`.
    ///
    /// NOTE(review): presumably resolves when the task ends, whether by itself or through
    /// `stop` — confirm against the implementors.
    fn finished(&mut self) -> BoxFuture<'static, Result<(), MediaTaskError>>;

    /// Returns the task's result if it is available right now, or `None` otherwise.
    ///
    /// The default implementation polls the future from `finished()` exactly once and does
    /// not wait for it.
    fn result(&mut self) -> Option<Result<(), MediaTaskError>> {
        self.finished().now_or_never()
    }

    /// Stops the task, returning `Ok(())` or a [`MediaTaskError`].
    fn stop(&mut self) -> Result<(), MediaTaskError>;
}
132
133pub mod tests {
134 use super::*;
135
136 use fuchsia_sync::Mutex;
137 use futures::channel::{mpsc, oneshot};
138 use futures::stream::StreamExt;
139 use futures::{Future, TryFutureExt};
140 use std::fmt;
141 use std::sync::Arc;
142
/// A fake [`MediaTask`] whose state can be inspected and driven from tests.
///
/// Clones share the same stream slot, result sender and result future, because those
/// fields are held behind `Arc` / `Shared`.
#[derive(Clone)]
pub struct TestMediaTask {
    /// Peer this task was created for.
    pub peer_id: PeerId,
    /// Codec configuration this task was created with.
    pub codec_config: MediaCodecConfig,
    /// The media stream handed to the task; becomes `None` once the task is stopped or
    /// ended.
    pub stream: Arc<Mutex<Option<MediaStream>>>,
    /// One-shot sender used to publish the task's result; taken the first time the task is
    /// stopped or ended.
    sender: Arc<Mutex<Option<oneshot::Sender<Result<(), MediaTaskError>>>>>,
    /// Shared future that yields the published result.
    result: Shared<BoxFuture<'static, Result<(), MediaTaskError>>>,
    /// Delay value supplied when the task was constructed.
    pub delay: Duration,
}
158
159 impl fmt::Debug for TestMediaTask {
160 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
161 f.debug_struct("TestMediaTask")
162 .field("peer_id", &self.peer_id)
163 .field("codec_config", &self.codec_config)
164 .field("result", &self.result.clone().now_or_never())
165 .finish()
166 }
167 }
168
169 impl TestMediaTask {
170 pub fn new(
171 peer_id: PeerId,
172 codec_config: MediaCodecConfig,
173 stream: MediaStream,
174 delay: Duration,
175 ) -> Self {
176 let (sender, receiver) = oneshot::channel();
177 let result = receiver
178 .map_ok_or_else(
179 |_err| Err(MediaTaskError::Other(format!("Nothing sent"))),
180 |result| result,
181 )
182 .boxed()
183 .shared();
184 Self {
185 peer_id,
186 codec_config,
187 stream: Arc::new(Mutex::new(Some(stream))),
188 sender: Arc::new(Mutex::new(Some(sender))),
189 result,
190 delay,
191 }
192 }
193
194 pub fn is_started(&self) -> bool {
196 self.stream.lock().is_some()
198 }
199
200 pub fn end_prematurely(&self, task_result: Option<Result<(), MediaTaskError>>) {
203 let _removed_stream = self.stream.lock().take();
204 let mut lock = self.sender.lock();
205 let sender = lock.take();
206 if let (Some(result), Some(sender)) = (task_result, sender) {
207 sender.send(result).expect("send ok");
208 }
209 }
210 }
211
212 impl MediaTask for TestMediaTask {
213 fn finished(&mut self) -> BoxFuture<'static, Result<(), MediaTaskError>> {
214 self.result.clone().boxed()
215 }
216
217 fn stop(&mut self) -> Result<(), MediaTaskError> {
218 let _ = self.stream.lock().take();
219 {
220 let mut lock = self.sender.lock();
221 if let Some(sender) = lock.take() {
222 let _ = sender.send(Ok(()));
223 return Ok(());
224 }
225 }
226 self.finished().now_or_never().unwrap()
228 }
229 }
230
/// A fake [`MediaTaskRunner`] that creates [`TestMediaTask`]s and reports each started task
/// on a channel.
pub struct TestMediaTaskRunner {
    /// Peer this runner was configured for.
    pub peer_id: PeerId,
    /// Current codec configuration; replaced by `reconfigure` when `reconfigurable` is set.
    pub codec_config: MediaCodecConfig,
    /// Whether `reconfigure` succeeds.
    pub reconfigurable: bool,
    /// Whether `set_delay` succeeds.
    pub supports_set_delay: bool,
    /// The delay most recently accepted by `set_delay`, if any.
    pub set_delay: Option<std::time::Duration>,
    /// Channel on which a clone of every started task is sent.
    pub sender: mpsc::Sender<TestMediaTask>,
}
245
246 impl MediaTaskRunner for TestMediaTaskRunner {
247 fn start(
248 &mut self,
249 stream: MediaStream,
250 _offload: Option<AudioOffloadExtProxy>,
251 ) -> Result<Box<dyn MediaTask>, MediaTaskError> {
252 let task = TestMediaTask::new(
253 self.peer_id.clone(),
254 self.codec_config.clone(),
255 stream,
256 self.set_delay.unwrap_or(Duration::ZERO),
257 );
258 let _ = self.sender.try_send(task.clone());
260 Ok(Box::new(task))
261 }
262
263 fn set_delay(&mut self, delay: std::time::Duration) -> Result<(), MediaTaskError> {
264 if self.supports_set_delay {
265 self.set_delay = Some(delay);
266 Ok(())
267 } else {
268 Err(MediaTaskError::NotSupported)
269 }
270 }
271
272 fn reconfigure(&mut self, config: &MediaCodecConfig) -> Result<(), MediaTaskError> {
273 if self.reconfigurable {
274 self.codec_config = config.clone();
275 Ok(())
276 } else {
277 Err(MediaTaskError::NotSupported)
278 }
279 }
280 }
281
/// Test helper that hands out [`MediaTaskBuilder`]s and receives every [`TestMediaTask`]
/// started by the runners those builders configure.
pub struct TestMediaTaskBuilder {
    /// Sending half of the task channel; cloned into every builder returned by `builder()`.
    /// NOTE(review): presumably wrapped in a `Mutex` to make this type `Sync` — confirm.
    sender: Mutex<mpsc::Sender<TestMediaTask>>,
    /// Receiving half of the task channel, read by `next_task` and `expect_task`.
    receiver: mpsc::Receiver<TestMediaTask>,
    /// Whether runners made through the builders accept `reconfigure`.
    reconfigurable: bool,
    /// Whether runners made through the builders accept `set_delay`.
    supports_set_delay: bool,
    /// The result the builders report from `supported_configs`.
    configs: Result<Vec<MediaCodecConfig>, MediaTaskError>,
    /// The direction the builders report from `direction`.
    direction: EndpointType,
}
293
294 impl TestMediaTaskBuilder {
295 pub fn new() -> Self {
296 let (sender, receiver) = mpsc::channel(5);
297 Self {
298 sender: Mutex::new(sender),
299 receiver,
300 reconfigurable: false,
301 supports_set_delay: false,
302 configs: Ok(vec![crate::codec::MediaCodecConfig::min_sbc()]),
303 direction: EndpointType::Sink,
304 }
305 }
306
307 pub fn with_configs(
308 &mut self,
309 configs: Result<Vec<MediaCodecConfig>, MediaTaskError>,
310 ) -> &mut Self {
311 self.configs = configs;
312 self
313 }
314
315 pub fn with_direction(&mut self, direction: EndpointType) -> &mut Self {
316 self.direction = direction;
317 self
318 }
319
320 pub fn new_reconfigurable() -> Self {
321 Self { reconfigurable: true, ..Self::new() }
322 }
323
324 pub fn new_delayable() -> Self {
325 Self { supports_set_delay: true, ..Self::new() }
326 }
327
328 pub fn builder(&self) -> Box<dyn MediaTaskBuilder> {
331 Box::new(TestMediaTaskBuilderBuilder {
332 sender: self.sender.lock().clone(),
333 reconfigurable: self.reconfigurable,
334 supports_set_delay: self.supports_set_delay,
335 configs: self.configs.clone(),
336 direction: self.direction,
337 })
338 }
339
340 pub fn next_task(&mut self) -> impl Future<Output = Option<TestMediaTask>> + '_ {
344 self.receiver.next()
345 }
346
347 #[track_caller]
349 pub fn expect_task(&mut self) -> TestMediaTask {
350 self.receiver
351 .try_next()
352 .expect("should have made a task")
353 .expect("shouldn't have dropped all senders")
354 }
355 }
356
/// The concrete [`MediaTaskBuilder`] returned by `TestMediaTaskBuilder::builder`.
///
/// It carries a copy of the settings that were current when it was created.
#[derive(Clone)]
struct TestMediaTaskBuilderBuilder {
    /// Channel handed to every configured runner so started tasks can be reported.
    sender: mpsc::Sender<TestMediaTask>,
    /// Passed on to each runner as its `reconfigurable` flag.
    reconfigurable: bool,
    /// Passed on to each runner as its `supports_set_delay` flag.
    supports_set_delay: bool,
    /// The result returned from `supported_configs`.
    configs: Result<Vec<MediaCodecConfig>, MediaTaskError>,
    /// The value returned from `direction`.
    direction: EndpointType,
}
365
366 impl MediaTaskBuilder for TestMediaTaskBuilderBuilder {
367 fn configure(
368 &self,
369 peer_id: &PeerId,
370 codec_config: &MediaCodecConfig,
371 ) -> Result<Box<dyn MediaTaskRunner>, MediaTaskError> {
372 let runner = TestMediaTaskRunner {
373 peer_id: peer_id.clone(),
374 codec_config: codec_config.clone(),
375 sender: self.sender.clone(),
376 reconfigurable: self.reconfigurable,
377 supports_set_delay: self.supports_set_delay,
378 set_delay: None,
379 };
380 Ok::<Box<dyn MediaTaskRunner>, _>(Box::new(runner))
381 }
382
383 fn direction(&self) -> EndpointType {
384 self.direction
385 }
386
387 fn supported_configs(
388 &self,
389 _peer_id: &PeerId,
390 _offload: Option<AudioOffloadExtProxy>,
391 ) -> BoxFuture<'static, Result<Vec<MediaCodecConfig>, MediaTaskError>> {
392 futures::future::ready(self.configs.clone()).boxed()
393 }
394 }
395}