bt_a2dp/
media_task.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use bt_avdtp::{EndpointType, MediaStream};
6use dyn_clone::DynClone;
7use fidl_fuchsia_bluetooth_bredr::AudioOffloadExtProxy;
8use fuchsia_bluetooth::types::PeerId;
9use fuchsia_inspect::Node;
10use fuchsia_inspect_derive::AttachError;
11use futures::FutureExt;
12use futures::future::{BoxFuture, Shared};
13use std::time::Duration;
14use thiserror::Error;
15
16use crate::codec::MediaCodecConfig;
17
18#[derive(Debug, Error, Clone)]
19#[non_exhaustive]
20pub enum MediaTaskError {
21    #[error("Operation or configuration not supported")]
22    NotSupported,
23    #[error("Peer closed the media stream")]
24    PeerClosed,
25    #[error("Resources needed are already being used")]
26    ResourcesInUse,
27    #[error("Other Media Task Error: {}", _0)]
28    Other(String),
29}
30
31impl From<bt_avdtp::Error> for MediaTaskError {
32    fn from(error: bt_avdtp::Error) -> Self {
33        Self::Other(format!("AVDTP Error: {}", error))
34    }
35}
36
37/// MediaTaskRunners are configured with information about the media codec when either peer in a
38/// conversation configures a stream endpoint.  When successfully configured, they can start
39/// MediaTasks by accepting a MediaStream, which will provide or consume media on that stream until
40/// dropped or stopped.
41///
42/// A builder that will make media task runners from requested configurations.
43pub trait MediaTaskBuilder: Send + Sync + DynClone {
44    /// Configure a new stream based on the given `codec_config` parameters.
45    /// Returns a MediaTaskRunner if the configuration is supported, an
46    /// MediaTaskError::NotSupported otherwise.
47    fn configure(
48        &self,
49        peer_id: &PeerId,
50        codec_config: &MediaCodecConfig,
51    ) -> Result<Box<dyn MediaTaskRunner>, MediaTaskError>;
52
53    /// Return the direction of tasks created by this builder.
54    /// Source tasks provide local encoded audio to a peer.
55    /// Sink tasks consume encoded audio from a peer.
56    fn direction(&self) -> EndpointType;
57
58    /// Provide a set of encoded media configurations that this task can support.
59    /// This can vary based on current system capabilities, and should be checked before
60    /// communicating capabilities to each peer.
61    /// `offload` is a proxy to the offload capabilities of the controller for this peer.
62    /// Returns a future that resolves to the set of MediaCodecConfigs that this builder supports,
63    /// typically one config per MediaCodecType, or an error if building the configs failed.
64    fn supported_configs(
65        &self,
66        peer_id: &PeerId,
67        offload: Option<AudioOffloadExtProxy>,
68    ) -> BoxFuture<'static, Result<Vec<MediaCodecConfig>, MediaTaskError>>;
69}
70
71dyn_clone::clone_trait_object!(MediaTaskBuilder);
72
73/// MediaTaskRunners represent an ability of the media system to start streaming media.
74/// They are configured for a specific codec by `MediaTaskBuilder::configure`
75/// Typically a MediaTaskRunner can start multiple streams without needing to be reconfigured,
76/// although possibly not simultaneously.
77pub trait MediaTaskRunner: Send {
78    /// Start a MediaTask using the MediaStream given.
79    /// If the task started, returns a MediaTask which will finish if the stream ends or an
80    /// error occurs, and can be stopped using `MediaTask::stop` or by dropping the MediaTask.
81    /// This can fail with MediaTaskError::ResourcesInUse if a MediaTask cannot be started because
82    /// one is already running.
83    fn start(
84        &mut self,
85        stream: MediaStream,
86        offload: Option<AudioOffloadExtProxy>,
87    ) -> Result<Box<dyn MediaTask>, MediaTaskError>;
88
89    /// Try to reconfigure the MediaTask to accept a new configuration.  This differs from
90    /// `MediaTaskBuilder::configure` as it attempts to preserve the same configured session.
91    /// The runner remains configured with the initial configuration on an error.
92    fn reconfigure(&mut self, _config: &MediaCodecConfig) -> Result<(), MediaTaskError> {
93        Err(MediaTaskError::NotSupported)
94    }
95
96    /// Set the delay reported from the peer for this media task.
97    /// This should configure the media source or sink to attempt to compensate.
98    /// Typically this is zero for Sink tasks, but Source tasks can receive this info from the peer.
99    /// May only be supported before start.
100    /// If an Error is returned, the delay has not been set.
101    fn set_delay(&mut self, _delay: Duration) -> Result<(), MediaTaskError> {
102        Err(MediaTaskError::NotSupported)
103    }
104
105    /// Add information from the running media task to the inspect tree
106    /// (i.e. data transferred, jitter, etc)
107    fn iattach(&mut self, _parent: &Node, _name: &str) -> Result<(), AttachError> {
108        Err("attach not implemented".into())
109    }
110}
111
112/// MediaTasks represent a media stream being actively processed (sent or received from a peer).
113/// They are are created by `MediaTaskRunner::start`.
114/// Typically a MediaTask will run a background task that is active until dropped or
115/// `MediaTask::stop` is called.
116pub trait MediaTask: Send {
117    /// Returns a Future that finishes when the running media task finshes for any reason.
118    /// Should return a future that immediately resolves if this task is finished.
119    fn finished(&mut self) -> BoxFuture<'static, Result<(), MediaTaskError>>;
120
121    /// Returns the result if this task has finished, and None otherwise
122    fn result(&mut self) -> Option<Result<(), MediaTaskError>> {
123        self.finished().now_or_never()
124    }
125
126    /// Stops the task normally, signalling to all waiters Ok(()).
127    /// Returns the result sent to MediaTask::finished futures, which may be different from Ok(()).
128    /// When this function returns, is is good practice to ensure the MediaStream that started
129    /// this task is also dropped.
130    fn stop(&mut self) -> Result<(), MediaTaskError>;
131}
132
133pub mod tests {
134    use super::*;
135
136    use fuchsia_sync::Mutex;
137    use futures::channel::{mpsc, oneshot};
138    use futures::stream::StreamExt;
139    use futures::{Future, TryFutureExt};
140    use std::fmt;
141    use std::sync::Arc;
142
143    #[derive(Clone)]
144    pub struct TestMediaTask {
145        /// The PeerId that was used to make this Task
146        pub peer_id: PeerId,
147        /// The configuration used to make this task
148        pub codec_config: MediaCodecConfig,
149        /// If still started, this holds the MediaStream.
150        pub stream: Arc<Mutex<Option<MediaStream>>>,
151        /// Sender for the shared result future. None if already sent.
152        sender: Arc<Mutex<Option<oneshot::Sender<Result<(), MediaTaskError>>>>>,
153        /// Shared result future.
154        result: Shared<BoxFuture<'static, Result<(), MediaTaskError>>>,
155        /// Delay the task was started with.
156        pub delay: Duration,
157    }
158
159    impl fmt::Debug for TestMediaTask {
160        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
161            f.debug_struct("TestMediaTask")
162                .field("peer_id", &self.peer_id)
163                .field("codec_config", &self.codec_config)
164                .field("result", &self.result.clone().now_or_never())
165                .finish()
166        }
167    }
168
169    impl TestMediaTask {
170        pub fn new(
171            peer_id: PeerId,
172            codec_config: MediaCodecConfig,
173            stream: MediaStream,
174            delay: Duration,
175        ) -> Self {
176            let (sender, receiver) = oneshot::channel();
177            let result = receiver
178                .map_ok_or_else(
179                    |_err| Err(MediaTaskError::Other(format!("Nothing sent"))),
180                    |result| result,
181                )
182                .boxed()
183                .shared();
184            Self {
185                peer_id,
186                codec_config,
187                stream: Arc::new(Mutex::new(Some(stream))),
188                sender: Arc::new(Mutex::new(Some(sender))),
189                result,
190                delay,
191            }
192        }
193
194        /// Return true if the background media task is running.
195        pub fn is_started(&self) -> bool {
196            // The stream being held represents the task running.
197            self.stream.lock().is_some()
198        }
199
200        /// End the streaming task without an external stop().
201        /// Sends an optional result from the task.
202        pub fn end_prematurely(&self, task_result: Option<Result<(), MediaTaskError>>) {
203            let _removed_stream = self.stream.lock().take();
204            let mut lock = self.sender.lock();
205            let sender = lock.take();
206            if let (Some(result), Some(sender)) = (task_result, sender) {
207                sender.send(result).expect("send ok");
208            }
209        }
210    }
211
212    impl MediaTask for TestMediaTask {
213        fn finished(&mut self) -> BoxFuture<'static, Result<(), MediaTaskError>> {
214            self.result.clone().boxed()
215        }
216
217        fn stop(&mut self) -> Result<(), MediaTaskError> {
218            let _ = self.stream.lock().take();
219            {
220                let mut lock = self.sender.lock();
221                if let Some(sender) = lock.take() {
222                    let _ = sender.send(Ok(()));
223                    return Ok(());
224                }
225            }
226            // Result should be available.
227            self.finished().now_or_never().unwrap()
228        }
229    }
230
231    pub struct TestMediaTaskRunner {
232        /// The peer_id this was started with.
233        pub peer_id: PeerId,
234        /// The config that this runner will start tasks for
235        pub codec_config: MediaCodecConfig,
236        /// If this is reconfigurable
237        pub reconfigurable: bool,
238        /// If this supports delay reporting
239        pub supports_set_delay: bool,
240        /// What the delay is right now
241        pub set_delay: Option<std::time::Duration>,
242        /// The Sender that will send a clone of the started tasks to the builder.
243        pub sender: mpsc::Sender<TestMediaTask>,
244    }
245
246    impl MediaTaskRunner for TestMediaTaskRunner {
247        fn start(
248            &mut self,
249            stream: MediaStream,
250            _offload: Option<AudioOffloadExtProxy>,
251        ) -> Result<Box<dyn MediaTask>, MediaTaskError> {
252            let task = TestMediaTask::new(
253                self.peer_id.clone(),
254                self.codec_config.clone(),
255                stream,
256                self.set_delay.unwrap_or(Duration::ZERO),
257            );
258            // Don't particularly care if the receiver got dropped.
259            let _ = self.sender.try_send(task.clone());
260            Ok(Box::new(task))
261        }
262
263        fn set_delay(&mut self, delay: std::time::Duration) -> Result<(), MediaTaskError> {
264            if self.supports_set_delay {
265                self.set_delay = Some(delay);
266                Ok(())
267            } else {
268                Err(MediaTaskError::NotSupported)
269            }
270        }
271
272        fn reconfigure(&mut self, config: &MediaCodecConfig) -> Result<(), MediaTaskError> {
273            if self.reconfigurable {
274                self.codec_config = config.clone();
275                Ok(())
276            } else {
277                Err(MediaTaskError::NotSupported)
278            }
279        }
280    }
281
282    /// A TestMediaTask expects to be configured once, and then started and stopped as appropriate.
283    /// It will Error if started again while started or stopped while stopped, or if it was
284    /// configured multiple times.
285    pub struct TestMediaTaskBuilder {
286        sender: Mutex<mpsc::Sender<TestMediaTask>>,
287        receiver: mpsc::Receiver<TestMediaTask>,
288        reconfigurable: bool,
289        supports_set_delay: bool,
290        configs: Result<Vec<MediaCodecConfig>, MediaTaskError>,
291        direction: EndpointType,
292    }
293
294    impl TestMediaTaskBuilder {
295        pub fn new() -> Self {
296            let (sender, receiver) = mpsc::channel(5);
297            Self {
298                sender: Mutex::new(sender),
299                receiver,
300                reconfigurable: false,
301                supports_set_delay: false,
302                configs: Ok(vec![crate::codec::MediaCodecConfig::min_sbc()]),
303                direction: EndpointType::Sink,
304            }
305        }
306
307        pub fn with_configs(
308            &mut self,
309            configs: Result<Vec<MediaCodecConfig>, MediaTaskError>,
310        ) -> &mut Self {
311            self.configs = configs;
312            self
313        }
314
315        pub fn with_direction(&mut self, direction: EndpointType) -> &mut Self {
316            self.direction = direction;
317            self
318        }
319
320        pub fn new_reconfigurable() -> Self {
321            Self { reconfigurable: true, ..Self::new() }
322        }
323
324        pub fn new_delayable() -> Self {
325            Self { supports_set_delay: true, ..Self::new() }
326        }
327
328        /// Returns a type that implements MediaTaskBuilder.  When a MediaTask is built using
329        /// configure(), it will be available from `next_task`.
330        pub fn builder(&self) -> Box<dyn MediaTaskBuilder> {
331            Box::new(TestMediaTaskBuilderBuilder {
332                sender: self.sender.lock().clone(),
333                reconfigurable: self.reconfigurable,
334                supports_set_delay: self.supports_set_delay,
335                configs: self.configs.clone(),
336                direction: self.direction,
337            })
338        }
339
340        /// Gets a future that will return a handle to the next TestMediaTask that gets started
341        /// from a Runner that was retrieved from this builder.
342        /// The TestMediaTask, can tell you when it's started and give you a handle to the MediaStream.
343        pub fn next_task(&mut self) -> impl Future<Output = Option<TestMediaTask>> + '_ {
344            self.receiver.next()
345        }
346
347        /// Expects that a task had been built, and retrieves that task, or panics.
348        #[track_caller]
349        pub fn expect_task(&mut self) -> TestMediaTask {
350            self.receiver
351                .try_next()
352                .expect("should have made a task")
353                .expect("shouldn't have dropped all senders")
354        }
355    }
356
357    #[derive(Clone)]
358    struct TestMediaTaskBuilderBuilder {
359        sender: mpsc::Sender<TestMediaTask>,
360        reconfigurable: bool,
361        supports_set_delay: bool,
362        configs: Result<Vec<MediaCodecConfig>, MediaTaskError>,
363        direction: EndpointType,
364    }
365
366    impl MediaTaskBuilder for TestMediaTaskBuilderBuilder {
367        fn configure(
368            &self,
369            peer_id: &PeerId,
370            codec_config: &MediaCodecConfig,
371        ) -> Result<Box<dyn MediaTaskRunner>, MediaTaskError> {
372            let runner = TestMediaTaskRunner {
373                peer_id: peer_id.clone(),
374                codec_config: codec_config.clone(),
375                sender: self.sender.clone(),
376                reconfigurable: self.reconfigurable,
377                supports_set_delay: self.supports_set_delay,
378                set_delay: None,
379            };
380            Ok::<Box<dyn MediaTaskRunner>, _>(Box::new(runner))
381        }
382
383        fn direction(&self) -> EndpointType {
384            self.direction
385        }
386
387        fn supported_configs(
388            &self,
389            _peer_id: &PeerId,
390            _offload: Option<AudioOffloadExtProxy>,
391        ) -> BoxFuture<'static, Result<Vec<MediaCodecConfig>, MediaTaskError>> {
392            futures::future::ready(self.configs.clone()).boxed()
393        }
394    }
395}