1use anyhow::{Context as _, Error, format_err};
6use fidl::client::QueryResponseFut;
7use fidl::endpoints::{ClientEnd, Proxy};
8use fidl_fuchsia_sysmem2::{
9 AllocatorAllocateSharedCollectionRequest, AllocatorBindSharedCollectionRequest, AllocatorProxy,
10 AllocatorSetDebugClientInfoRequest, BufferCollectionConstraints, BufferCollectionMarker,
11 BufferCollectionProxy, BufferCollectionSetConstraintsRequest,
12 BufferCollectionTokenDuplicateRequest, BufferCollectionTokenMarker,
13 BufferCollectionWaitForAllBuffersAllocatedResponse,
14 BufferCollectionWaitForAllBuffersAllocatedResult, BufferMemorySettings, NodeSetNameRequest,
15};
16use futures::future::{FusedFuture, Future};
17use futures::task::{Context, Poll};
18use futures::{FutureExt, ready};
19use log::error;
20use std::pin::Pin;
21
22#[derive(Debug)]
24pub struct SysmemAllocatedBuffers {
25 buffers: Vec<zx::Vmo>,
26 settings: BufferMemorySettings,
27 _buffer_collection: BufferCollectionProxy,
28}
29
30#[derive(Debug)]
31pub struct BufferName<'a> {
32 pub name: &'a str,
33 pub priority: u32,
34}
35
36#[derive(Debug)]
37pub struct AllocatorDebugInfo {
38 pub name: String,
39 pub id: u64,
40}
41
42fn default_allocator_name() -> Result<AllocatorDebugInfo, Error> {
43 let name = fuchsia_runtime::process_self().get_name()?;
44 let koid = fuchsia_runtime::process_self().koid()?;
45 Ok(AllocatorDebugInfo { name: name.to_string(), id: koid.raw_koid() })
46}
47
48fn set_allocator_name(
49 sysmem_client: &AllocatorProxy,
50 debug_info: Option<AllocatorDebugInfo>,
51) -> Result<(), Error> {
52 let unwrapped_debug_info = match debug_info {
53 Some(x) => x,
54 None => default_allocator_name()?,
55 };
56 Ok(sysmem_client.set_debug_client_info(&AllocatorSetDebugClientInfoRequest {
57 name: Some(unwrapped_debug_info.name),
58 id: Some(unwrapped_debug_info.id),
59 ..Default::default()
60 })?)
61}
62
63impl SysmemAllocatedBuffers {
64 pub fn settings(&self) -> &BufferMemorySettings {
67 &self.settings
68 }
69
70 pub fn get_mut(&mut self, idx: u32) -> Option<&mut zx::Vmo> {
72 let idx = idx as usize;
73 return self.buffers.get_mut(idx);
74 }
75
76 pub fn len(&self) -> u32 {
79 self.buffers.len().try_into().expect("buffers should fit in u32")
80 }
81}
82
83pub enum SysmemAllocation {
86 Pending,
87 WaitingForSync {
89 future: Box<QueryResponseFut<()>>,
90 token_fn: Option<Box<dyn FnOnce() -> () + Send + Sync>>,
91 buffer_collection: BufferCollectionProxy,
92 },
93 WaitingForAllocation(
95 Box<QueryResponseFut<BufferCollectionWaitForAllBuffersAllocatedResult>>,
96 BufferCollectionProxy,
97 ),
98 Done(Result<(), fidl_fuchsia_sysmem2::Error>),
101}
102
103impl SysmemAllocation {
104 pub fn pending() -> Self {
106 Self::Pending
107 }
108
109 pub fn allocate<
116 F: FnOnce(ClientEnd<BufferCollectionTokenMarker>) -> () + 'static + Send + Sync,
117 >(
118 allocator: AllocatorProxy,
119 name: BufferName<'_>,
120 debug_info: Option<AllocatorDebugInfo>,
121 constraints: BufferCollectionConstraints,
122 token_target_fn: F,
123 ) -> Result<Self, Error> {
124 set_allocator_name(&allocator, debug_info).context("Setting alloocator name")?;
126 let (client_token, client_token_request) =
127 fidl::endpoints::create_proxy::<BufferCollectionTokenMarker>();
128 allocator
129 .allocate_shared_collection(AllocatorAllocateSharedCollectionRequest {
130 token_request: Some(client_token_request),
131 ..Default::default()
132 })
133 .context("Allocating shared collection")?;
134
135 let (token, token_request) = fidl::endpoints::create_endpoints();
137 client_token.duplicate(BufferCollectionTokenDuplicateRequest {
138 rights_attenuation_mask: Some(fidl::Rights::SAME_RIGHTS),
139 token_request: Some(token_request),
140 ..Default::default()
141 })?;
142
143 client_token
144 .set_name(&NodeSetNameRequest {
145 priority: Some(name.priority),
146 name: Some(name.name.to_string()),
147 ..Default::default()
148 })
149 .context("set_name on BufferCollectionToken")?;
150
151 let client_end_token = client_token.into_client_end().unwrap();
152
153 let mut res = Self::bind(allocator, client_end_token, constraints)?;
154
155 if let Self::WaitingForSync { token_fn, .. } = &mut res {
156 *token_fn = Some(Box::new(move || token_target_fn(token)));
157 }
158
159 Ok(res)
160 }
161
162 pub fn bind(
166 allocator: AllocatorProxy,
167 token: ClientEnd<BufferCollectionTokenMarker>,
168 constraints: BufferCollectionConstraints,
169 ) -> Result<Self, Error> {
170 let (buffer_collection, collection_request) =
171 fidl::endpoints::create_proxy::<BufferCollectionMarker>();
172 allocator.bind_shared_collection(AllocatorBindSharedCollectionRequest {
173 token: Some(token),
174 buffer_collection_request: Some(collection_request),
175 ..Default::default()
176 })?;
177
178 buffer_collection
179 .set_constraints(BufferCollectionSetConstraintsRequest {
180 constraints: Some(constraints),
181 ..Default::default()
182 })
183 .context("sending constraints to sysmem")?;
184
185 Ok(Self::WaitingForSync {
186 future: Box::new(buffer_collection.sync()),
187 token_fn: None,
188 buffer_collection,
189 })
190 }
191
192 fn synced(&mut self) -> Result<(), Error> {
196 *self = match std::mem::replace(self, Self::Done(Err(fidl_fuchsia_sysmem2::Error::Invalid)))
197 {
198 Self::WaitingForSync { future: _, token_fn, buffer_collection } => {
199 if let Some(deliver_token_fn) = token_fn {
200 deliver_token_fn();
201 }
202 Self::WaitingForAllocation(
203 Box::new(buffer_collection.wait_for_all_buffers_allocated()),
204 buffer_collection,
205 )
206 }
207 _ => Self::Done(Err(fidl_fuchsia_sysmem2::Error::Invalid)),
208 };
209 if let Self::Done(_) = self {
210 return Err(format_err!("bad state in synced"));
211 }
212 Ok(())
213 }
214
215 fn allocated(
218 &mut self,
219 response_result: Result<
220 BufferCollectionWaitForAllBuffersAllocatedResponse,
221 fidl_fuchsia_sysmem2::Error,
222 >,
223 ) -> Result<SysmemAllocatedBuffers, Error> {
224 let done_result = response_result.as_ref().map(|_| ()).map_err(|err| *err);
225 match std::mem::replace(self, Self::Done(done_result)) {
226 Self::WaitingForAllocation(_, buffer_collection) => {
227 let response =
228 response_result.map_err(|err| format_err!("allocation failed: {:?}", err))?;
229 let buffer_info = response.buffer_collection_info.unwrap();
230 let buffers = buffer_info
231 .buffers
232 .unwrap()
233 .iter_mut()
234 .map(|buffer| buffer.vmo.take().expect("missing buffer"))
235 .collect();
236 let settings = buffer_info.settings.unwrap().buffer_settings.unwrap();
237 Ok(SysmemAllocatedBuffers {
238 buffers,
239 settings,
240 _buffer_collection: buffer_collection,
241 })
242 }
243 _ => Err(format_err!("allocation complete but not in the right state")),
244 }
245 }
246}
247
248impl FusedFuture for SysmemAllocation {
249 fn is_terminated(&self) -> bool {
250 match self {
251 Self::Done(_) => true,
252 _ => false,
253 }
254 }
255}
256
257impl Future for SysmemAllocation {
258 type Output = Result<SysmemAllocatedBuffers, Error>;
259
260 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
261 let s = Pin::into_inner(self);
262 if let Self::WaitingForSync { future, .. } = s {
263 match ready!(future.poll_unpin(cx)) {
264 Err(e) => {
265 error!("SysmemAllocator error: {:?}", e);
266 return Poll::Ready(Err(e.into()));
267 }
268 Ok(()) => {
269 if let Err(e) = s.synced() {
270 return Poll::Ready(Err(e));
271 }
272 }
273 };
274 }
275 if let Self::WaitingForAllocation(future, _) = s {
276 match ready!(future.poll_unpin(cx)) {
277 Ok(response_result) => return Poll::Ready(s.allocated(response_result)),
278 Err(e) => {
279 error!("SysmemAllocator waiting error: {:?}", e);
280 Poll::Ready(Err(e.into()))
281 }
282 }
283 } else {
284 Poll::Pending
285 }
286 }
287}
288
289#[cfg(test)]
290mod tests {
291 use super::*;
292
293 use fidl_fuchsia_sysmem2::{
294 AllocatorMarker, AllocatorRequest, BufferCollectionInfo, BufferCollectionRequest,
295 BufferCollectionTokenProxy, BufferCollectionTokenRequest,
296 BufferCollectionTokenRequestStream, BufferMemoryConstraints, BufferUsage, CPU_USAGE_READ,
297 CoherencyDomain, Heap, SingleBufferSettings, VIDEO_USAGE_HW_DECODER, VmoBuffer,
298 };
299 use fuchsia_async as fasync;
300 use futures::StreamExt;
301 use std::pin::pin;
302
303 use crate::buffer_collection_constraints::buffer_collection_constraints_default;
304
305 fn assert_tokens_connected(
306 exec: &mut fasync::TestExecutor,
307 proxy: &BufferCollectionTokenProxy,
308 requests: &mut BufferCollectionTokenRequestStream,
309 ) {
310 let mut sync_fut = proxy.sync();
311
312 match exec.run_until_stalled(&mut requests.next()) {
313 Poll::Ready(Some(Ok(BufferCollectionTokenRequest::Sync { responder }))) => {
314 responder.send().expect("respond to sync")
315 }
316 x => panic!("Expected vended token to be connected, got {:?}", x),
317 };
318
319 assert!(exec.run_until_stalled(&mut sync_fut).is_ready());
321 }
322
323 #[fuchsia::test]
324 fn allocate_future() {
325 let mut exec = fasync::TestExecutor::new();
326
327 let (proxy, mut allocator_requests) =
328 fidl::endpoints::create_proxy_and_stream::<AllocatorMarker>();
329
330 let (sender, mut receiver) = futures::channel::oneshot::channel();
331
332 let token_fn = move |token| {
333 sender.send(token).expect("should be able to send token");
334 };
335
336 let mut allocation = SysmemAllocation::allocate(
337 proxy,
338 BufferName { name: "audio-codec.allocate_future", priority: 100 },
339 None,
340 BufferCollectionConstraints {
341 usage: Some(BufferUsage {
342 cpu: Some(CPU_USAGE_READ),
343 video: Some(VIDEO_USAGE_HW_DECODER),
344 ..Default::default()
345 }),
346 min_buffer_count_for_camping: Some(1),
347 ..Default::default()
348 },
349 token_fn,
350 )
351 .expect("starting should work");
352 match exec.run_until_stalled(&mut allocator_requests.next()) {
353 Poll::Ready(Some(Ok(AllocatorRequest::SetDebugClientInfo { .. }))) => (),
354 x => panic!("Expected debug client info, got {:?}", x),
355 };
356
357 let mut token_requests_1 = match exec.run_until_stalled(&mut allocator_requests.next()) {
358 Poll::Ready(Some(Ok(AllocatorRequest::AllocateSharedCollection {
359 payload, ..
360 }))) => payload.token_request.unwrap().into_stream(),
361 x => panic!("Expected a shared allocation request, got {:?}", x),
362 };
363
364 let mut token_requests_2 = match exec.run_until_stalled(&mut token_requests_1.next()) {
365 Poll::Ready(Some(Ok(BufferCollectionTokenRequest::Duplicate { payload, .. }))) => {
366 payload.token_request.unwrap().into_stream()
367 }
368 x => panic!("Expected a duplication request, got {:?}", x),
369 };
370
371 let (token_client_1, mut collection_requests_1) = match exec
372 .run_until_stalled(&mut allocator_requests.next())
373 {
374 Poll::Ready(Some(Ok(AllocatorRequest::BindSharedCollection { payload, .. }))) => (
375 payload.token.unwrap().into_proxy(),
376 payload.buffer_collection_request.unwrap().into_stream(),
377 ),
378 x => panic!("Expected Bind Shared Collection, got: {:?}", x),
379 };
380
381 match exec.run_until_stalled(&mut token_requests_1.next()) {
382 Poll::Ready(Some(Ok(BufferCollectionTokenRequest::SetName { .. }))) => {}
383 x => panic!("Expected setname {:?}", x),
384 };
385
386 assert_tokens_connected(&mut exec, &token_client_1, &mut token_requests_1);
388
389 match exec.run_until_stalled(&mut collection_requests_1.next()) {
390 Poll::Ready(Some(Ok(BufferCollectionRequest::SetConstraints { .. }))) => {}
391 x => panic!("Expected buffer constraints request, got {:?}", x),
392 };
393
394 let sync_responder = match exec.run_until_stalled(&mut collection_requests_1.next()) {
395 Poll::Ready(Some(Ok(BufferCollectionRequest::Sync { responder }))) => responder,
396 x => panic!("Expected a sync request, got {:?}", x),
397 };
398
399 assert!(exec.run_until_stalled(&mut allocation).is_pending());
402
403 sync_responder.send().expect("respond to sync request");
405
406 assert!(exec.run_until_stalled(&mut allocation).is_pending());
407
408 let token_client_2 = match receiver.try_recv() {
409 Ok(Some(token)) => token.into_proxy(),
410 x => panic!("Should have a token sent to the fn, got {:?}", x),
411 };
412
413 assert_tokens_connected(&mut exec, &token_client_2, &mut token_requests_2);
416
417 const SIZE_BYTES: u64 = 1024;
419 let buffer_settings = BufferMemorySettings {
420 size_bytes: Some(SIZE_BYTES),
421 is_physically_contiguous: Some(true),
422 is_secure: Some(false),
423 coherency_domain: Some(CoherencyDomain::Ram),
424 heap: Some(Heap {
425 heap_type: Some(bind_fuchsia_sysmem_heap::HEAP_TYPE_SYSTEM_RAM.into()),
426 ..Default::default()
427 }),
428 ..Default::default()
429 };
430
431 match exec.run_until_stalled(&mut collection_requests_1.next()) {
432 Poll::Ready(Some(Ok(BufferCollectionRequest::WaitForAllBuffersAllocated {
433 responder,
434 }))) => {
435 let single_buffer_settings = SingleBufferSettings {
436 buffer_settings: Some(buffer_settings.clone()),
437 ..Default::default()
438 };
439 let buffer_collection_info = BufferCollectionInfo {
440 settings: Some(single_buffer_settings),
441 buffers: Some(vec![VmoBuffer {
442 vmo: Some(zx::Vmo::create(SIZE_BYTES.into()).expect("vmo creation")),
443 vmo_usable_start: Some(0),
444 ..Default::default()
445 }]),
446 ..Default::default()
447 };
448 let response = BufferCollectionWaitForAllBuffersAllocatedResponse {
449 buffer_collection_info: Some(buffer_collection_info),
450 ..Default::default()
451 };
452 responder.send(Ok(response)).expect("send collection response")
453 }
454 x => panic!("Expected WaitForBuffersAllocated, got {:?}", x),
455 };
456
457 let mut buffers = match exec.run_until_stalled(&mut allocation) {
459 Poll::Pending => panic!("allocation should be done"),
460 Poll::Ready(res) => res.expect("successful allocation"),
461 };
462
463 assert_eq!(1, buffers.len());
464 assert!(buffers.get_mut(0).is_some());
465 assert_eq!(buffers.settings(), &buffer_settings);
466 }
467
468 #[fuchsia::test]
469 fn with_system_allocator() {
470 let mut exec = fasync::TestExecutor::new();
471 let sysmem_client = fuchsia_component::client::connect_to_protocol::<AllocatorMarker>()
472 .expect("connect to allocator");
473
474 let buffer_constraints = BufferCollectionConstraints {
475 min_buffer_count: Some(2),
476 buffer_memory_constraints: Some(BufferMemoryConstraints {
477 min_size_bytes: Some(4096),
478 ..Default::default()
479 }),
480 ..buffer_collection_constraints_default()
481 };
482
483 let (sender, mut receiver) = futures::channel::oneshot::channel();
484 let token_fn = move |token| {
485 sender.send(token).expect("should be able to send token");
486 };
487
488 let mut allocation = SysmemAllocation::allocate(
489 sysmem_client.clone(),
490 BufferName { name: "audio-codec.allocate_future", priority: 100 },
491 None,
492 buffer_constraints.clone(),
493 token_fn,
494 )
495 .expect("start allocator");
496
497 let token = loop {
500 assert!(exec.run_until_stalled(&mut allocation).is_pending());
501 if let Poll::Ready(x) = exec.run_until_stalled(&mut receiver) {
502 break x;
503 }
504 };
505 let token = token.expect("receive token");
506
507 let (buffer_collection_client, buffer_collection_requests) =
508 fidl::endpoints::create_proxy::<BufferCollectionMarker>();
509 sysmem_client
510 .bind_shared_collection(AllocatorBindSharedCollectionRequest {
511 token: Some(token),
512 buffer_collection_request: Some(buffer_collection_requests),
513 ..Default::default()
514 })
515 .expect("bind okay");
516
517 buffer_collection_client
518 .set_constraints(BufferCollectionSetConstraintsRequest {
519 constraints: Some(buffer_constraints),
520 ..Default::default()
521 })
522 .expect("constraints should send okay");
523
524 let mut allocation_fut = pin!(buffer_collection_client.wait_for_all_buffers_allocated());
525
526 let allocation_result =
527 exec.run_singlethreaded(&mut allocation_fut).expect("allocation success");
528
529 assert!(allocation_result.is_ok());
530
531 let allocated_buffers = match exec.run_until_stalled(&mut allocation) {
533 Poll::Ready(bufs) => bufs.expect("allocation success"),
534 x => panic!("Expected ready, got {:?}", x),
535 };
536
537 let _allocator_settings = allocated_buffers.settings();
538
539 let buffers = allocation_result.unwrap().buffer_collection_info.unwrap().buffers.unwrap();
540
541 assert_eq!(buffers.len(), allocated_buffers.len() as usize);
542 }
543}