1use anyhow::{Context as _, Error, format_err};
6use fidl::endpoints::ClientEnd;
7use fidl_fuchsia_media::*;
8use fidl_fuchsia_mediacodec::*;
9use fidl_fuchsia_sysmem2::*;
10use fuchsia_stream_processors::*;
11use fuchsia_sync::{Mutex, RwLock};
12use futures::future::{MaybeDone, maybe_done};
13use futures::io::{self, AsyncWrite};
14use futures::stream::{FusedStream, Stream};
15use futures::task::{Context, Poll, Waker};
16use futures::{Future, StreamExt, ready};
17use log::{trace, warn};
18use std::collections::{HashSet, VecDeque};
19use std::mem;
20use std::pin::Pin;
21use std::sync::Arc;
22
23use crate::buffer_collection_constraints::buffer_collection_constraints_default;
24use crate::sysmem_allocator::{BufferName, SysmemAllocatedBuffers, SysmemAllocation};
25
26fn fidl_error_to_io_error(e: fidl::Error) -> io::Error {
27 io::Error::other(format_err!("Fidl Error: {}", e))
28}
29
30#[derive(Debug)]
31enum Listener {
34 None,
36 New,
38 Some(Waker),
40}
41
42impl Listener {
43 fn register(&mut self, waker: Waker) {
46 *self = match mem::replace(self, Listener::None) {
47 Listener::None => panic!("Polled a listener with no pollers"),
48 _ => Listener::Some(waker),
49 };
50 }
51
52 fn wake(&mut self) {
55 if let Listener::None = self {
56 return;
57 }
58 match mem::replace(self, Listener::New) {
59 Listener::None => panic!("Should have been polled"),
60 Listener::Some(waker) => waker.wake(),
61 Listener::New => {}
62 }
63 }
64
65 fn waker(&self) -> Option<&Waker> {
67 if let Listener::Some(waker) = self { Some(waker) } else { None }
68 }
69}
70
71impl Default for Listener {
72 fn default() -> Self {
73 Listener::None
74 }
75}
76
77struct OutputQueue {
79 listener: Listener,
81 queue: VecDeque<Packet>,
83 ended: bool,
86}
87
88impl OutputQueue {
89 fn enqueue(&mut self, packet: Packet) {
91 self.queue.push_back(packet);
92 self.listener.wake();
93 }
94
95 fn mark_ended(&mut self) {
98 self.ended = true;
99 self.listener.wake();
100 }
101
102 fn waker(&self) -> Option<&Waker> {
103 self.listener.waker()
104 }
105
106 fn wake(&mut self) {
108 self.listener.wake();
109 }
110}
111
112impl Default for OutputQueue {
113 fn default() -> Self {
114 OutputQueue { listener: Listener::default(), queue: VecDeque::new(), ended: false }
115 }
116}
117
118impl Stream for OutputQueue {
119 type Item = Packet;
120
121 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
122 match self.queue.pop_front() {
123 Some(packet) => Poll::Ready(Some(packet)),
124 None if self.ended => Poll::Ready(None),
125 None => {
126 self.listener.register(cx.waker().clone());
127 Poll::Pending
128 }
129 }
130 }
131}
132
133const MIN_INPUT_BUFFER_SIZE: u32 = 4096;
136const MIN_OUTPUT_BUFFER_SIZE: u32 = 0;
138
139#[derive(PartialEq, Eq, Hash, Clone, Debug)]
141struct InputBufferIndex(u32);
142
143struct StreamProcessorInner {
146 processor: StreamProcessorProxy,
148 sysmem_client: AllocatorProxy,
150 events: StreamProcessorEventStream,
152 input_packet_size: u64,
154 client_owned: HashSet<InputBufferIndex>,
157 input_cursor: Option<(InputBufferIndex, u64)>,
159 output_queue: Mutex<OutputQueue>,
163 input_waker: Option<Waker>,
165 input_allocation: MaybeDone<SysmemAllocation>,
167 output_allocation: MaybeDone<SysmemAllocation>,
169}
170
171impl StreamProcessorInner {
172 fn handle_event(&mut self, evt: StreamProcessorEvent) -> Result<(), Error> {
176 match evt {
177 StreamProcessorEvent::OnInputConstraints { input_constraints } => {
178 let _input_constraints = ValidStreamBufferConstraints::try_from(input_constraints)?;
179 let buffer_constraints =
180 Self::buffer_constraints_from_min_size(MIN_INPUT_BUFFER_SIZE);
181 let processor = self.processor.clone();
182 let mut partial_settings = Self::partial_settings();
183 let token_fn = move |token: ClientEnd<BufferCollectionTokenMarker>| {
184 partial_settings.sysmem_token =
187 Some(ClientEnd::<fidl_fuchsia_sysmem::BufferCollectionTokenMarker>::new(
188 token.into_channel(),
189 ));
190 if let Err(e) = processor.set_input_buffer_partial_settings(partial_settings) {
192 warn!("Couldn't set input buffer settings: {:?}", e);
193 }
194 };
195 self.input_allocation = maybe_done(SysmemAllocation::allocate(
196 self.sysmem_client.clone(),
197 BufferName { name: "StreamProcessorInput", priority: 1 },
198 None,
199 buffer_constraints,
200 token_fn,
201 )?);
202 }
203 StreamProcessorEvent::OnOutputConstraints { output_config } => {
204 let output_constraints = ValidStreamOutputConstraints::try_from(output_config)?;
205 if !output_constraints.buffer_constraints_action_required {
206 return Ok(());
207 }
208 let buffer_constraints =
209 Self::buffer_constraints_from_min_size(MIN_OUTPUT_BUFFER_SIZE);
210 let processor = self.processor.clone();
211 let mut partial_settings = Self::partial_settings();
212 let token_fn = move |token: ClientEnd<BufferCollectionTokenMarker>| {
213 partial_settings.sysmem_token =
216 Some(ClientEnd::<fidl_fuchsia_sysmem::BufferCollectionTokenMarker>::new(
217 token.into_channel(),
218 ));
219 if let Err(e) = processor.set_output_buffer_partial_settings(partial_settings) {
221 warn!("Couldn't set output buffer settings: {:?}", e);
222 }
223 };
224
225 self.output_allocation = maybe_done(SysmemAllocation::allocate(
226 self.sysmem_client.clone(),
227 BufferName { name: "StreamProcessorOutput", priority: 1 },
228 None,
229 buffer_constraints,
230 token_fn,
231 )?);
232 }
233 StreamProcessorEvent::OnOutputPacket { output_packet, .. } => {
234 let mut lock = self.output_queue.lock();
235 lock.enqueue(output_packet);
236 }
237 StreamProcessorEvent::OnFreeInputPacket {
238 free_input_packet: PacketHeader { packet_index: Some(idx), .. },
239 } => {
240 if !self.client_owned.insert(InputBufferIndex(idx)) {
241 warn!("Freed an input packet that was already freed: {:?}", idx);
242 }
243 self.setup_input_cursor();
244 }
245 StreamProcessorEvent::OnOutputEndOfStream { .. } => {
246 let mut lock = self.output_queue.lock();
247 lock.mark_ended();
248 }
249 StreamProcessorEvent::OnOutputFormat { .. } => {}
250 e => trace!("Unhandled stream processor event: {:?}", e),
251 }
252 Ok(())
253 }
254
255 fn process_event(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
259 match ready!(self.events.poll_next_unpin(cx)) {
260 Some(Err(e)) => Poll::Ready(Err(e.into())),
261 Some(Ok(event)) => Poll::Ready(self.handle_event(event)),
262 None => Poll::Ready(Err(format_err!("Client disconnected"))),
263 }
264 }
265
266 fn buffer_constraints_from_min_size(min_buffer_size: u32) -> BufferCollectionConstraints {
267 BufferCollectionConstraints {
268 buffer_memory_constraints: Some(BufferMemoryConstraints {
269 min_size_bytes: Some(min_buffer_size as u64),
270 ..Default::default()
271 }),
272 ..buffer_collection_constraints_default()
273 }
274 }
275
276 fn partial_settings() -> StreamBufferPartialSettings {
277 StreamBufferPartialSettings {
278 buffer_lifetime_ordinal: Some(1),
279 buffer_constraints_version_ordinal: Some(1),
280 sysmem_token: None,
281 ..Default::default()
282 }
283 }
284
285 fn input_buffers(&mut self) -> &mut SysmemAllocatedBuffers {
286 Pin::new(&mut self.input_allocation)
287 .output_mut()
288 .expect("allocation completed")
289 .as_mut()
290 .expect("succcessful allocation")
291 }
292
293 fn output_buffers(&mut self) -> &mut SysmemAllocatedBuffers {
294 Pin::new(&mut self.output_allocation)
295 .output_mut()
296 .expect("allocation completed")
297 .as_mut()
298 .expect("succcessful allocation")
299 }
300
301 fn input_allocation_complete(&mut self) -> Result<(), Error> {
304 let _ = Pin::new(&mut self.input_allocation)
305 .output_mut()
306 .ok_or_else(|| format_err!("allocation isn't complete"))?;
307
308 let settings = self.input_buffers().settings();
309 self.input_packet_size = (*settings.size_bytes.as_ref().unwrap()).try_into()?;
310 let buffer_count = self.input_buffers().len();
311 for i in 0..buffer_count {
312 let _ = self.client_owned.insert(InputBufferIndex(i.try_into()?));
313 }
314 self.setup_input_cursor();
316 Ok(())
317 }
318
319 fn output_allocation_complete(&mut self) -> Result<(), Error> {
323 let _ = Pin::new(&mut self.output_allocation)
324 .output_mut()
325 .ok_or_else(|| format_err!("allocation isn't complete"))?;
326 self.processor
327 .complete_output_buffer_partial_settings(1)
328 .context("setting output buffer settings")?;
329 Ok(())
330 }
331
332 fn poll_buffer_allocation(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
335 if let MaybeDone::Future(_) = self.input_allocation {
336 match Pin::new(&mut self.input_allocation).poll(cx) {
337 Poll::Ready(()) => {
338 if let Err(e) = self.input_allocation_complete() {
339 return Poll::Ready(Err(e));
340 }
341 }
342 Poll::Pending => {}
343 };
344 }
345 if let MaybeDone::Future(_) = self.output_allocation {
346 match Pin::new(&mut self.output_allocation).poll(cx) {
347 Poll::Ready(()) => {
348 if let Err(e) = self.output_allocation_complete() {
349 return Poll::Ready(Err(e));
350 }
351 }
352 Poll::Pending => {}
353 };
354 }
355 Poll::Pending
356 }
357
358 fn waiting_waker(&self) -> Option<Waker> {
360 match (self.output_queue.lock().waker(), &self.input_waker) {
361 (None, None) => None,
363 (Some(waker), _) => Some(waker.clone()),
364 (_, Some(waker)) => Some(waker.clone()),
365 }
366 }
367
368 fn poll_events(&mut self) -> Result<(), Error> {
372 let waker = loop {
373 let waker = match self.waiting_waker() {
374 None => return Ok(()),
377 Some(waker) => waker,
378 };
379 match self.process_event(&mut Context::from_waker(&waker)) {
380 Poll::Pending => break waker,
381 Poll::Ready(Err(e)) => {
382 warn!("Stream processing error: {:?}", e);
383 return Err(e.into());
384 }
385 Poll::Ready(Ok(())) => {}
387 }
388 };
389
390 if let Poll::Ready(Err(e)) = self.poll_buffer_allocation(&mut Context::from_waker(&waker)) {
391 warn!("Stream buffer allocation error: {:?}", e);
392 return Err(e.into());
393 }
394 Ok(())
395 }
396
397 fn wake_output(&mut self) {
398 self.output_queue.lock().wake();
399 }
400
401 fn wake_input(&mut self) {
402 if let Some(w) = self.input_waker.take() {
403 w.wake();
404 }
405 }
406
407 fn setup_input_cursor(&mut self) {
410 if self.input_cursor.is_some() {
411 return;
413 }
414 let next_idx = match self.client_owned.iter().next() {
415 None => return,
416 Some(idx) => idx.clone(),
417 };
418 let _ = self.client_owned.remove(&next_idx);
419 self.input_cursor = Some((next_idx, 0));
420 self.wake_input();
421 }
422
423 fn read_output_packet(&mut self, packet: Packet) -> Result<Vec<u8>, Error> {
426 let packet = ValidPacket::try_from(packet)?;
427
428 let output_size = packet.valid_length_bytes as usize;
429 let offset = packet.start_offset as u64;
430 let mut output = vec![0; output_size];
431 let buf_idx = packet.buffer_index;
432 let vmo = self.output_buffers().get_mut(buf_idx).expect("output vmo should exist");
433 vmo.read(&mut output, offset)?;
434 self.processor.recycle_output_packet(&packet.header.into())?;
435 Ok(output)
436 }
437}
438
439pub struct StreamProcessor {
444 inner: Arc<RwLock<StreamProcessorInner>>,
445}
446
447pub struct StreamProcessorOutputStream {
450 inner: Arc<RwLock<StreamProcessorInner>>,
451}
452
453impl StreamProcessor {
454 fn create(processor: StreamProcessorProxy, sysmem_client: AllocatorProxy) -> Self {
457 let events = processor.take_event_stream();
458 Self {
459 inner: Arc::new(RwLock::new(StreamProcessorInner {
460 processor,
461 sysmem_client,
462 events,
463 input_packet_size: 0,
464 client_owned: HashSet::new(),
465 input_cursor: None,
466 output_queue: Default::default(),
467 input_waker: None,
468 input_allocation: maybe_done(SysmemAllocation::pending()),
469 output_allocation: maybe_done(SysmemAllocation::pending()),
470 })),
471 }
472 }
473
474 pub fn create_encoder(
478 input_domain: DomainFormat,
479 encoder_settings: EncoderSettings,
480 ) -> Result<StreamProcessor, Error> {
481 let sysmem_client = fuchsia_component::client::connect_to_protocol::<AllocatorMarker>()
482 .context("Connecting to sysmem")?;
483
484 let format_details = FormatDetails {
485 domain: Some(input_domain),
486 encoder_settings: Some(encoder_settings),
487 format_details_version_ordinal: Some(1),
488 mime_type: Some("audio/pcm".to_string()),
489 oob_bytes: None,
490 pass_through_parameters: None,
491 timebase: None,
492 ..Default::default()
493 };
494
495 let encoder_params = CreateEncoderParams {
496 input_details: Some(format_details),
497 require_hw: Some(false),
498 ..Default::default()
499 };
500
501 let codec_svc = fuchsia_component::client::connect_to_protocol::<CodecFactoryMarker>()
502 .context("Failed to connect to Codec Factory")?;
503
504 let (processor, stream_processor_serverend) = fidl::endpoints::create_proxy();
505
506 codec_svc.create_encoder(&encoder_params, stream_processor_serverend)?;
507
508 Ok(StreamProcessor::create(processor, sysmem_client))
509 }
510
511 pub fn create_decoder(
515 mime_type: &str,
516 oob_bytes: Option<Vec<u8>>,
517 ) -> Result<StreamProcessor, Error> {
518 let sysmem_client = fuchsia_component::client::connect_to_protocol::<AllocatorMarker>()
519 .context("Connecting to sysmem")?;
520
521 let format_details = FormatDetails {
522 mime_type: Some(mime_type.to_string()),
523 oob_bytes: oob_bytes,
524 format_details_version_ordinal: Some(1),
525 encoder_settings: None,
526 domain: None,
527 pass_through_parameters: None,
528 timebase: None,
529 ..Default::default()
530 };
531
532 let decoder_params = CreateDecoderParams {
533 input_details: Some(format_details),
534 permit_lack_of_split_header_handling: Some(true),
535 ..Default::default()
536 };
537
538 let codec_svc = fuchsia_component::client::connect_to_protocol::<CodecFactoryMarker>()
539 .context("Failed to connect to Codec Factory")?;
540
541 let (processor, stream_processor_serverend) = fidl::endpoints::create_proxy();
542
543 codec_svc.create_decoder(&decoder_params, stream_processor_serverend)?;
544
545 Ok(StreamProcessor::create(processor, sysmem_client))
546 }
547
548 pub fn take_output_stream(&mut self) -> Result<StreamProcessorOutputStream, Error> {
552 {
553 let read = self.inner.read();
554 let mut lock = read.output_queue.lock();
555 if let Listener::None = lock.listener {
556 lock.listener = Listener::New;
557 } else {
558 return Err(format_err!("Output stream already taken"));
559 }
560 }
561 Ok(StreamProcessorOutputStream { inner: self.inner.clone() })
562 }
563
564 fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize, io::Error> {
566 let mut bytes_idx = 0;
567 while bytes.len() > bytes_idx {
568 {
569 let mut write = self.inner.write();
570 let (idx, size) = match write.input_cursor.take() {
571 None => return Ok(bytes_idx),
572 Some(x) => x,
573 };
574 let space_left = write.input_packet_size - size;
575 let left_to_write = bytes.len() - bytes_idx;
576 let buffer_vmo = write.input_buffers().get_mut(idx.0).expect("need buffer vmo");
577 if space_left as usize > left_to_write {
578 let write_buf = &bytes[bytes_idx..];
579 let write_len = write_buf.len();
580 buffer_vmo.write(write_buf, size)?;
581 bytes_idx += write_len;
582 write.input_cursor = Some((idx, size + write_len as u64));
583 assert!(bytes.len() == bytes_idx);
584 return Ok(bytes_idx);
585 }
586 let end_idx = bytes_idx + space_left as usize;
587 let write_buf = &bytes[bytes_idx..end_idx];
588 let write_len = write_buf.len();
589 buffer_vmo.write(write_buf, size)?;
590 bytes_idx += write_len;
591 assert_eq!(size + write_len as u64, write.input_packet_size);
593 write.input_cursor = Some((idx, write.input_packet_size));
594 }
595 self.send_packet()?;
596 }
597 Ok(bytes_idx)
598 }
599
600 pub fn send_packet(&mut self) -> Result<(), io::Error> {
604 let mut write = self.inner.write();
605 if write.input_cursor.is_none() {
606 return Ok(());
608 }
609 let (idx, size) = write.input_cursor.take().expect("input cursor is none");
610 if size == 0 {
611 write.input_cursor = Some((idx, size));
613 return Ok(());
614 }
615 let packet = Packet {
616 header: Some(PacketHeader {
617 buffer_lifetime_ordinal: Some(1),
618 packet_index: Some(idx.0),
619 ..Default::default()
620 }),
621 buffer_index: Some(idx.0),
622 stream_lifetime_ordinal: Some(1),
623 start_offset: Some(0),
624 valid_length_bytes: Some(size as u32),
625 start_access_unit: Some(true),
626 known_end_access_unit: Some(true),
627 ..Default::default()
628 };
629 write.processor.queue_input_packet(&packet).map_err(fidl_error_to_io_error)?;
630 write.setup_input_cursor();
632 Ok(())
633 }
634
635 fn poll_writable(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
639 let mut write = self.inner.write();
640 write.input_waker = None;
643 if write.input_cursor.is_some() {
644 return Poll::Ready(Ok(()));
645 }
646 write.input_waker = Some(cx.waker().clone());
647 if let Err(e) = write.poll_events() {
652 return Poll::Ready(Err(io::Error::other(e)));
653 }
654 Poll::Pending
655 }
656
657 pub fn close(&mut self) -> Result<(), io::Error> {
658 self.send_packet()?;
659
660 let mut write = self.inner.write();
661
662 write.processor.queue_input_end_of_stream(1).map_err(fidl_error_to_io_error)?;
663 write.input_cursor = None;
666 write.wake_input();
667 write.wake_output();
668 Ok(())
669 }
670}
671
672impl AsyncWrite for StreamProcessor {
673 fn poll_write(
674 mut self: Pin<&mut Self>,
675 cx: &mut Context<'_>,
676 buf: &[u8],
677 ) -> Poll<io::Result<usize>> {
678 ready!(self.poll_writable(cx))?;
679 match self.write_bytes(buf) {
680 Ok(written) => Poll::Ready(Ok(written)),
681 Err(e) => Poll::Ready(Err(e.into())),
682 }
683 }
684
685 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
686 Poll::Ready(self.send_packet())
687 }
688
689 fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
690 Poll::Ready(self.send_packet())
691 }
692}
693
694impl Stream for StreamProcessorOutputStream {
695 type Item = Result<Vec<u8>, Error>;
696
697 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
698 let mut write = self.inner.write();
699 let packet = {
701 let mut queue = write.output_queue.lock();
702 match queue.poll_next_unpin(cx) {
703 Poll::Ready(Some(packet)) => Some(Some(packet)),
704 Poll::Ready(None) => Some(None),
705 Poll::Pending => {
706 None
709 }
710 }
711 };
712 if let Err(e) = write.poll_events() {
715 return Poll::Ready(Some(Err(e.into())));
716 }
717 match packet {
718 Some(Some(packet)) => Poll::Ready(Some(write.read_output_packet(packet))),
719 Some(None) => Poll::Ready(None),
720 None => Poll::Pending,
721 }
722 }
723}
724
725impl FusedStream for StreamProcessorOutputStream {
726 fn is_terminated(&self) -> bool {
727 self.inner.read().output_queue.lock().ended
728 }
729}
730
731#[cfg(test)]
732mod tests {
733 use super::*;
734
735 use async_test_helpers::run_while;
736 use byteorder::{ByteOrder, NativeEndian};
737 use fixture::fixture;
738 use fuchsia_async as fasync;
739 use futures::FutureExt;
740 use futures::io::AsyncWriteExt;
741 use futures_test::task::new_count_waker;
742 use sha2::{Digest as _, Sha256};
743 use std::fs::File;
744 use std::io::{Read, Write};
745 use std::pin::pin;
746
747 use stream_processor_test::ExpectedDigest;
748
749 const PCM_SAMPLE_SIZE: usize = 2;
750
751 #[derive(Clone, Debug)]
752 pub struct PcmAudio {
753 pcm_format: PcmFormat,
754 buffer: Vec<u8>,
755 }
756
757 impl PcmAudio {
758 pub fn create_saw_wave(pcm_format: PcmFormat, frame_count: usize) -> Self {
759 const FREQUENCY: f32 = 20.0;
760 const AMPLITUDE: f32 = 0.2;
761
762 let pcm_frame_size = PCM_SAMPLE_SIZE * pcm_format.channel_map.len();
763 let samples_per_frame = pcm_format.channel_map.len();
764 let sample_count = frame_count * samples_per_frame;
765
766 let mut buffer = vec![0; frame_count * pcm_frame_size];
767
768 for i in 0..sample_count {
769 let frame = (i / samples_per_frame) as f32;
770 let value =
771 ((frame * FREQUENCY / (pcm_format.frames_per_second as f32)) % 1.0) * AMPLITUDE;
772 let sample = (value * i16::max_value() as f32) as i16;
773
774 let mut sample_bytes = [0; std::mem::size_of::<i16>()];
775 NativeEndian::write_i16(&mut sample_bytes, sample);
776
777 let offset = i * PCM_SAMPLE_SIZE;
778 buffer[offset] = sample_bytes[0];
779 buffer[offset + 1] = sample_bytes[1];
780 }
781
782 Self { pcm_format, buffer }
783 }
784
785 pub fn frame_size(&self) -> usize {
786 self.pcm_format.channel_map.len() * PCM_SAMPLE_SIZE
787 }
788 }
789
790 pub struct BytesValidator {
793 pub output_file: Option<&'static str>,
794 pub expected_digest: ExpectedDigest,
795 }
796
797 impl BytesValidator {
798 fn write_and_hash(&self, mut file: impl Write, bytes: &[u8]) -> Result<(), Error> {
799 let mut hasher = Sha256::default();
800
801 file.write_all(&bytes)?;
802 hasher.update(&bytes);
803
804 let digest: [u8; 32] = hasher.finalize().into();
805 if self.expected_digest.bytes != digest {
806 return Err(format_err!(
807 "Expected {}; got {}",
808 self.expected_digest,
809 hex::encode(digest)
810 ))
811 .into();
812 }
813
814 Ok(())
815 }
816
817 fn output_file(&self) -> Result<impl Write, Error> {
818 Ok(if let Some(file) = self.output_file {
819 Box::new(std::fs::File::create(file)?) as Box<dyn Write>
820 } else {
821 Box::new(std::io::sink()) as Box<dyn Write>
822 })
823 }
824
825 fn validate(&self, bytes: &[u8]) -> Result<(), Error> {
826 self.write_and_hash(self.output_file()?, &bytes)
827 }
828 }
829
830 #[fuchsia::test]
831 fn encode_sbc() {
832 let mut exec = fasync::TestExecutor::new();
833
834 let pcm_format = PcmFormat {
835 pcm_mode: AudioPcmMode::Linear,
836 bits_per_sample: 16,
837 frames_per_second: 44100,
838 channel_map: vec![AudioChannelId::Cf],
839 };
840
841 let sub_bands = SbcSubBands::SubBands4;
842 let block_count = SbcBlockCount::BlockCount8;
843
844 let input_frames = 3000;
845
846 let pcm_audio = PcmAudio::create_saw_wave(pcm_format.clone(), input_frames);
847
848 let sbc_encoder_settings = EncoderSettings::Sbc(SbcEncoderSettings {
849 sub_bands,
850 block_count,
851 allocation: SbcAllocation::AllocLoudness,
852 channel_mode: SbcChannelMode::Mono,
853 bit_pool: 59, });
855
856 let input_domain = DomainFormat::Audio(AudioFormat::Uncompressed(
857 AudioUncompressedFormat::Pcm(pcm_format),
858 ));
859
860 let mut encoder = StreamProcessor::create_encoder(input_domain, sbc_encoder_settings)
861 .expect("to create Encoder");
862
863 let frames_per_packet: usize = 8; let packet_size = pcm_audio.frame_size() * frames_per_packet;
865 let mut packets = pcm_audio.buffer.as_slice().chunks(packet_size);
866 let first_packet = packets.next().unwrap();
867
868 let written =
871 exec.run_singlethreaded(&mut encoder.write(first_packet)).expect("successful write");
872 assert_eq!(written, first_packet.len());
873
874 let mut encoded_stream = encoder.take_output_stream().expect("Stream should be taken");
875
876 assert!(encoder.take_output_stream().is_err());
878
879 let encoded_fut = pin!(encoded_stream.next());
882
883 let (waker, encoder_fut_wake_count) = new_count_waker();
884 let mut counting_ctx = Context::from_waker(&waker);
885
886 assert!(encoded_fut.poll(&mut counting_ctx).is_pending());
887
888 let mut frames_sent = first_packet.len() / pcm_audio.frame_size();
889
890 for packet in packets {
891 let mut written_fut = encoder.write(&packet);
892
893 let written_bytes =
894 exec.run_singlethreaded(&mut written_fut).expect("to write to encoder");
895
896 assert_eq!(packet.len(), written_bytes);
897 frames_sent += packet.len() / pcm_audio.frame_size();
898 }
899
900 encoder.close().expect("stream should always be closable");
901
902 assert_eq!(input_frames, frames_sent);
903
904 let woke_count = encoder_fut_wake_count.get();
907 while encoder_fut_wake_count.get() == woke_count {
908 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
909 }
910 assert_eq!(encoder_fut_wake_count.get(), woke_count + 1);
911
912 let mut encoded = Vec::new();
914
915 loop {
916 let mut encoded_fut = encoded_stream.next();
917
918 match exec.run_singlethreaded(&mut encoded_fut) {
919 Some(Ok(enc_data)) => {
920 assert!(!enc_data.is_empty());
921 encoded.extend_from_slice(&enc_data);
922 }
923 Some(Err(e)) => {
924 panic!("Unexpected error when polling encoded data: {}", e);
925 }
926 None => {
927 break;
928 }
929 }
930 }
931
932 let expected_digest = ExpectedDigest::new(
934 "Sbc: 44.1kHz/Loudness/Mono/bitpool 56/blocks 8/subbands 4",
935 "5c65a88bda3f132538966d87df34aa8675f85c9892b7f9f5571f76f3c7813562",
936 );
937 let hash_validator = BytesValidator { output_file: None, expected_digest };
938
939 assert_eq!(6110, encoded.len(), "Encoded size should be equal");
940
941 let validated = hash_validator.validate(encoded.as_slice());
942 assert!(validated.is_ok(), "Failed hash: {:?}", validated);
943 }
944
945 fn fix_sbc_test_file<F>(_name: &str, test: F)
946 where
947 F: FnOnce(Vec<u8>) -> (),
948 {
949 const SBC_TEST_FILE: &str = "/pkg/data/s16le44100mono.sbc";
950
951 let mut sbc_data = Vec::new();
952 let _ = File::open(SBC_TEST_FILE)
953 .expect("open test file")
954 .read_to_end(&mut sbc_data)
955 .expect("read test file");
956
957 test(sbc_data)
958 }
959
960 #[fixture(fix_sbc_test_file)]
961 #[fuchsia::test]
962 fn decode_sbc(sbc_data: Vec<u8>) {
963 let mut exec = fasync::TestExecutor::new();
964
965 const SBC_FRAME_SIZE: usize = 72;
966 const INPUT_FRAMES: usize = 23;
967
968 let oob_data = Some([0x82, 0x00, 0x00, 0x00].to_vec());
970 let mut decoder =
971 StreamProcessor::create_decoder("audio/sbc", oob_data).expect("to create decoder");
972
973 let mut decoded_stream = decoder.take_output_stream().expect("Stream should be taken");
974
975 assert!(decoder.take_output_stream().is_err());
977
978 let mut frames_sent = 0;
979
980 let frames_per_packet: usize = 1; let packet_size = SBC_FRAME_SIZE * frames_per_packet;
982
983 for frames in sbc_data.as_slice().chunks(packet_size) {
984 let mut written_fut = decoder.write(&frames);
985
986 let written_bytes =
987 exec.run_singlethreaded(&mut written_fut).expect("to write to decoder");
988
989 assert_eq!(frames.len(), written_bytes);
990 frames_sent += frames.len() / SBC_FRAME_SIZE;
991 }
992
993 assert_eq!(INPUT_FRAMES, frames_sent);
994
995 let mut flush_fut = pin!(decoder.flush());
996 exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder");
997
998 decoder.close().expect("stream should always be closable");
999
1000 let mut decoded = Vec::new();
1002
1003 loop {
1004 let mut decoded_fut = decoded_stream.next();
1005
1006 match exec.run_singlethreaded(&mut decoded_fut) {
1007 Some(Ok(dec_data)) => {
1008 assert!(!dec_data.is_empty());
1009 decoded.extend_from_slice(&dec_data);
1010 }
1011 Some(Err(e)) => {
1012 panic!("Unexpected error when polling decoded data: {}", e);
1013 }
1014 None => {
1015 break;
1016 }
1017 }
1018 }
1019
1020 let expected_digest = ExpectedDigest::new(
1022 "Pcm: 44.1kHz/16bit/Mono",
1023 "ff2e7afea51217886d3df15b9a623b4e49c9bd9bd79c58ac01bc94c5511e08d6",
1024 );
1025 let hash_validator = BytesValidator { output_file: None, expected_digest };
1026
1027 assert_eq!(256 * INPUT_FRAMES, decoded.len(), "Decoded size should be equal");
1028
1029 let validated = hash_validator.validate(decoded.as_slice());
1030 assert!(validated.is_ok(), "Failed hash: {:?}", validated);
1031 }
1032
1033 #[fixture(fix_sbc_test_file)]
1034 #[fuchsia::test]
1035 fn decode_sbc_wakes_output_to_process_events(sbc_data: Vec<u8>) {
1036 let mut exec = fasync::TestExecutor::new();
1037 const SBC_FRAME_SIZE: usize = 72;
1038
1039 let oob_data = Some([0x82, 0x00, 0x00, 0x00].to_vec());
1041 let mut decoder =
1042 StreamProcessor::create_decoder("audio/sbc", oob_data).expect("to create decoder");
1043
1044 let mut chunks = sbc_data.as_slice().chunks(SBC_FRAME_SIZE);
1045 let next_frame = chunks.next().unwrap();
1046
1047 let written =
1050 exec.run_singlethreaded(&mut decoder.write(next_frame)).expect("successful write");
1051 assert_eq!(written, next_frame.len());
1052
1053 let mut decoded_stream = decoder.take_output_stream().expect("Stream should be taken");
1054
1055 let decoded_fut = pin!(decoded_stream.next());
1058
1059 let (waker, decoder_fut_wake_count) = new_count_waker();
1060 let mut counting_ctx = Context::from_waker(&waker);
1061
1062 assert!(decoded_fut.poll(&mut counting_ctx).is_pending());
1063
1064 let frame = chunks.next().unwrap();
1067 let mut written_fut = decoder.write(&frame);
1068 let written_bytes = exec.run_singlethreaded(&mut written_fut).expect("to write to decoder");
1069 assert_eq!(frame.len(), written_bytes);
1070
1071 let mut flush_fut = pin!(decoder.flush());
1072 exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder");
1073
1074 assert_eq!(decoder_fut_wake_count.get(), 0);
1077 while decoder_fut_wake_count.get() == 0 {
1078 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
1079 }
1080 assert_eq!(decoder_fut_wake_count.get(), 1);
1081
1082 let mut decoded = Vec::new();
1083 let mut decoded_fut = decoded_stream.next();
1085
1086 match exec.run_singlethreaded(&mut decoded_fut) {
1087 Some(Ok(dec_data)) => {
1088 assert!(!dec_data.is_empty());
1089 decoded.extend_from_slice(&dec_data);
1090 }
1091 x => panic!("Expected decoded frame, got {:?}", x),
1092 }
1093
1094 assert_eq!(512, decoded.len(), "Decoded size should be equal to one frame");
1095 }
1096
1097 #[fixture(fix_sbc_test_file)]
1098 #[fuchsia::test]
1099 fn decode_sbc_wakes_input_to_process_events(sbc_data: Vec<u8>) {
1100 let mut exec = fasync::TestExecutor::new();
1101 const SBC_FRAME_SIZE: usize = 72;
1102
1103 let oob_data = Some([0x82, 0x00, 0x00, 0x00].to_vec());
1105 let mut decoder =
1106 StreamProcessor::create_decoder("audio/sbc", oob_data).expect("to create decoder");
1107
1108 let mut decoded_stream = decoder.take_output_stream().expect("Stream should be taken");
1109
1110 let decoded_fut = pin!(decoded_stream.next());
1111
1112 let mut chunks = sbc_data.as_slice().chunks(SBC_FRAME_SIZE);
1113 let next_frame = chunks.next().unwrap();
1114
1115 let (written_res, mut decoded_fut) =
1121 run_while(&mut exec, decoded_fut, decoder.write(next_frame));
1122 assert_eq!(written_res.expect("initial write should succeed"), next_frame.len());
1123
1124 let (waker, write_fut_wake_count) = new_count_waker();
1128 let mut counting_ctx = Context::from_waker(&waker);
1129
1130 let mut wake_count_before_stall = 0;
1131 for frame in chunks {
1132 wake_count_before_stall = write_fut_wake_count.get();
1133 let mut written_fut = decoder.write(&frame);
1134 if written_fut.poll_unpin(&mut counting_ctx).is_pending() {
1135 if write_fut_wake_count.get() != wake_count_before_stall {
1138 continue;
1139 }
1140 break;
1143 }
1144 let mut flush_fut = pin!(decoder.flush());
1146 exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder");
1147 }
1148
1149 let decoded_frame = exec.run_singlethreaded(&mut decoded_fut);
1151 assert_eq!(512, decoded_frame.unwrap().unwrap().len(), "Decoded frame size wrong");
1152
1153 let chunks = sbc_data.as_slice().chunks(SBC_FRAME_SIZE);
1155 for frame in chunks {
1156 wake_count_before_stall = write_fut_wake_count.get();
1157 let mut written_fut = decoder.write(&frame);
1158 if written_fut.poll_unpin(&mut counting_ctx).is_pending() {
1159 if write_fut_wake_count.get() != wake_count_before_stall {
1162 continue;
1163 }
1164 break;
1165 }
1166 let mut flush_fut = pin!(decoder.flush());
1168 exec.run_singlethreaded(&mut flush_fut).expect("to flush the decoder");
1169 }
1170
1171 while write_fut_wake_count.get() == wake_count_before_stall {
1176 let _ = exec.run_until_stalled(&mut futures::future::pending::<()>());
1177 }
1178
1179 }
1182}