1use anyhow::{Context as _, Error, format_err};
6use fidl::client::QueryResponseFut;
7use fidl::endpoints::{ClientEnd, Proxy};
8use fidl_fuchsia_sysmem2::{
9 AllocatorAllocateSharedCollectionRequest, AllocatorBindSharedCollectionRequest, AllocatorProxy,
10 AllocatorSetDebugClientInfoRequest, BufferCollectionConstraints, BufferCollectionMarker,
11 BufferCollectionProxy, BufferCollectionSetConstraintsRequest,
12 BufferCollectionTokenDuplicateRequest, BufferCollectionTokenMarker,
13 BufferCollectionWaitForAllBuffersAllocatedResponse,
14 BufferCollectionWaitForAllBuffersAllocatedResult, BufferMemorySettings, NodeSetNameRequest,
15};
16use futures::future::{FusedFuture, Future};
17use futures::task::{Context, Poll};
18use futures::{FutureExt, ready};
19use log::error;
20use std::pin::Pin;
21
22#[derive(Debug)]
24pub struct SysmemAllocatedBuffers {
25 buffers: Vec<zx::Vmo>,
26 settings: BufferMemorySettings,
27 _buffer_collection: BufferCollectionProxy,
28}
29
30#[derive(Debug)]
31pub struct BufferName<'a> {
32 pub name: &'a str,
33 pub priority: u32,
34}
35
36#[derive(Debug)]
37pub struct AllocatorDebugInfo {
38 pub name: String,
39 pub id: u64,
40}
41
42fn default_allocator_name() -> Result<AllocatorDebugInfo, Error> {
43 let name = fuchsia_runtime::process_self().get_name()?;
44 let koid = fuchsia_runtime::process_self().koid()?;
45 Ok(AllocatorDebugInfo { name: name.to_string(), id: koid.raw_koid() })
46}
47
48fn set_allocator_name(
49 sysmem_client: &AllocatorProxy,
50 debug_info: Option<AllocatorDebugInfo>,
51) -> Result<(), Error> {
52 let unwrapped_debug_info = match debug_info {
53 Some(x) => x,
54 None => default_allocator_name()?,
55 };
56 Ok(sysmem_client.set_debug_client_info(&AllocatorSetDebugClientInfoRequest {
57 name: Some(unwrapped_debug_info.name),
58 id: Some(unwrapped_debug_info.id),
59 ..Default::default()
60 })?)
61}
62
63impl SysmemAllocatedBuffers {
64 pub fn settings(&self) -> &BufferMemorySettings {
67 &self.settings
68 }
69
70 pub fn get_mut(&mut self, idx: u32) -> Option<&mut zx::Vmo> {
72 let idx = idx as usize;
73 return self.buffers.get_mut(idx);
74 }
75
76 pub fn len(&self) -> u32 {
79 self.buffers.len().try_into().expect("buffers should fit in u32")
80 }
81}
82
83#[allow(clippy::large_enum_variant)] pub enum SysmemAllocation {
87 Pending,
88 WaitingForSync {
90 future: QueryResponseFut<()>,
91 token_fn: Option<Box<dyn FnOnce() -> () + Send + Sync>>,
92 buffer_collection: BufferCollectionProxy,
93 },
94 WaitingForAllocation(
96 QueryResponseFut<BufferCollectionWaitForAllBuffersAllocatedResult>,
97 BufferCollectionProxy,
98 ),
99 Done(Result<(), fidl_fuchsia_sysmem2::Error>),
102}
103
104impl SysmemAllocation {
105 pub fn pending() -> Self {
107 Self::Pending
108 }
109
110 pub fn allocate<
117 F: FnOnce(ClientEnd<BufferCollectionTokenMarker>) -> () + 'static + Send + Sync,
118 >(
119 allocator: AllocatorProxy,
120 name: BufferName<'_>,
121 debug_info: Option<AllocatorDebugInfo>,
122 constraints: BufferCollectionConstraints,
123 token_target_fn: F,
124 ) -> Result<Self, Error> {
125 set_allocator_name(&allocator, debug_info).context("Setting alloocator name")?;
127 let (client_token, client_token_request) =
128 fidl::endpoints::create_proxy::<BufferCollectionTokenMarker>();
129 allocator
130 .allocate_shared_collection(AllocatorAllocateSharedCollectionRequest {
131 token_request: Some(client_token_request),
132 ..Default::default()
133 })
134 .context("Allocating shared collection")?;
135
136 let (token, token_request) = fidl::endpoints::create_endpoints();
138 client_token.duplicate(BufferCollectionTokenDuplicateRequest {
139 rights_attenuation_mask: Some(fidl::Rights::SAME_RIGHTS),
140 token_request: Some(token_request),
141 ..Default::default()
142 })?;
143
144 client_token
145 .set_name(&NodeSetNameRequest {
146 priority: Some(name.priority),
147 name: Some(name.name.to_string()),
148 ..Default::default()
149 })
150 .context("set_name on BufferCollectionToken")?;
151
152 let client_end_token = client_token.into_client_end().unwrap();
153
154 let mut res = Self::bind(allocator, client_end_token, constraints)?;
155
156 if let Self::WaitingForSync { token_fn, .. } = &mut res {
157 *token_fn = Some(Box::new(move || token_target_fn(token)));
158 }
159
160 Ok(res)
161 }
162
163 pub fn bind(
167 allocator: AllocatorProxy,
168 token: ClientEnd<BufferCollectionTokenMarker>,
169 constraints: BufferCollectionConstraints,
170 ) -> Result<Self, Error> {
171 let (buffer_collection, collection_request) =
172 fidl::endpoints::create_proxy::<BufferCollectionMarker>();
173 allocator.bind_shared_collection(AllocatorBindSharedCollectionRequest {
174 token: Some(token),
175 buffer_collection_request: Some(collection_request),
176 ..Default::default()
177 })?;
178
179 buffer_collection
180 .set_constraints(BufferCollectionSetConstraintsRequest {
181 constraints: Some(constraints),
182 ..Default::default()
183 })
184 .context("sending constraints to sysmem")?;
185
186 Ok(Self::WaitingForSync {
187 future: buffer_collection.sync(),
188 token_fn: None,
189 buffer_collection,
190 })
191 }
192
193 fn synced(&mut self) -> Result<(), Error> {
197 *self = match std::mem::replace(self, Self::Done(Err(fidl_fuchsia_sysmem2::Error::Invalid)))
198 {
199 Self::WaitingForSync { future: _, token_fn, buffer_collection } => {
200 if let Some(deliver_token_fn) = token_fn {
201 deliver_token_fn();
202 }
203 Self::WaitingForAllocation(
204 buffer_collection.wait_for_all_buffers_allocated(),
205 buffer_collection,
206 )
207 }
208 _ => Self::Done(Err(fidl_fuchsia_sysmem2::Error::Invalid)),
209 };
210 if let Self::Done(_) = self {
211 return Err(format_err!("bad state in synced"));
212 }
213 Ok(())
214 }
215
216 fn allocated(
219 &mut self,
220 response_result: Result<
221 BufferCollectionWaitForAllBuffersAllocatedResponse,
222 fidl_fuchsia_sysmem2::Error,
223 >,
224 ) -> Result<SysmemAllocatedBuffers, Error> {
225 let done_result = response_result.as_ref().map(|_| ()).map_err(|err| *err);
226 match std::mem::replace(self, Self::Done(done_result)) {
227 Self::WaitingForAllocation(_, buffer_collection) => {
228 let response =
229 response_result.map_err(|err| format_err!("allocation failed: {:?}", err))?;
230 let buffer_info = response.buffer_collection_info.unwrap();
231 let buffers = buffer_info
232 .buffers
233 .unwrap()
234 .iter_mut()
235 .map(|buffer| buffer.vmo.take().expect("missing buffer"))
236 .collect();
237 let settings = buffer_info.settings.unwrap().buffer_settings.unwrap();
238 Ok(SysmemAllocatedBuffers {
239 buffers,
240 settings,
241 _buffer_collection: buffer_collection,
242 })
243 }
244 _ => Err(format_err!("allocation complete but not in the right state")),
245 }
246 }
247}
248
249impl FusedFuture for SysmemAllocation {
250 fn is_terminated(&self) -> bool {
251 match self {
252 Self::Done(_) => true,
253 _ => false,
254 }
255 }
256}
257
258impl Future for SysmemAllocation {
259 type Output = Result<SysmemAllocatedBuffers, Error>;
260
261 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
262 let s = Pin::into_inner(self);
263 if let Self::WaitingForSync { future, .. } = s {
264 match ready!(future.poll_unpin(cx)) {
265 Err(e) => {
266 error!("SysmemAllocator error: {:?}", e);
267 return Poll::Ready(Err(e.into()));
268 }
269 Ok(()) => {
270 if let Err(e) = s.synced() {
271 return Poll::Ready(Err(e));
272 }
273 }
274 };
275 }
276 if let Self::WaitingForAllocation(future, _) = s {
277 match ready!(future.poll_unpin(cx)) {
278 Ok(response_result) => return Poll::Ready(s.allocated(response_result)),
279 Err(e) => {
280 error!("SysmemAllocator waiting error: {:?}", e);
281 Poll::Ready(Err(e.into()))
282 }
283 }
284 } else {
285 Poll::Pending
286 }
287 }
288}
289
290#[cfg(test)]
291mod tests {
292 use super::*;
293
294 use fidl_fuchsia_sysmem2::{
295 AllocatorMarker, AllocatorRequest, BufferCollectionInfo, BufferCollectionRequest,
296 BufferCollectionTokenProxy, BufferCollectionTokenRequest,
297 BufferCollectionTokenRequestStream, BufferMemoryConstraints, BufferUsage, CPU_USAGE_READ,
298 CoherencyDomain, Heap, SingleBufferSettings, VIDEO_USAGE_HW_DECODER, VmoBuffer,
299 };
300 use fuchsia_async as fasync;
301 use futures::StreamExt;
302 use std::pin::pin;
303
304 use crate::buffer_collection_constraints::buffer_collection_constraints_default;
305
306 fn assert_tokens_connected(
307 exec: &mut fasync::TestExecutor,
308 proxy: &BufferCollectionTokenProxy,
309 requests: &mut BufferCollectionTokenRequestStream,
310 ) {
311 let mut sync_fut = proxy.sync();
312
313 match exec.run_until_stalled(&mut requests.next()) {
314 Poll::Ready(Some(Ok(BufferCollectionTokenRequest::Sync { responder }))) => {
315 responder.send().expect("respond to sync")
316 }
317 x => panic!("Expected vended token to be connected, got {:?}", x),
318 };
319
320 assert!(exec.run_until_stalled(&mut sync_fut).is_ready());
322 }
323
324 #[fuchsia::test]
325 fn allocate_future() {
326 let mut exec = fasync::TestExecutor::new();
327
328 let (proxy, mut allocator_requests) =
329 fidl::endpoints::create_proxy_and_stream::<AllocatorMarker>();
330
331 let (sender, mut receiver) = futures::channel::oneshot::channel();
332
333 let token_fn = move |token| {
334 sender.send(token).expect("should be able to send token");
335 };
336
337 let mut allocation = SysmemAllocation::allocate(
338 proxy,
339 BufferName { name: "audio-codec.allocate_future", priority: 100 },
340 None,
341 BufferCollectionConstraints {
342 usage: Some(BufferUsage {
343 cpu: Some(CPU_USAGE_READ),
344 video: Some(VIDEO_USAGE_HW_DECODER),
345 ..Default::default()
346 }),
347 min_buffer_count_for_camping: Some(1),
348 ..Default::default()
349 },
350 token_fn,
351 )
352 .expect("starting should work");
353 match exec.run_until_stalled(&mut allocator_requests.next()) {
354 Poll::Ready(Some(Ok(AllocatorRequest::SetDebugClientInfo { .. }))) => (),
355 x => panic!("Expected debug client info, got {:?}", x),
356 };
357
358 let mut token_requests_1 = match exec.run_until_stalled(&mut allocator_requests.next()) {
359 Poll::Ready(Some(Ok(AllocatorRequest::AllocateSharedCollection {
360 payload, ..
361 }))) => payload.token_request.unwrap().into_stream(),
362 x => panic!("Expected a shared allocation request, got {:?}", x),
363 };
364
365 let mut token_requests_2 = match exec.run_until_stalled(&mut token_requests_1.next()) {
366 Poll::Ready(Some(Ok(BufferCollectionTokenRequest::Duplicate { payload, .. }))) => {
367 payload.token_request.unwrap().into_stream()
368 }
369 x => panic!("Expected a duplication request, got {:?}", x),
370 };
371
372 let (token_client_1, mut collection_requests_1) = match exec
373 .run_until_stalled(&mut allocator_requests.next())
374 {
375 Poll::Ready(Some(Ok(AllocatorRequest::BindSharedCollection { payload, .. }))) => (
376 payload.token.unwrap().into_proxy(),
377 payload.buffer_collection_request.unwrap().into_stream(),
378 ),
379 x => panic!("Expected Bind Shared Collection, got: {:?}", x),
380 };
381
382 match exec.run_until_stalled(&mut token_requests_1.next()) {
383 Poll::Ready(Some(Ok(BufferCollectionTokenRequest::SetName { .. }))) => {}
384 x => panic!("Expected setname {:?}", x),
385 };
386
387 assert_tokens_connected(&mut exec, &token_client_1, &mut token_requests_1);
389
390 match exec.run_until_stalled(&mut collection_requests_1.next()) {
391 Poll::Ready(Some(Ok(BufferCollectionRequest::SetConstraints { .. }))) => {}
392 x => panic!("Expected buffer constraints request, got {:?}", x),
393 };
394
395 let sync_responder = match exec.run_until_stalled(&mut collection_requests_1.next()) {
396 Poll::Ready(Some(Ok(BufferCollectionRequest::Sync { responder }))) => responder,
397 x => panic!("Expected a sync request, got {:?}", x),
398 };
399
400 assert!(exec.run_until_stalled(&mut allocation).is_pending());
403
404 sync_responder.send().expect("respond to sync request");
406
407 assert!(exec.run_until_stalled(&mut allocation).is_pending());
408
409 let token_client_2 = match receiver.try_recv() {
410 Ok(Some(token)) => token.into_proxy(),
411 x => panic!("Should have a token sent to the fn, got {:?}", x),
412 };
413
414 assert_tokens_connected(&mut exec, &token_client_2, &mut token_requests_2);
417
418 const SIZE_BYTES: u64 = 1024;
420 let buffer_settings = BufferMemorySettings {
421 size_bytes: Some(SIZE_BYTES),
422 is_physically_contiguous: Some(true),
423 is_secure: Some(false),
424 coherency_domain: Some(CoherencyDomain::Ram),
425 heap: Some(Heap {
426 heap_type: Some(bind_fuchsia_sysmem_heap::HEAP_TYPE_SYSTEM_RAM.into()),
427 ..Default::default()
428 }),
429 ..Default::default()
430 };
431
432 match exec.run_until_stalled(&mut collection_requests_1.next()) {
433 Poll::Ready(Some(Ok(BufferCollectionRequest::WaitForAllBuffersAllocated {
434 responder,
435 }))) => {
436 let single_buffer_settings = SingleBufferSettings {
437 buffer_settings: Some(buffer_settings.clone()),
438 ..Default::default()
439 };
440 let buffer_collection_info = BufferCollectionInfo {
441 settings: Some(single_buffer_settings),
442 buffers: Some(vec![VmoBuffer {
443 vmo: Some(zx::Vmo::create(SIZE_BYTES.into()).expect("vmo creation")),
444 vmo_usable_start: Some(0),
445 ..Default::default()
446 }]),
447 ..Default::default()
448 };
449 let response = BufferCollectionWaitForAllBuffersAllocatedResponse {
450 buffer_collection_info: Some(buffer_collection_info),
451 ..Default::default()
452 };
453 responder.send(Ok(response)).expect("send collection response")
454 }
455 x => panic!("Expected WaitForBuffersAllocated, got {:?}", x),
456 };
457
458 let mut buffers = match exec.run_until_stalled(&mut allocation) {
460 Poll::Pending => panic!("allocation should be done"),
461 Poll::Ready(res) => res.expect("successful allocation"),
462 };
463
464 assert_eq!(1, buffers.len());
465 assert!(buffers.get_mut(0).is_some());
466 assert_eq!(buffers.settings(), &buffer_settings);
467 }
468
469 #[fuchsia::test]
470 fn with_system_allocator() {
471 let mut exec = fasync::TestExecutor::new();
472 let sysmem_client = fuchsia_component::client::connect_to_protocol::<AllocatorMarker>()
473 .expect("connect to allocator");
474
475 let buffer_constraints = BufferCollectionConstraints {
476 min_buffer_count: Some(2),
477 buffer_memory_constraints: Some(BufferMemoryConstraints {
478 min_size_bytes: Some(4096),
479 ..Default::default()
480 }),
481 ..buffer_collection_constraints_default()
482 };
483
484 let (sender, mut receiver) = futures::channel::oneshot::channel();
485 let token_fn = move |token| {
486 sender.send(token).expect("should be able to send token");
487 };
488
489 let mut allocation = SysmemAllocation::allocate(
490 sysmem_client.clone(),
491 BufferName { name: "audio-codec.allocate_future", priority: 100 },
492 None,
493 buffer_constraints.clone(),
494 token_fn,
495 )
496 .expect("start allocator");
497
498 let token = loop {
501 assert!(exec.run_until_stalled(&mut allocation).is_pending());
502 if let Poll::Ready(x) = exec.run_until_stalled(&mut receiver) {
503 break x;
504 }
505 };
506 let token = token.expect("receive token");
507
508 let (buffer_collection_client, buffer_collection_requests) =
509 fidl::endpoints::create_proxy::<BufferCollectionMarker>();
510 sysmem_client
511 .bind_shared_collection(AllocatorBindSharedCollectionRequest {
512 token: Some(token),
513 buffer_collection_request: Some(buffer_collection_requests),
514 ..Default::default()
515 })
516 .expect("bind okay");
517
518 buffer_collection_client
519 .set_constraints(BufferCollectionSetConstraintsRequest {
520 constraints: Some(buffer_constraints),
521 ..Default::default()
522 })
523 .expect("constraints should send okay");
524
525 let mut allocation_fut = pin!(buffer_collection_client.wait_for_all_buffers_allocated());
526
527 let allocation_result =
528 exec.run_singlethreaded(&mut allocation_fut).expect("allocation success");
529
530 assert!(allocation_result.is_ok());
531
532 let allocated_buffers = match exec.run_until_stalled(&mut allocation) {
534 Poll::Ready(bufs) => bufs.expect("allocation success"),
535 x => panic!("Expected ready, got {:?}", x),
536 };
537
538 let _allocator_settings = allocated_buffers.settings();
539
540 let buffers = allocation_result.unwrap().buffer_collection_info.unwrap().buffers.unwrap();
541
542 assert_eq!(buffers.len(), allocated_buffers.len() as usize);
543 }
544}