use anyhow::{format_err, Context as _, Error};
use fidl::client::QueryResponseFut;
use fidl::endpoints::{ClientEnd, Proxy};
use fidl_fuchsia_sysmem2::{
AllocatorAllocateSharedCollectionRequest, AllocatorBindSharedCollectionRequest, AllocatorProxy,
AllocatorSetDebugClientInfoRequest, BufferCollectionConstraints, BufferCollectionMarker,
BufferCollectionProxy, BufferCollectionSetConstraintsRequest,
BufferCollectionTokenDuplicateRequest, BufferCollectionTokenMarker,
BufferCollectionWaitForAllBuffersAllocatedResponse,
BufferCollectionWaitForAllBuffersAllocatedResult, BufferMemorySettings, NodeSetNameRequest,
};
use fuchsia_zircon::{self as zx, AsHandleRef};
use futures::future::{FusedFuture, Future};
use futures::task::{Context, Poll};
use futures::{ready, FutureExt};
use std::pin::Pin;
use tracing::error;
/// Buffers handed out by a completed [`SysmemAllocation`].
///
/// Built by `SysmemAllocation::allocated` from the sysmem response: the VMOs, the memory
/// settings reported with them, and the collection proxy that was used to obtain them.
#[derive(Debug)]
pub struct SysmemAllocatedBuffers {
/// One VMO per allocated buffer, in the order they appeared in the response.
buffers: Vec<zx::Vmo>,
/// Memory settings taken from the response's `settings.buffer_settings`.
settings: BufferMemorySettings,
/// Never read in this file; stored so the collection connection lives as long as this struct.
/// NOTE(review): presumably sysmem releases the buffers once this channel closes - confirm.
_buffer_collection: BufferCollectionProxy,
}
/// Name to attach to a buffer collection.
///
/// `SysmemAllocation::allocate` forwards both fields to sysmem in a `NodeSetNameRequest`.
#[derive(Debug)]
pub struct BufferName<'a> {
/// Sent as `NodeSetNameRequest.name` (copied into an owned `String` when sent).
pub name: &'a str,
/// Sent as `NodeSetNameRequest.priority`.
/// NOTE(review): how sysmem arbitrates between competing priorities is not visible here.
pub priority: u32,
}
/// Debug identification for this allocator client.
///
/// `set_allocator_name` sends these values to sysmem with `set_debug_client_info`. When a
/// caller passes `None` instead, the current process's name and koid are used.
#[derive(Debug)]
pub struct AllocatorDebugInfo {
/// Sent as `AllocatorSetDebugClientInfoRequest.name`.
pub name: String,
/// Sent as `AllocatorSetDebugClientInfoRequest.id`.
pub id: u64,
}
/// Builds the fallback debug info from the current process: its name and its raw koid.
///
/// # Errors
///
/// Returns an error if either the name or the koid of the process cannot be read.
fn default_allocator_name() -> Result<AllocatorDebugInfo, Error> {
    let process = fuchsia_runtime::process_self();
    let name = process.get_name()?.to_string();
    let id = process.get_koid()?.raw_koid();
    Ok(AllocatorDebugInfo { name, id })
}
fn set_allocator_name(
sysmem_client: &AllocatorProxy,
debug_info: Option<AllocatorDebugInfo>,
) -> Result<(), Error> {
let unwrapped_debug_info = match debug_info {
Some(x) => x,
None => default_allocator_name()?,
};
Ok(sysmem_client.set_debug_client_info(&AllocatorSetDebugClientInfoRequest {
name: Some(unwrapped_debug_info.name),
id: Some(unwrapped_debug_info.id),
..Default::default()
})?)
}
impl SysmemAllocatedBuffers {
    /// Returns the memory settings that were reported together with the buffers.
    pub fn settings(&self) -> &BufferMemorySettings {
        &self.settings
    }

    /// Returns mutable access to the VMO at position `idx`, or `None` when `idx` is out of
    /// range.
    pub fn get_mut(&mut self, idx: u32) -> Option<&mut zx::Vmo> {
        self.buffers.get_mut(idx as usize)
    }

    /// Returns how many buffers are held.
    ///
    /// # Panics
    ///
    /// Panics if the number of buffers does not fit in a `u32`.
    pub fn len(&self) -> u32 {
        u32::try_from(self.buffers.len()).expect("buffers should fit in u32")
    }
}
/// State machine for a sysmem buffer allocation; poll it as a `Future` to drive it.
///
/// The future resolves to [`SysmemAllocatedBuffers`] on success.
pub enum SysmemAllocation {
/// Not started. Polling this state returns `Poll::Pending` and nothing in this file advances
/// it.
Pending,
/// Constraints have been sent and a `sync()` round-trip on the collection is outstanding.
WaitingForSync {
/// Response future of the `sync()` call.
future: QueryResponseFut<()>,
/// Called once when the sync completes. `allocate` stores a closure here that passes the
/// duplicated token to the caller's function; `bind` leaves it as `None`.
token_fn: Option<Box<dyn FnOnce() -> () + Send + Sync>>,
/// Proxy to the collection being allocated.
buffer_collection: BufferCollectionProxy,
},
/// The sync finished; waiting on `wait_for_all_buffers_allocated()`. Holds that call's
/// response future and the collection proxy.
WaitingForAllocation(
QueryResponseFut<BufferCollectionWaitForAllBuffersAllocatedResult>,
BufferCollectionProxy,
),
/// Terminal state: `is_terminated()` returns true. Carries `Ok(())` or the sysmem error from
/// the allocation response; `Error::Invalid` is used when no sysmem-reported result applies
/// (for example an unexpected state in `synced`).
Done(Result<(), fidl_fuchsia_sysmem2::Error>),
}
impl SysmemAllocation {
    /// Returns an allocation that has not been started.
    ///
    /// Polling it yields `Poll::Pending`; it never produces buffers.
    pub fn pending() -> Self {
        Self::Pending
    }

    /// Starts a new shared-collection allocation on `allocator`.
    ///
    /// Sends the debug client info, creates a buffer collection token, duplicates it, names
    /// the collection with `name`, and then binds the original token with `constraints` (see
    /// [`SysmemAllocation::bind`]).
    ///
    /// `token_target_fn` receives the duplicated token. It is not called here: it runs while
    /// the returned future is polled, once the collection's `sync()` has completed.
    ///
    /// # Errors
    ///
    /// Returns an error if the debug info cannot be determined or sent, if any FIDL message
    /// fails to send, or if the token proxy cannot be converted back into a client end.
    pub fn allocate<
        F: FnOnce(ClientEnd<BufferCollectionTokenMarker>) -> () + 'static + Send + Sync,
    >(
        allocator: AllocatorProxy,
        name: BufferName<'_>,
        debug_info: Option<AllocatorDebugInfo>,
        constraints: BufferCollectionConstraints,
        token_target_fn: F,
    ) -> Result<Self, Error> {
        set_allocator_name(&allocator, debug_info).context("Setting allocator name")?;

        let (client_token, client_token_request) =
            fidl::endpoints::create_proxy::<BufferCollectionTokenMarker>()?;
        allocator
            .allocate_shared_collection(AllocatorAllocateSharedCollectionRequest {
                token_request: Some(client_token_request),
                ..Default::default()
            })
            .context("Allocating shared collection")?;

        // The duplicate is the token that will later be given to `token_target_fn`.
        let (token, token_request) = fidl::endpoints::create_endpoints();
        client_token.duplicate(BufferCollectionTokenDuplicateRequest {
            rights_attenuation_mask: Some(fidl::Rights::SAME_RIGHTS),
            token_request: Some(token_request),
            ..Default::default()
        })?;

        client_token
            .set_name(&NodeSetNameRequest {
                priority: Some(name.priority),
                name: Some(name.name.to_string()),
                ..Default::default()
            })
            .context("set_name on BufferCollectionToken")?;

        // Report a failed conversion as an error instead of panicking.
        let client_end_token = client_token.into_client_end().map_err(|_| {
            format_err!("could not convert BufferCollectionToken proxy into a client end")
        })?;

        let mut res = Self::bind(allocator, client_end_token, constraints)?;
        // `bind` leaves `token_fn` empty; fill it in so the duplicate is delivered after sync.
        if let Self::WaitingForSync { token_fn, .. } = &mut res {
            *token_fn = Some(Box::new(move || token_target_fn(token)));
        }
        Ok(res)
    }

    /// Binds an existing `token` to a new buffer collection and sends `constraints` for it.
    ///
    /// The returned allocation starts in `WaitingForSync` with a `sync()` call in flight and
    /// no token callback.
    ///
    /// # Errors
    ///
    /// Returns an error if the proxy cannot be created or a FIDL message fails to send.
    pub fn bind(
        allocator: AllocatorProxy,
        token: ClientEnd<BufferCollectionTokenMarker>,
        constraints: BufferCollectionConstraints,
    ) -> Result<Self, Error> {
        let (buffer_collection, collection_request) =
            fidl::endpoints::create_proxy::<BufferCollectionMarker>()?;
        allocator.bind_shared_collection(AllocatorBindSharedCollectionRequest {
            token: Some(token),
            buffer_collection_request: Some(collection_request),
            ..Default::default()
        })?;

        buffer_collection
            .set_constraints(BufferCollectionSetConstraintsRequest {
                constraints: Some(constraints),
                ..Default::default()
            })
            .context("sending constraints to sysmem")?;

        Ok(Self::WaitingForSync {
            future: buffer_collection.sync(),
            token_fn: None,
            buffer_collection,
        })
    }

    /// Advances from `WaitingForSync` to `WaitingForAllocation`.
    ///
    /// Runs the stored token callback (if any) and then issues
    /// `wait_for_all_buffers_allocated()`. When called in any other state, the allocation is
    /// left as `Done(Err(Invalid))` and an error is returned.
    fn synced(&mut self) -> Result<(), Error> {
        // `Done(Invalid)` stands in while the previous state is taken apart; it is also the
        // final state if the previous state was not `WaitingForSync`.
        match std::mem::replace(self, Self::Done(Err(fidl_fuchsia_sysmem2::Error::Invalid))) {
            Self::WaitingForSync { token_fn, buffer_collection, .. } => {
                if let Some(deliver_token_fn) = token_fn {
                    deliver_token_fn();
                }
                let allocation = buffer_collection.wait_for_all_buffers_allocated();
                *self = Self::WaitingForAllocation(allocation, buffer_collection);
                Ok(())
            }
            _ => Err(format_err!("bad state in synced")),
        }
    }

    /// Finishes the allocation with the response sysmem sent.
    ///
    /// The state becomes `Done` with the outcome of `response_result`. On success the VMOs
    /// and settings are extracted into a [`SysmemAllocatedBuffers`].
    ///
    /// # Errors
    ///
    /// Returns an error if sysmem reported a failure, if the response is missing a field
    /// needed to build the result (the state is then `Done(Err(Invalid))`), or if the
    /// allocation was not in `WaitingForAllocation`.
    fn allocated(
        &mut self,
        response_result: Result<
            BufferCollectionWaitForAllBuffersAllocatedResponse,
            fidl_fuchsia_sysmem2::Error,
        >,
    ) -> Result<SysmemAllocatedBuffers, Error> {
        let done_result = response_result.as_ref().map(|_| ()).map_err(|err| *err);
        match std::mem::replace(self, Self::Done(done_result)) {
            Self::WaitingForAllocation(_, buffer_collection) => {
                let response =
                    response_result.map_err(|err| format_err!("allocation failed: {:?}", err))?;
                let buffers = Self::collect_buffers(response, buffer_collection);
                if buffers.is_err() {
                    // The state was set to `Done(Ok)` above; a response we cannot use must
                    // not be recorded as a success.
                    *self = Self::Done(Err(fidl_fuchsia_sysmem2::Error::Invalid));
                }
                buffers
            }
            _ => Err(format_err!("allocation complete but not in the right state")),
        }
    }

    /// Extracts the VMOs and buffer settings from a successful allocation response.
    ///
    /// Every field involved is optional in the response, so each missing one is reported
    /// as an error rather than unwrapped.
    fn collect_buffers(
        response: BufferCollectionWaitForAllBuffersAllocatedResponse,
        buffer_collection: BufferCollectionProxy,
    ) -> Result<SysmemAllocatedBuffers, Error> {
        let buffer_info = response
            .buffer_collection_info
            .ok_or_else(|| format_err!("allocation response is missing buffer_collection_info"))?;
        let buffers = buffer_info
            .buffers
            .ok_or_else(|| format_err!("buffer collection info is missing buffers"))?
            .into_iter()
            .map(|buffer| buffer.vmo.ok_or_else(|| format_err!("missing buffer")))
            .collect::<Result<Vec<_>, Error>>()?;
        let settings = buffer_info
            .settings
            .and_then(|settings| settings.buffer_settings)
            .ok_or_else(|| format_err!("buffer collection info is missing buffer settings"))?;
        Ok(SysmemAllocatedBuffers { buffers, settings, _buffer_collection: buffer_collection })
    }
}
impl FusedFuture for SysmemAllocation {
    /// Reports `true` only in the `Done` state; every other state counts as still running.
    fn is_terminated(&self) -> bool {
        matches!(self, Self::Done(_))
    }
}
impl Future for SysmemAllocation {
type Output = Result<SysmemAllocatedBuffers, Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let s = Pin::into_inner(self);
if let Self::WaitingForSync { future, .. } = s {
match ready!(future.poll_unpin(cx)) {
Err(e) => {
error!("SysmemAllocator error: {:?}", e);
return Poll::Ready(Err(e.into()));
}
Ok(()) => {
if let Err(e) = s.synced() {
return Poll::Ready(Err(e));
}
}
};
}
if let Self::WaitingForAllocation(future, _) = s {
match ready!(future.poll_unpin(cx)) {
Ok(response_result) => return Poll::Ready(s.allocated(response_result)),
Err(e) => {
error!("SysmemAllocator waiting error: {:?}", e);
Poll::Ready(Err(e.into()))
}
}
} else {
Poll::Pending
}
}
}
// Tests for `SysmemAllocation`: one against a hand-driven fake allocator, one against the
// allocator service the test environment provides.
#[cfg(test)]
mod tests {
use super::*;
use fidl_fuchsia_sysmem2::{
AllocatorMarker, AllocatorRequest, BufferCollectionInfo, BufferCollectionRequest,
BufferCollectionTokenProxy, BufferCollectionTokenRequest,
BufferCollectionTokenRequestStream, BufferMemoryConstraints, BufferUsage, CoherencyDomain,
Heap, SingleBufferSettings, VmoBuffer, CPU_USAGE_READ, VIDEO_USAGE_HW_DECODER,
};
use fuchsia_async as fasync;
use futures::StreamExt;
use std::pin::pin;
use crate::buffer_collection_constraints::buffer_collection_constraints_default;
// Checks that `proxy` and `requests` are two ends of the same token channel: a `sync()` sent
// on the proxy must arrive on the request stream, and replying to it must complete the call.
fn assert_tokens_connected(
exec: &mut fasync::TestExecutor,
proxy: &BufferCollectionTokenProxy,
requests: &mut BufferCollectionTokenRequestStream,
) {
let mut sync_fut = proxy.sync();
match exec.run_until_stalled(&mut requests.next()) {
Poll::Ready(Some(Ok(BufferCollectionTokenRequest::Sync { responder }))) => {
responder.send().expect("respond to sync")
}
x => panic!("Expected vended token to be connected, got {:?}", x),
};
assert!(exec.run_until_stalled(&mut sync_fut).is_ready());
}
// Drives `SysmemAllocation::allocate` against request streams served by the test itself,
// checking each message the allocation sends and the state it reaches after every reply.
#[fuchsia::test]
fn allocate_future() {
let mut exec = fasync::TestExecutor::new();
let (proxy, mut allocator_requests) =
fidl::endpoints::create_proxy_and_stream::<AllocatorMarker>().unwrap();
// The token callback forwards the duplicated token through a oneshot so the test can
// observe when (and whether) it was delivered.
let (sender, mut receiver) = futures::channel::oneshot::channel();
let token_fn = move |token| {
sender.send(token).expect("should be able to send token");
};
let mut allocation = SysmemAllocation::allocate(
proxy,
BufferName { name: "audio-codec.allocate_future", priority: 100 },
None,
BufferCollectionConstraints {
usage: Some(BufferUsage {
cpu: Some(CPU_USAGE_READ),
video: Some(VIDEO_USAGE_HW_DECODER),
..Default::default()
}),
min_buffer_count_for_camping: Some(1),
..Default::default()
},
token_fn,
)
.expect("starting should work");
// `allocate` has already queued its messages; read them in the order they were sent.
match exec.run_until_stalled(&mut allocator_requests.next()) {
Poll::Ready(Some(Ok(AllocatorRequest::SetDebugClientInfo { .. }))) => (),
x => panic!("Expected debug client info, got {:?}", x),
};
// Server end of the original token.
let mut token_requests_1 = match exec.run_until_stalled(&mut allocator_requests.next()) {
Poll::Ready(Some(Ok(AllocatorRequest::AllocateSharedCollection {
payload, ..
}))) => payload.token_request.unwrap().into_stream().expect("request into stream"),
x => panic!("Expected a shared allocation request, got {:?}", x),
};
// Server end of the duplicated token (the one destined for `token_fn`).
let mut token_requests_2 = match exec.run_until_stalled(&mut token_requests_1.next()) {
Poll::Ready(Some(Ok(BufferCollectionTokenRequest::Duplicate { payload, .. }))) => {
payload.token_request.unwrap().into_stream().expect("duplicate request into stream")
}
x => panic!("Expected a duplication request, got {:?}", x),
};
// The bind request carries the original token's client end plus the collection server end.
let (token_client_1, mut collection_requests_1) = match exec
.run_until_stalled(&mut allocator_requests.next())
{
Poll::Ready(Some(Ok(AllocatorRequest::BindSharedCollection { payload, .. }))) => (
payload.token.unwrap().into_proxy().unwrap(),
payload
.buffer_collection_request
.unwrap()
.into_stream()
.expect("collection request into stream"),
),
x => panic!("Expected Bind Shared Collection, got: {:?}", x),
};
match exec.run_until_stalled(&mut token_requests_1.next()) {
Poll::Ready(Some(Ok(BufferCollectionTokenRequest::SetName { .. }))) => {}
x => panic!("Expected setname {:?}", x),
};
// The token handed to bind must be the one created by AllocateSharedCollection.
assert_tokens_connected(&mut exec, &token_client_1, &mut token_requests_1);
match exec.run_until_stalled(&mut collection_requests_1.next()) {
Poll::Ready(Some(Ok(BufferCollectionRequest::SetConstraints { .. }))) => {}
x => panic!("Expected buffer constraints request, got {:?}", x),
};
let sync_responder = match exec.run_until_stalled(&mut collection_requests_1.next()) {
Poll::Ready(Some(Ok(BufferCollectionRequest::Sync { responder }))) => responder,
x => panic!("Expected a sync request, got {:?}", x),
};
// Until the sync is answered the allocation cannot make progress.
assert!(exec.run_until_stalled(&mut allocation).is_pending());
sync_responder.send().expect("respond to sync request");
// After the sync reply the allocation delivers the token and waits for buffers.
assert!(exec.run_until_stalled(&mut allocation).is_pending());
let token_client_2 = match receiver.try_recv() {
Ok(Some(token)) => token.into_proxy().unwrap(),
x => panic!("Should have a token sent to the fn, got {:?}", x),
};
// The delivered token must be the duplicate requested earlier.
assert_tokens_connected(&mut exec, &token_client_2, &mut token_requests_2);
const SIZE_BYTES: u64 = 1024;
let buffer_settings = BufferMemorySettings {
size_bytes: Some(SIZE_BYTES),
is_physically_contiguous: Some(true),
is_secure: Some(false),
coherency_domain: Some(CoherencyDomain::Ram),
heap: Some(Heap {
heap_type: Some(bind_fuchsia_sysmem_heap::HEAP_TYPE_SYSTEM_RAM.into()),
..Default::default()
}),
..Default::default()
};
// Answer the wait-for-allocation call with a single VMO and the settings above.
match exec.run_until_stalled(&mut collection_requests_1.next()) {
Poll::Ready(Some(Ok(BufferCollectionRequest::WaitForAllBuffersAllocated {
responder,
}))) => {
let single_buffer_settings = SingleBufferSettings {
buffer_settings: Some(buffer_settings.clone()),
..Default::default()
};
let buffer_collection_info = BufferCollectionInfo {
settings: Some(single_buffer_settings),
buffers: Some(vec![VmoBuffer {
vmo: Some(zx::Vmo::create(SIZE_BYTES.into()).expect("vmo creation")),
vmo_usable_start: Some(0),
..Default::default()
}]),
..Default::default()
};
let response = BufferCollectionWaitForAllBuffersAllocatedResponse {
buffer_collection_info: Some(buffer_collection_info),
..Default::default()
};
responder.send(Ok(response)).expect("send collection response")
}
x => panic!("Expected WaitForBuffersAllocated, got {:?}", x),
};
// The allocation must now resolve with exactly what the fake server sent.
let mut buffers = match exec.run_until_stalled(&mut allocation) {
Poll::Pending => panic!("allocation should be done"),
Poll::Ready(res) => res.expect("successful allocation"),
};
assert_eq!(1, buffers.len());
assert!(buffers.get_mut(0).is_some());
assert_eq!(buffers.settings(), &buffer_settings);
}
// Runs an allocation against the `Allocator` protocol obtained with `connect_to_protocol`,
// with the test acting as the second participant that receives the duplicated token.
#[fuchsia::test]
fn with_system_allocator() {
let mut exec = fasync::TestExecutor::new();
let sysmem_client = fuchsia_component::client::connect_to_protocol::<AllocatorMarker>()
.expect("connect to allocator");
let buffer_constraints = BufferCollectionConstraints {
min_buffer_count: Some(2),
buffer_memory_constraints: Some(BufferMemoryConstraints {
min_size_bytes: Some(4096),
..Default::default()
}),
..buffer_collection_constraints_default()
};
let (sender, mut receiver) = futures::channel::oneshot::channel();
let token_fn = move |token| {
sender.send(token).expect("should be able to send token");
};
let mut allocation = SysmemAllocation::allocate(
sysmem_client.clone(),
BufferName { name: "audio-codec.allocate_future", priority: 100 },
None,
buffer_constraints.clone(),
token_fn,
)
.expect("start allocator");
// Keep polling the allocation until it has delivered the duplicated token; it must stay
// pending throughout because the second participant has not set constraints yet.
let token = loop {
assert!(exec.run_until_stalled(&mut allocation).is_pending());
if let Poll::Ready(x) = exec.run_until_stalled(&mut receiver) {
break x;
}
};
let token = token.expect("receive token");
// Bind the duplicated token as a second participant using the same constraints.
let (buffer_collection_client, buffer_collection_requests) =
fidl::endpoints::create_proxy::<BufferCollectionMarker>().expect("proxy creation");
sysmem_client
.bind_shared_collection(AllocatorBindSharedCollectionRequest {
token: Some(token),
buffer_collection_request: Some(buffer_collection_requests),
..Default::default()
})
.expect("bind okay");
buffer_collection_client
.set_constraints(BufferCollectionSetConstraintsRequest {
constraints: Some(buffer_constraints),
..Default::default()
})
.expect("constraints should send okay");
let mut allocation_fut = pin!(buffer_collection_client.wait_for_all_buffers_allocated());
let allocation_result =
exec.run_singlethreaded(&mut allocation_fut).expect("allocation success");
assert!(allocation_result.is_ok());
// Once the second participant has its buffers, the allocation under test must be ready too.
let allocated_buffers = match exec.run_until_stalled(&mut allocation) {
Poll::Ready(bufs) => bufs.expect("allocation success"),
x => panic!("Expected ready, got {:?}", x),
};
let _allocator_settings = allocated_buffers.settings();
// Both participants must see the same number of buffers.
let buffers = allocation_result.unwrap().buffer_collection_info.unwrap().buffers.unwrap();
assert_eq!(buffers.len(), allocated_buffers.len() as usize);
}
}