Skip to main content

fuchsia_audio_codec/
sysmem_allocator.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use anyhow::{Context as _, Error, format_err};
6use fidl::client::QueryResponseFut;
7use fidl::endpoints::{ClientEnd, Proxy};
8use fidl_fuchsia_sysmem2::{
9    AllocatorAllocateSharedCollectionRequest, AllocatorBindSharedCollectionRequest, AllocatorProxy,
10    AllocatorSetDebugClientInfoRequest, BufferCollectionConstraints, BufferCollectionMarker,
11    BufferCollectionProxy, BufferCollectionSetConstraintsRequest,
12    BufferCollectionTokenDuplicateRequest, BufferCollectionTokenMarker,
13    BufferCollectionWaitForAllBuffersAllocatedResponse,
14    BufferCollectionWaitForAllBuffersAllocatedResult, BufferMemorySettings, NodeSetNameRequest,
15};
16use futures::future::{FusedFuture, Future};
17use futures::task::{Context, Poll};
18use futures::{FutureExt, ready};
19use log::error;
20use std::pin::Pin;
21
22/// A set of buffers that have been allocated with the SysmemAllocator.
23#[derive(Debug)]
24pub struct SysmemAllocatedBuffers {
25    buffers: Vec<zx::Vmo>,
26    settings: BufferMemorySettings,
27    _buffer_collection: BufferCollectionProxy,
28}
29
/// The name to give a buffer collection, along with the priority of that name.
#[derive(Debug)]
pub struct BufferName<'a> {
    /// Text sent as the `name` of the `NodeSetNameRequest`.
    pub name: &'a str,
    /// Value sent as the `priority` of the `NodeSetNameRequest`.
    pub priority: u32,
}
35
/// Debug information identifying this client to the sysmem allocator.
#[derive(Debug)]
pub struct AllocatorDebugInfo {
    /// Value sent as the `name` of the `AllocatorSetDebugClientInfoRequest`.
    pub name: String,
    /// Value sent as the `id` of the `AllocatorSetDebugClientInfoRequest`.
    pub id: u64,
}
41
42fn default_allocator_name() -> Result<AllocatorDebugInfo, Error> {
43    let name = fuchsia_runtime::process_self().get_name()?;
44    let koid = fuchsia_runtime::process_self().koid()?;
45    Ok(AllocatorDebugInfo { name: name.to_string(), id: koid.raw_koid() })
46}
47
48fn set_allocator_name(
49    sysmem_client: &AllocatorProxy,
50    debug_info: Option<AllocatorDebugInfo>,
51) -> Result<(), Error> {
52    let unwrapped_debug_info = match debug_info {
53        Some(x) => x,
54        None => default_allocator_name()?,
55    };
56    Ok(sysmem_client.set_debug_client_info(&AllocatorSetDebugClientInfoRequest {
57        name: Some(unwrapped_debug_info.name),
58        id: Some(unwrapped_debug_info.id),
59        ..Default::default()
60    })?)
61}
62
63impl SysmemAllocatedBuffers {
64    /// Settings of the buffers that are available through `SysmemAllocator::get`
65    /// Returns None if the buffers are not allocated yet.
66    pub fn settings(&self) -> &BufferMemorySettings {
67        &self.settings
68    }
69
70    /// Get a VMO which has been allocated from the
71    pub fn get_mut(&mut self, idx: u32) -> Option<&mut zx::Vmo> {
72        let idx = idx as usize;
73        return self.buffers.get_mut(idx);
74    }
75
76    /// Get the number of VMOs that have been allocated.
77    /// Returns None if the allocation is not complete yet.
78    pub fn len(&self) -> u32 {
79        self.buffers.len().try_into().expect("buffers should fit in u32")
80    }
81}
82
83/// A Future that communicates with the `fuchsia.sysmem2.Allocator` service to allocate shared
84/// buffers.
85pub enum SysmemAllocation {
86    Pending,
87    /// Waiting for the Sync response from the Allocator
88    WaitingForSync {
89        future: Box<QueryResponseFut<()>>,
90        token_fn: Option<Box<dyn FnOnce() -> () + Send + Sync>>,
91        buffer_collection: BufferCollectionProxy,
92    },
93    /// Waiting for the buffers to be allocated, which should eventually happen after delivering the token.
94    WaitingForAllocation(
95        Box<QueryResponseFut<BufferCollectionWaitForAllBuffersAllocatedResult>>,
96        BufferCollectionProxy,
97    ),
98    /// Allocation is completed. The result here represents whether it completed successfully or an
99    /// error.
100    Done(Result<(), fidl_fuchsia_sysmem2::Error>),
101}
102
103impl SysmemAllocation {
104    /// A pending allocation which has not been started, and will never finish.
105    pub fn pending() -> Self {
106        Self::Pending
107    }
108
109    /// Allocate a new shared memory collection, using `allocator` to communicate with the Allocator
110    /// service. `constraints` will be used to allocate the collection. A shared collection token
111    /// client end will be provided to the `token_target_fn` once the request has been synced with
112    /// the collection. This token can be used with `SysmemAllocation::shared` to finish allocating
113    /// the shared buffers or provided to another service to share allocation, or duplicated to
114    /// share this memory with more than one other client.
115    pub fn allocate<
116        F: FnOnce(ClientEnd<BufferCollectionTokenMarker>) -> () + 'static + Send + Sync,
117    >(
118        allocator: AllocatorProxy,
119        name: BufferName<'_>,
120        debug_info: Option<AllocatorDebugInfo>,
121        constraints: BufferCollectionConstraints,
122        token_target_fn: F,
123    ) -> Result<Self, Error> {
124        // Ignore errors since only debug information is being sent.
125        set_allocator_name(&allocator, debug_info).context("Setting alloocator name")?;
126        let (client_token, client_token_request) =
127            fidl::endpoints::create_proxy::<BufferCollectionTokenMarker>();
128        allocator
129            .allocate_shared_collection(AllocatorAllocateSharedCollectionRequest {
130                token_request: Some(client_token_request),
131                ..Default::default()
132            })
133            .context("Allocating shared collection")?;
134
135        // Duplicate to get another BufferCollectionToken to the same collection.
136        let (token, token_request) = fidl::endpoints::create_endpoints();
137        client_token.duplicate(BufferCollectionTokenDuplicateRequest {
138            rights_attenuation_mask: Some(fidl::Rights::SAME_RIGHTS),
139            token_request: Some(token_request),
140            ..Default::default()
141        })?;
142
143        client_token
144            .set_name(&NodeSetNameRequest {
145                priority: Some(name.priority),
146                name: Some(name.name.to_string()),
147                ..Default::default()
148            })
149            .context("set_name on BufferCollectionToken")?;
150
151        let client_end_token = client_token.into_client_end().unwrap();
152
153        let mut res = Self::bind(allocator, client_end_token, constraints)?;
154
155        if let Self::WaitingForSync { token_fn, .. } = &mut res {
156            *token_fn = Some(Box::new(move || token_target_fn(token)));
157        }
158
159        Ok(res)
160    }
161
162    /// Bind to a shared memory collection, using `allocator` to communicate with the Allocator
163    /// service and a `token` which has already been allocated. `constraints` is set to communicate
164    /// the requirements of this client.
165    pub fn bind(
166        allocator: AllocatorProxy,
167        token: ClientEnd<BufferCollectionTokenMarker>,
168        constraints: BufferCollectionConstraints,
169    ) -> Result<Self, Error> {
170        let (buffer_collection, collection_request) =
171            fidl::endpoints::create_proxy::<BufferCollectionMarker>();
172        allocator.bind_shared_collection(AllocatorBindSharedCollectionRequest {
173            token: Some(token),
174            buffer_collection_request: Some(collection_request),
175            ..Default::default()
176        })?;
177
178        buffer_collection
179            .set_constraints(BufferCollectionSetConstraintsRequest {
180                constraints: Some(constraints),
181                ..Default::default()
182            })
183            .context("sending constraints to sysmem")?;
184
185        Ok(Self::WaitingForSync {
186            future: Box::new(buffer_collection.sync()),
187            token_fn: None,
188            buffer_collection,
189        })
190    }
191
192    /// Advances a synced collection to wait for the allocation of the buffers, after synced.
193    /// Delivers the token to the target as the collection is aware of it now and can reliably
194    /// detect when all tokens have been turned in and constraints have been set.
195    fn synced(&mut self) -> Result<(), Error> {
196        *self = match std::mem::replace(self, Self::Done(Err(fidl_fuchsia_sysmem2::Error::Invalid)))
197        {
198            Self::WaitingForSync { future: _, token_fn, buffer_collection } => {
199                if let Some(deliver_token_fn) = token_fn {
200                    deliver_token_fn();
201                }
202                Self::WaitingForAllocation(
203                    Box::new(buffer_collection.wait_for_all_buffers_allocated()),
204                    buffer_collection,
205                )
206            }
207            _ => Self::Done(Err(fidl_fuchsia_sysmem2::Error::Invalid)),
208        };
209        if let Self::Done(_) = self {
210            return Err(format_err!("bad state in synced"));
211        }
212        Ok(())
213    }
214
215    /// Finish once the allocation has completed.  Returns the buffers and marks the allocation as
216    /// complete.
217    fn allocated(
218        &mut self,
219        response_result: Result<
220            BufferCollectionWaitForAllBuffersAllocatedResponse,
221            fidl_fuchsia_sysmem2::Error,
222        >,
223    ) -> Result<SysmemAllocatedBuffers, Error> {
224        let done_result = response_result.as_ref().map(|_| ()).map_err(|err| *err);
225        match std::mem::replace(self, Self::Done(done_result)) {
226            Self::WaitingForAllocation(_, buffer_collection) => {
227                let response =
228                    response_result.map_err(|err| format_err!("allocation failed: {:?}", err))?;
229                let buffer_info = response.buffer_collection_info.unwrap();
230                let buffers = buffer_info
231                    .buffers
232                    .unwrap()
233                    .iter_mut()
234                    .map(|buffer| buffer.vmo.take().expect("missing buffer"))
235                    .collect();
236                let settings = buffer_info.settings.unwrap().buffer_settings.unwrap();
237                Ok(SysmemAllocatedBuffers {
238                    buffers,
239                    settings,
240                    _buffer_collection: buffer_collection,
241                })
242            }
243            _ => Err(format_err!("allocation complete but not in the right state")),
244        }
245    }
246}
247
248impl FusedFuture for SysmemAllocation {
249    fn is_terminated(&self) -> bool {
250        match self {
251            Self::Done(_) => true,
252            _ => false,
253        }
254    }
255}
256
257impl Future for SysmemAllocation {
258    type Output = Result<SysmemAllocatedBuffers, Error>;
259
260    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
261        let s = Pin::into_inner(self);
262        if let Self::WaitingForSync { future, .. } = s {
263            match ready!(future.poll_unpin(cx)) {
264                Err(e) => {
265                    error!("SysmemAllocator error: {:?}", e);
266                    return Poll::Ready(Err(e.into()));
267                }
268                Ok(()) => {
269                    if let Err(e) = s.synced() {
270                        return Poll::Ready(Err(e));
271                    }
272                }
273            };
274        }
275        if let Self::WaitingForAllocation(future, _) = s {
276            match ready!(future.poll_unpin(cx)) {
277                Ok(response_result) => return Poll::Ready(s.allocated(response_result)),
278                Err(e) => {
279                    error!("SysmemAllocator waiting error: {:?}", e);
280                    Poll::Ready(Err(e.into()))
281                }
282            }
283        } else {
284            Poll::Pending
285        }
286    }
287}
288
289#[cfg(test)]
290mod tests {
291    use super::*;
292
293    use fidl_fuchsia_sysmem2::{
294        AllocatorMarker, AllocatorRequest, BufferCollectionInfo, BufferCollectionRequest,
295        BufferCollectionTokenProxy, BufferCollectionTokenRequest,
296        BufferCollectionTokenRequestStream, BufferMemoryConstraints, BufferUsage, CPU_USAGE_READ,
297        CoherencyDomain, Heap, SingleBufferSettings, VIDEO_USAGE_HW_DECODER, VmoBuffer,
298    };
299    use fuchsia_async as fasync;
300    use futures::StreamExt;
301    use std::pin::pin;
302
303    use crate::buffer_collection_constraints::buffer_collection_constraints_default;
304
305    fn assert_tokens_connected(
306        exec: &mut fasync::TestExecutor,
307        proxy: &BufferCollectionTokenProxy,
308        requests: &mut BufferCollectionTokenRequestStream,
309    ) {
310        let mut sync_fut = proxy.sync();
311
312        match exec.run_until_stalled(&mut requests.next()) {
313            Poll::Ready(Some(Ok(BufferCollectionTokenRequest::Sync { responder }))) => {
314                responder.send().expect("respond to sync")
315            }
316            x => panic!("Expected vended token to be connected, got {:?}", x),
317        };
318
319        // The sync future is ready now.
320        assert!(exec.run_until_stalled(&mut sync_fut).is_ready());
321    }
322
323    #[fuchsia::test]
324    fn allocate_future() {
325        let mut exec = fasync::TestExecutor::new();
326
327        let (proxy, mut allocator_requests) =
328            fidl::endpoints::create_proxy_and_stream::<AllocatorMarker>();
329
330        let (sender, mut receiver) = futures::channel::oneshot::channel();
331
332        let token_fn = move |token| {
333            sender.send(token).expect("should be able to send token");
334        };
335
336        let mut allocation = SysmemAllocation::allocate(
337            proxy,
338            BufferName { name: "audio-codec.allocate_future", priority: 100 },
339            None,
340            BufferCollectionConstraints {
341                usage: Some(BufferUsage {
342                    cpu: Some(CPU_USAGE_READ),
343                    video: Some(VIDEO_USAGE_HW_DECODER),
344                    ..Default::default()
345                }),
346                min_buffer_count_for_camping: Some(1),
347                ..Default::default()
348            },
349            token_fn,
350        )
351        .expect("starting should work");
352        match exec.run_until_stalled(&mut allocator_requests.next()) {
353            Poll::Ready(Some(Ok(AllocatorRequest::SetDebugClientInfo { .. }))) => (),
354            x => panic!("Expected debug client info, got {:?}", x),
355        };
356
357        let mut token_requests_1 = match exec.run_until_stalled(&mut allocator_requests.next()) {
358            Poll::Ready(Some(Ok(AllocatorRequest::AllocateSharedCollection {
359                payload, ..
360            }))) => payload.token_request.unwrap().into_stream(),
361            x => panic!("Expected a shared allocation request, got {:?}", x),
362        };
363
364        let mut token_requests_2 = match exec.run_until_stalled(&mut token_requests_1.next()) {
365            Poll::Ready(Some(Ok(BufferCollectionTokenRequest::Duplicate { payload, .. }))) => {
366                payload.token_request.unwrap().into_stream()
367            }
368            x => panic!("Expected a duplication request, got {:?}", x),
369        };
370
371        let (token_client_1, mut collection_requests_1) = match exec
372            .run_until_stalled(&mut allocator_requests.next())
373        {
374            Poll::Ready(Some(Ok(AllocatorRequest::BindSharedCollection { payload, .. }))) => (
375                payload.token.unwrap().into_proxy(),
376                payload.buffer_collection_request.unwrap().into_stream(),
377            ),
378            x => panic!("Expected Bind Shared Collection, got: {:?}", x),
379        };
380
381        match exec.run_until_stalled(&mut token_requests_1.next()) {
382            Poll::Ready(Some(Ok(BufferCollectionTokenRequest::SetName { .. }))) => {}
383            x => panic!("Expected setname {:?}", x),
384        };
385
386        // The token turned into the allocator for binding should be connected to the server on allocating.
387        assert_tokens_connected(&mut exec, &token_client_1, &mut token_requests_1);
388
389        match exec.run_until_stalled(&mut collection_requests_1.next()) {
390            Poll::Ready(Some(Ok(BufferCollectionRequest::SetConstraints { .. }))) => {}
391            x => panic!("Expected buffer constraints request, got {:?}", x),
392        };
393
394        let sync_responder = match exec.run_until_stalled(&mut collection_requests_1.next()) {
395            Poll::Ready(Some(Ok(BufferCollectionRequest::Sync { responder }))) => responder,
396            x => panic!("Expected a sync request, got {:?}", x),
397        };
398
399        // The sysmem allocator is now waiting for the sync from the collection
400
401        assert!(exec.run_until_stalled(&mut allocation).is_pending());
402
403        // When it gets a response that the collection is synced, it vends the token out
404        sync_responder.send().expect("respond to sync request");
405
406        assert!(exec.run_until_stalled(&mut allocation).is_pending());
407
408        let token_client_2 = match receiver.try_recv() {
409            Ok(Some(token)) => token.into_proxy(),
410            x => panic!("Should have a token sent to the fn, got {:?}", x),
411        };
412
413        // token_client_2 should be attached to the token_requests_2 that we handed over to sysmem
414        // (in the token duplicate)
415        assert_tokens_connected(&mut exec, &token_client_2, &mut token_requests_2);
416
417        // We should have received a wait for the buffers to be allocated in our collection
418        const SIZE_BYTES: u64 = 1024;
419        let buffer_settings = BufferMemorySettings {
420            size_bytes: Some(SIZE_BYTES),
421            is_physically_contiguous: Some(true),
422            is_secure: Some(false),
423            coherency_domain: Some(CoherencyDomain::Ram),
424            heap: Some(Heap {
425                heap_type: Some(bind_fuchsia_sysmem_heap::HEAP_TYPE_SYSTEM_RAM.into()),
426                ..Default::default()
427            }),
428            ..Default::default()
429        };
430
431        match exec.run_until_stalled(&mut collection_requests_1.next()) {
432            Poll::Ready(Some(Ok(BufferCollectionRequest::WaitForAllBuffersAllocated {
433                responder,
434            }))) => {
435                let single_buffer_settings = SingleBufferSettings {
436                    buffer_settings: Some(buffer_settings.clone()),
437                    ..Default::default()
438                };
439                let buffer_collection_info = BufferCollectionInfo {
440                    settings: Some(single_buffer_settings),
441                    buffers: Some(vec![VmoBuffer {
442                        vmo: Some(zx::Vmo::create(SIZE_BYTES.into()).expect("vmo creation")),
443                        vmo_usable_start: Some(0),
444                        ..Default::default()
445                    }]),
446                    ..Default::default()
447                };
448                let response = BufferCollectionWaitForAllBuffersAllocatedResponse {
449                    buffer_collection_info: Some(buffer_collection_info),
450                    ..Default::default()
451                };
452                responder.send(Ok(response)).expect("send collection response")
453            }
454            x => panic!("Expected WaitForBuffersAllocated, got {:?}", x),
455        };
456
457        // The allocator should now be finished!
458        let mut buffers = match exec.run_until_stalled(&mut allocation) {
459            Poll::Pending => panic!("allocation should be done"),
460            Poll::Ready(res) => res.expect("successful allocation"),
461        };
462
463        assert_eq!(1, buffers.len());
464        assert!(buffers.get_mut(0).is_some());
465        assert_eq!(buffers.settings(), &buffer_settings);
466    }
467
468    #[fuchsia::test]
469    fn with_system_allocator() {
470        let mut exec = fasync::TestExecutor::new();
471        let sysmem_client = fuchsia_component::client::connect_to_protocol::<AllocatorMarker>()
472            .expect("connect to allocator");
473
474        let buffer_constraints = BufferCollectionConstraints {
475            min_buffer_count: Some(2),
476            buffer_memory_constraints: Some(BufferMemoryConstraints {
477                min_size_bytes: Some(4096),
478                ..Default::default()
479            }),
480            ..buffer_collection_constraints_default()
481        };
482
483        let (sender, mut receiver) = futures::channel::oneshot::channel();
484        let token_fn = move |token| {
485            sender.send(token).expect("should be able to send token");
486        };
487
488        let mut allocation = SysmemAllocation::allocate(
489            sysmem_client.clone(),
490            BufferName { name: "audio-codec.allocate_future", priority: 100 },
491            None,
492            buffer_constraints.clone(),
493            token_fn,
494        )
495        .expect("start allocator");
496
497        // Receive the token.  From here on, using the token, the test becomes the other client to
498        // the Allocator sharing the memory.
499        let token = loop {
500            assert!(exec.run_until_stalled(&mut allocation).is_pending());
501            if let Poll::Ready(x) = exec.run_until_stalled(&mut receiver) {
502                break x;
503            }
504        };
505        let token = token.expect("receive token");
506
507        let (buffer_collection_client, buffer_collection_requests) =
508            fidl::endpoints::create_proxy::<BufferCollectionMarker>();
509        sysmem_client
510            .bind_shared_collection(AllocatorBindSharedCollectionRequest {
511                token: Some(token),
512                buffer_collection_request: Some(buffer_collection_requests),
513                ..Default::default()
514            })
515            .expect("bind okay");
516
517        buffer_collection_client
518            .set_constraints(BufferCollectionSetConstraintsRequest {
519                constraints: Some(buffer_constraints),
520                ..Default::default()
521            })
522            .expect("constraints should send okay");
523
524        let mut allocation_fut = pin!(buffer_collection_client.wait_for_all_buffers_allocated());
525
526        let allocation_result =
527            exec.run_singlethreaded(&mut allocation_fut).expect("allocation success");
528
529        assert!(allocation_result.is_ok());
530
531        // Allocator should be ready now.
532        let allocated_buffers = match exec.run_until_stalled(&mut allocation) {
533            Poll::Ready(bufs) => bufs.expect("allocation success"),
534            x => panic!("Expected ready, got {:?}", x),
535        };
536
537        let _allocator_settings = allocated_buffers.settings();
538
539        let buffers = allocation_result.unwrap().buffer_collection_info.unwrap().buffers.unwrap();
540
541        assert_eq!(buffers.len(), allocated_buffers.len() as usize);
542    }
543}