fuchsia_audio_codec/
sysmem_allocator.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::{Context as _, Error, format_err};
6use fidl::client::QueryResponseFut;
7use fidl::endpoints::{ClientEnd, Proxy};
8use fidl_fuchsia_sysmem2::{
9    AllocatorAllocateSharedCollectionRequest, AllocatorBindSharedCollectionRequest, AllocatorProxy,
10    AllocatorSetDebugClientInfoRequest, BufferCollectionConstraints, BufferCollectionMarker,
11    BufferCollectionProxy, BufferCollectionSetConstraintsRequest,
12    BufferCollectionTokenDuplicateRequest, BufferCollectionTokenMarker,
13    BufferCollectionWaitForAllBuffersAllocatedResponse,
14    BufferCollectionWaitForAllBuffersAllocatedResult, BufferMemorySettings, NodeSetNameRequest,
15};
16use futures::future::{FusedFuture, Future};
17use futures::task::{Context, Poll};
18use futures::{FutureExt, ready};
19use log::error;
20use std::pin::Pin;
21
22/// A set of buffers that have been allocated with the SysmemAllocator.
23#[derive(Debug)]
24pub struct SysmemAllocatedBuffers {
25    buffers: Vec<zx::Vmo>,
26    settings: BufferMemorySettings,
27    _buffer_collection: BufferCollectionProxy,
28}
29
30#[derive(Debug)]
31pub struct BufferName<'a> {
32    pub name: &'a str,
33    pub priority: u32,
34}
35
36#[derive(Debug)]
37pub struct AllocatorDebugInfo {
38    pub name: String,
39    pub id: u64,
40}
41
42fn default_allocator_name() -> Result<AllocatorDebugInfo, Error> {
43    let name = fuchsia_runtime::process_self().get_name()?;
44    let koid = fuchsia_runtime::process_self().koid()?;
45    Ok(AllocatorDebugInfo { name: name.to_string(), id: koid.raw_koid() })
46}
47
48fn set_allocator_name(
49    sysmem_client: &AllocatorProxy,
50    debug_info: Option<AllocatorDebugInfo>,
51) -> Result<(), Error> {
52    let unwrapped_debug_info = match debug_info {
53        Some(x) => x,
54        None => default_allocator_name()?,
55    };
56    Ok(sysmem_client.set_debug_client_info(&AllocatorSetDebugClientInfoRequest {
57        name: Some(unwrapped_debug_info.name),
58        id: Some(unwrapped_debug_info.id),
59        ..Default::default()
60    })?)
61}
62
63impl SysmemAllocatedBuffers {
64    /// Settings of the buffers that are available through `SysmemAllocator::get`
65    /// Returns None if the buffers are not allocated yet.
66    pub fn settings(&self) -> &BufferMemorySettings {
67        &self.settings
68    }
69
70    /// Get a VMO which has been allocated from the
71    pub fn get_mut(&mut self, idx: u32) -> Option<&mut zx::Vmo> {
72        let idx = idx as usize;
73        return self.buffers.get_mut(idx);
74    }
75
76    /// Get the number of VMOs that have been allocated.
77    /// Returns None if the allocation is not complete yet.
78    pub fn len(&self) -> u32 {
79        self.buffers.len().try_into().expect("buffers should fit in u32")
80    }
81}
82
83#[allow(clippy::large_enum_variant)] // TODO(https://fxbug.dev/401087115)
84/// A Future that communicates with the `fuchsia.sysmem2.Allocator` service to allocate shared
85/// buffers.
86pub enum SysmemAllocation {
87    Pending,
88    /// Waiting for the Sync response from the Allocator
89    WaitingForSync {
90        future: QueryResponseFut<()>,
91        token_fn: Option<Box<dyn FnOnce() -> () + Send + Sync>>,
92        buffer_collection: BufferCollectionProxy,
93    },
94    /// Waiting for the buffers to be allocated, which should eventually happen after delivering the token.
95    WaitingForAllocation(
96        QueryResponseFut<BufferCollectionWaitForAllBuffersAllocatedResult>,
97        BufferCollectionProxy,
98    ),
99    /// Allocation is completed. The result here represents whether it completed successfully or an
100    /// error.
101    Done(Result<(), fidl_fuchsia_sysmem2::Error>),
102}
103
104impl SysmemAllocation {
105    /// A pending allocation which has not been started, and will never finish.
106    pub fn pending() -> Self {
107        Self::Pending
108    }
109
110    /// Allocate a new shared memory collection, using `allocator` to communicate with the Allocator
111    /// service. `constraints` will be used to allocate the collection. A shared collection token
112    /// client end will be provided to the `token_target_fn` once the request has been synced with
113    /// the collection. This token can be used with `SysmemAllocation::shared` to finish allocating
114    /// the shared buffers or provided to another service to share allocation, or duplicated to
115    /// share this memory with more than one other client.
116    pub fn allocate<
117        F: FnOnce(ClientEnd<BufferCollectionTokenMarker>) -> () + 'static + Send + Sync,
118    >(
119        allocator: AllocatorProxy,
120        name: BufferName<'_>,
121        debug_info: Option<AllocatorDebugInfo>,
122        constraints: BufferCollectionConstraints,
123        token_target_fn: F,
124    ) -> Result<Self, Error> {
125        // Ignore errors since only debug information is being sent.
126        set_allocator_name(&allocator, debug_info).context("Setting alloocator name")?;
127        let (client_token, client_token_request) =
128            fidl::endpoints::create_proxy::<BufferCollectionTokenMarker>();
129        allocator
130            .allocate_shared_collection(AllocatorAllocateSharedCollectionRequest {
131                token_request: Some(client_token_request),
132                ..Default::default()
133            })
134            .context("Allocating shared collection")?;
135
136        // Duplicate to get another BufferCollectionToken to the same collection.
137        let (token, token_request) = fidl::endpoints::create_endpoints();
138        client_token.duplicate(BufferCollectionTokenDuplicateRequest {
139            rights_attenuation_mask: Some(fidl::Rights::SAME_RIGHTS),
140            token_request: Some(token_request),
141            ..Default::default()
142        })?;
143
144        client_token
145            .set_name(&NodeSetNameRequest {
146                priority: Some(name.priority),
147                name: Some(name.name.to_string()),
148                ..Default::default()
149            })
150            .context("set_name on BufferCollectionToken")?;
151
152        let client_end_token = client_token.into_client_end().unwrap();
153
154        let mut res = Self::bind(allocator, client_end_token, constraints)?;
155
156        if let Self::WaitingForSync { token_fn, .. } = &mut res {
157            *token_fn = Some(Box::new(move || token_target_fn(token)));
158        }
159
160        Ok(res)
161    }
162
163    /// Bind to a shared memory collection, using `allocator` to communicate with the Allocator
164    /// service and a `token` which has already been allocated. `constraints` is set to communicate
165    /// the requirements of this client.
166    pub fn bind(
167        allocator: AllocatorProxy,
168        token: ClientEnd<BufferCollectionTokenMarker>,
169        constraints: BufferCollectionConstraints,
170    ) -> Result<Self, Error> {
171        let (buffer_collection, collection_request) =
172            fidl::endpoints::create_proxy::<BufferCollectionMarker>();
173        allocator.bind_shared_collection(AllocatorBindSharedCollectionRequest {
174            token: Some(token),
175            buffer_collection_request: Some(collection_request),
176            ..Default::default()
177        })?;
178
179        buffer_collection
180            .set_constraints(BufferCollectionSetConstraintsRequest {
181                constraints: Some(constraints),
182                ..Default::default()
183            })
184            .context("sending constraints to sysmem")?;
185
186        Ok(Self::WaitingForSync {
187            future: buffer_collection.sync(),
188            token_fn: None,
189            buffer_collection,
190        })
191    }
192
193    /// Advances a synced collection to wait for the allocation of the buffers, after synced.
194    /// Delivers the token to the target as the collection is aware of it now and can reliably
195    /// detect when all tokens have been turned in and constraints have been set.
196    fn synced(&mut self) -> Result<(), Error> {
197        *self = match std::mem::replace(self, Self::Done(Err(fidl_fuchsia_sysmem2::Error::Invalid)))
198        {
199            Self::WaitingForSync { future: _, token_fn, buffer_collection } => {
200                if let Some(deliver_token_fn) = token_fn {
201                    deliver_token_fn();
202                }
203                Self::WaitingForAllocation(
204                    buffer_collection.wait_for_all_buffers_allocated(),
205                    buffer_collection,
206                )
207            }
208            _ => Self::Done(Err(fidl_fuchsia_sysmem2::Error::Invalid)),
209        };
210        if let Self::Done(_) = self {
211            return Err(format_err!("bad state in synced"));
212        }
213        Ok(())
214    }
215
216    /// Finish once the allocation has completed.  Returns the buffers and marks the allocation as
217    /// complete.
218    fn allocated(
219        &mut self,
220        response_result: Result<
221            BufferCollectionWaitForAllBuffersAllocatedResponse,
222            fidl_fuchsia_sysmem2::Error,
223        >,
224    ) -> Result<SysmemAllocatedBuffers, Error> {
225        let done_result = response_result.as_ref().map(|_| ()).map_err(|err| *err);
226        match std::mem::replace(self, Self::Done(done_result)) {
227            Self::WaitingForAllocation(_, buffer_collection) => {
228                let response =
229                    response_result.map_err(|err| format_err!("allocation failed: {:?}", err))?;
230                let buffer_info = response.buffer_collection_info.unwrap();
231                let buffers = buffer_info
232                    .buffers
233                    .unwrap()
234                    .iter_mut()
235                    .map(|buffer| buffer.vmo.take().expect("missing buffer"))
236                    .collect();
237                let settings = buffer_info.settings.unwrap().buffer_settings.unwrap();
238                Ok(SysmemAllocatedBuffers {
239                    buffers,
240                    settings,
241                    _buffer_collection: buffer_collection,
242                })
243            }
244            _ => Err(format_err!("allocation complete but not in the right state")),
245        }
246    }
247}
248
249impl FusedFuture for SysmemAllocation {
250    fn is_terminated(&self) -> bool {
251        match self {
252            Self::Done(_) => true,
253            _ => false,
254        }
255    }
256}
257
258impl Future for SysmemAllocation {
259    type Output = Result<SysmemAllocatedBuffers, Error>;
260
261    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
262        let s = Pin::into_inner(self);
263        if let Self::WaitingForSync { future, .. } = s {
264            match ready!(future.poll_unpin(cx)) {
265                Err(e) => {
266                    error!("SysmemAllocator error: {:?}", e);
267                    return Poll::Ready(Err(e.into()));
268                }
269                Ok(()) => {
270                    if let Err(e) = s.synced() {
271                        return Poll::Ready(Err(e));
272                    }
273                }
274            };
275        }
276        if let Self::WaitingForAllocation(future, _) = s {
277            match ready!(future.poll_unpin(cx)) {
278                Ok(response_result) => return Poll::Ready(s.allocated(response_result)),
279                Err(e) => {
280                    error!("SysmemAllocator waiting error: {:?}", e);
281                    Poll::Ready(Err(e.into()))
282                }
283            }
284        } else {
285            Poll::Pending
286        }
287    }
288}
289
290#[cfg(test)]
291mod tests {
292    use super::*;
293
294    use fidl_fuchsia_sysmem2::{
295        AllocatorMarker, AllocatorRequest, BufferCollectionInfo, BufferCollectionRequest,
296        BufferCollectionTokenProxy, BufferCollectionTokenRequest,
297        BufferCollectionTokenRequestStream, BufferMemoryConstraints, BufferUsage, CPU_USAGE_READ,
298        CoherencyDomain, Heap, SingleBufferSettings, VIDEO_USAGE_HW_DECODER, VmoBuffer,
299    };
300    use fuchsia_async as fasync;
301    use futures::StreamExt;
302    use std::pin::pin;
303
304    use crate::buffer_collection_constraints::buffer_collection_constraints_default;
305
306    fn assert_tokens_connected(
307        exec: &mut fasync::TestExecutor,
308        proxy: &BufferCollectionTokenProxy,
309        requests: &mut BufferCollectionTokenRequestStream,
310    ) {
311        let mut sync_fut = proxy.sync();
312
313        match exec.run_until_stalled(&mut requests.next()) {
314            Poll::Ready(Some(Ok(BufferCollectionTokenRequest::Sync { responder }))) => {
315                responder.send().expect("respond to sync")
316            }
317            x => panic!("Expected vended token to be connected, got {:?}", x),
318        };
319
320        // The sync future is ready now.
321        assert!(exec.run_until_stalled(&mut sync_fut).is_ready());
322    }
323
324    #[fuchsia::test]
325    fn allocate_future() {
326        let mut exec = fasync::TestExecutor::new();
327
328        let (proxy, mut allocator_requests) =
329            fidl::endpoints::create_proxy_and_stream::<AllocatorMarker>();
330
331        let (sender, mut receiver) = futures::channel::oneshot::channel();
332
333        let token_fn = move |token| {
334            sender.send(token).expect("should be able to send token");
335        };
336
337        let mut allocation = SysmemAllocation::allocate(
338            proxy,
339            BufferName { name: "audio-codec.allocate_future", priority: 100 },
340            None,
341            BufferCollectionConstraints {
342                usage: Some(BufferUsage {
343                    cpu: Some(CPU_USAGE_READ),
344                    video: Some(VIDEO_USAGE_HW_DECODER),
345                    ..Default::default()
346                }),
347                min_buffer_count_for_camping: Some(1),
348                ..Default::default()
349            },
350            token_fn,
351        )
352        .expect("starting should work");
353        match exec.run_until_stalled(&mut allocator_requests.next()) {
354            Poll::Ready(Some(Ok(AllocatorRequest::SetDebugClientInfo { .. }))) => (),
355            x => panic!("Expected debug client info, got {:?}", x),
356        };
357
358        let mut token_requests_1 = match exec.run_until_stalled(&mut allocator_requests.next()) {
359            Poll::Ready(Some(Ok(AllocatorRequest::AllocateSharedCollection {
360                payload, ..
361            }))) => payload.token_request.unwrap().into_stream(),
362            x => panic!("Expected a shared allocation request, got {:?}", x),
363        };
364
365        let mut token_requests_2 = match exec.run_until_stalled(&mut token_requests_1.next()) {
366            Poll::Ready(Some(Ok(BufferCollectionTokenRequest::Duplicate { payload, .. }))) => {
367                payload.token_request.unwrap().into_stream()
368            }
369            x => panic!("Expected a duplication request, got {:?}", x),
370        };
371
372        let (token_client_1, mut collection_requests_1) = match exec
373            .run_until_stalled(&mut allocator_requests.next())
374        {
375            Poll::Ready(Some(Ok(AllocatorRequest::BindSharedCollection { payload, .. }))) => (
376                payload.token.unwrap().into_proxy(),
377                payload.buffer_collection_request.unwrap().into_stream(),
378            ),
379            x => panic!("Expected Bind Shared Collection, got: {:?}", x),
380        };
381
382        match exec.run_until_stalled(&mut token_requests_1.next()) {
383            Poll::Ready(Some(Ok(BufferCollectionTokenRequest::SetName { .. }))) => {}
384            x => panic!("Expected setname {:?}", x),
385        };
386
387        // The token turned into the allocator for binding should be connected to the server on allocating.
388        assert_tokens_connected(&mut exec, &token_client_1, &mut token_requests_1);
389
390        match exec.run_until_stalled(&mut collection_requests_1.next()) {
391            Poll::Ready(Some(Ok(BufferCollectionRequest::SetConstraints { .. }))) => {}
392            x => panic!("Expected buffer constraints request, got {:?}", x),
393        };
394
395        let sync_responder = match exec.run_until_stalled(&mut collection_requests_1.next()) {
396            Poll::Ready(Some(Ok(BufferCollectionRequest::Sync { responder }))) => responder,
397            x => panic!("Expected a sync request, got {:?}", x),
398        };
399
400        // The sysmem allocator is now waiting for the sync from the collection
401
402        assert!(exec.run_until_stalled(&mut allocation).is_pending());
403
404        // When it gets a response that the collection is synced, it vends the token out
405        sync_responder.send().expect("respond to sync request");
406
407        assert!(exec.run_until_stalled(&mut allocation).is_pending());
408
409        let token_client_2 = match receiver.try_recv() {
410            Ok(Some(token)) => token.into_proxy(),
411            x => panic!("Should have a token sent to the fn, got {:?}", x),
412        };
413
414        // token_client_2 should be attached to the token_requests_2 that we handed over to sysmem
415        // (in the token duplicate)
416        assert_tokens_connected(&mut exec, &token_client_2, &mut token_requests_2);
417
418        // We should have received a wait for the buffers to be allocated in our collection
419        const SIZE_BYTES: u64 = 1024;
420        let buffer_settings = BufferMemorySettings {
421            size_bytes: Some(SIZE_BYTES),
422            is_physically_contiguous: Some(true),
423            is_secure: Some(false),
424            coherency_domain: Some(CoherencyDomain::Ram),
425            heap: Some(Heap {
426                heap_type: Some(bind_fuchsia_sysmem_heap::HEAP_TYPE_SYSTEM_RAM.into()),
427                ..Default::default()
428            }),
429            ..Default::default()
430        };
431
432        match exec.run_until_stalled(&mut collection_requests_1.next()) {
433            Poll::Ready(Some(Ok(BufferCollectionRequest::WaitForAllBuffersAllocated {
434                responder,
435            }))) => {
436                let single_buffer_settings = SingleBufferSettings {
437                    buffer_settings: Some(buffer_settings.clone()),
438                    ..Default::default()
439                };
440                let buffer_collection_info = BufferCollectionInfo {
441                    settings: Some(single_buffer_settings),
442                    buffers: Some(vec![VmoBuffer {
443                        vmo: Some(zx::Vmo::create(SIZE_BYTES.into()).expect("vmo creation")),
444                        vmo_usable_start: Some(0),
445                        ..Default::default()
446                    }]),
447                    ..Default::default()
448                };
449                let response = BufferCollectionWaitForAllBuffersAllocatedResponse {
450                    buffer_collection_info: Some(buffer_collection_info),
451                    ..Default::default()
452                };
453                responder.send(Ok(response)).expect("send collection response")
454            }
455            x => panic!("Expected WaitForBuffersAllocated, got {:?}", x),
456        };
457
458        // The allocator should now be finished!
459        let mut buffers = match exec.run_until_stalled(&mut allocation) {
460            Poll::Pending => panic!("allocation should be done"),
461            Poll::Ready(res) => res.expect("successful allocation"),
462        };
463
464        assert_eq!(1, buffers.len());
465        assert!(buffers.get_mut(0).is_some());
466        assert_eq!(buffers.settings(), &buffer_settings);
467    }
468
469    #[fuchsia::test]
470    fn with_system_allocator() {
471        let mut exec = fasync::TestExecutor::new();
472        let sysmem_client = fuchsia_component::client::connect_to_protocol::<AllocatorMarker>()
473            .expect("connect to allocator");
474
475        let buffer_constraints = BufferCollectionConstraints {
476            min_buffer_count: Some(2),
477            buffer_memory_constraints: Some(BufferMemoryConstraints {
478                min_size_bytes: Some(4096),
479                ..Default::default()
480            }),
481            ..buffer_collection_constraints_default()
482        };
483
484        let (sender, mut receiver) = futures::channel::oneshot::channel();
485        let token_fn = move |token| {
486            sender.send(token).expect("should be able to send token");
487        };
488
489        let mut allocation = SysmemAllocation::allocate(
490            sysmem_client.clone(),
491            BufferName { name: "audio-codec.allocate_future", priority: 100 },
492            None,
493            buffer_constraints.clone(),
494            token_fn,
495        )
496        .expect("start allocator");
497
498        // Receive the token.  From here on, using the token, the test becomes the other client to
499        // the Allocator sharing the memory.
500        let token = loop {
501            assert!(exec.run_until_stalled(&mut allocation).is_pending());
502            if let Poll::Ready(x) = exec.run_until_stalled(&mut receiver) {
503                break x;
504            }
505        };
506        let token = token.expect("receive token");
507
508        let (buffer_collection_client, buffer_collection_requests) =
509            fidl::endpoints::create_proxy::<BufferCollectionMarker>();
510        sysmem_client
511            .bind_shared_collection(AllocatorBindSharedCollectionRequest {
512                token: Some(token),
513                buffer_collection_request: Some(buffer_collection_requests),
514                ..Default::default()
515            })
516            .expect("bind okay");
517
518        buffer_collection_client
519            .set_constraints(BufferCollectionSetConstraintsRequest {
520                constraints: Some(buffer_constraints),
521                ..Default::default()
522            })
523            .expect("constraints should send okay");
524
525        let mut allocation_fut = pin!(buffer_collection_client.wait_for_all_buffers_allocated());
526
527        let allocation_result =
528            exec.run_singlethreaded(&mut allocation_fut).expect("allocation success");
529
530        assert!(allocation_result.is_ok());
531
532        // Allocator should be ready now.
533        let allocated_buffers = match exec.run_until_stalled(&mut allocation) {
534            Poll::Ready(bufs) => bufs.expect("allocation success"),
535            x => panic!("Expected ready, got {:?}", x),
536        };
537
538        let _allocator_settings = allocated_buffers.settings();
539
540        let buffers = allocation_result.unwrap().buffer_collection_info.unwrap().buffers.unwrap();
541
542        assert_eq!(buffers.len(), allocated_buffers.len() as usize);
543    }
544}