use display_types::IMAGE_TILING_TYPE_LINEAR;
use fidl::endpoints::ClientEnd;
use fidl_fuchsia_hardware_display::{
self as display, CoordinatorListenerRequest, LayerId as FidlLayerId,
};
use fidl_fuchsia_hardware_display_types::{self as display_types};
use fidl_fuchsia_io as fio;
use fuchsia_async::{DurationExt as _, TimeoutExt as _};
use fuchsia_component::client::connect_to_protocol_at_path;
use fuchsia_fs::directory::{WatchEvent, Watcher};
use fuchsia_sync::RwLock;
use fuchsia_zircon::{self as zx, HandleBased};
use futures::channel::mpsc;
use futures::{future, TryStreamExt};
use std::fmt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use crate::config::{DisplayConfig, LayerConfig};
use crate::error::{ConfigError, Error, Result};
use crate::types::{
BufferCollectionId, BufferId, DisplayId, DisplayInfo, Event, EventId, ImageId, LayerId,
};
use crate::INVALID_EVENT_ID;
/// Directory that `Coordinator::init` watches (through `watch_first_file`) for a
/// display-coordinator device entry.
const DEV_DIR_PATH: &str = "/dev/class/display-coordinator";
/// Deadline applied both to waiting for a device entry in `Coordinator::init` and to waiting
/// for the initial display list in `Coordinator::init_with_proxy_and_listener_requests`.
const TIMEOUT: zx::Duration = zx::Duration::from_seconds(2);
/// Client-side handle to a display coordinator connection.
///
/// The handle is cheap to clone: every clone refers to the same `CoordinatorInner` through
/// a shared `Arc<RwLock<..>>`.
#[derive(Clone)]
pub struct Coordinator {
// State shared between all clones of this handle.
inner: Arc<RwLock<CoordinatorInner>>,
}
/// State shared by all clones of a `Coordinator`.
struct CoordinatorInner {
// Displays reported by the first `OnDisplaysChanged` request received during initialization.
displays: Vec<DisplayInfo>,
// FIDL proxy used to issue requests to the display coordinator.
proxy: display::CoordinatorProxy,
// Listener request stream; taken (left as `None`) by the first `handle_events` call.
listener_requests: Option<display::CoordinatorListenerRequestStream>,
// Registered vsync listeners, each with an optional display filter. A `None` filter means
// the listener receives vsync events for every display.
vsync_listeners: Vec<(mpsc::UnboundedSender<VsyncEvent>, Option<DisplayId>)>,
// Counter shared by buffer-collection and event ID allocation. It is incremented before
// use, so the first ID handed out is 1.
id_counter: u64,
}
/// A vsync notification forwarded to listeners registered with
/// `Coordinator::add_vsync_listener`.
#[derive(Debug)]
pub struct VsyncEvent {
// Display that the vsync was reported for.
pub id: DisplayId,
// Timestamp built from the nanosecond value carried by the `OnVsync` request.
pub timestamp: zx::MonotonicTime,
// The `applied_config_stamp` carried by the `OnVsync` request.
pub config: display_types::ConfigStamp,
}
impl Coordinator {
pub async fn init() -> Result<Coordinator> {
let path = watch_first_file(DEV_DIR_PATH)
.on_timeout(TIMEOUT.after_now(), || Err(Error::DeviceNotFound))
.await?;
let path = path.to_str().ok_or(Error::DevicePathInvalid)?;
let provider_proxy = connect_to_protocol_at_path::<display::ProviderMarker>(path)
.map_err(Error::DeviceConnectionError)?;
let (coordinator_proxy, coordinator_server_end) =
fidl::endpoints::create_proxy::<display::CoordinatorMarker>()?;
let (coordinator_listener_client_end, coordinator_listener_requests) =
fidl::endpoints::create_request_stream::<display::CoordinatorListenerMarker>()?;
let payload = display::ProviderOpenCoordinatorWithListenerForPrimaryRequest {
coordinator: Some(coordinator_server_end),
coordinator_listener: Some(coordinator_listener_client_end),
__source_breaking: fidl::marker::SourceBreaking,
};
let () = provider_proxy
.open_coordinator_with_listener_for_primary(payload)
.await?
.map_err(zx::Status::from_raw)?;
Self::init_with_proxy_and_listener_requests(
coordinator_proxy,
coordinator_listener_requests,
)
.await
}
pub async fn init_with_proxy_and_listener_requests(
coordinator_proxy: display::CoordinatorProxy,
mut listener_requests: display::CoordinatorListenerRequestStream,
) -> Result<Coordinator> {
let displays = wait_for_initial_displays(&mut listener_requests)
.on_timeout(TIMEOUT.after_now(), || Err(Error::NoDisplays))
.await?
.into_iter()
.map(DisplayInfo)
.collect::<Vec<_>>();
Ok(Coordinator {
inner: Arc::new(RwLock::new(CoordinatorInner {
proxy: coordinator_proxy,
listener_requests: Some(listener_requests),
displays,
vsync_listeners: Vec::new(),
id_counter: 0,
})),
})
}
/// Returns a copy of the display list currently held by this coordinator.
pub fn displays(&self) -> Vec<DisplayInfo> {
    let state = self.inner.read();
    state.displays.clone()
}
/// Returns a clone of the underlying FIDL coordinator proxy.
pub fn proxy(&self) -> display::CoordinatorProxy {
    let state = self.inner.read();
    state.proxy.clone()
}
/// Enables vsync delivery on the coordinator and registers a new vsync listener.
///
/// When `id` is `Some`, the listener only receives events for that display; when it is
/// `None`, it receives events for every display. Events are forwarded to the returned
/// receiver while `handle_events` is being driven.
///
/// # Errors
///
/// Returns an error if the request to enable vsync delivery cannot be sent.
pub fn add_vsync_listener(
    &self,
    id: Option<DisplayId>,
) -> Result<mpsc::UnboundedReceiver<VsyncEvent>> {
    {
        // Read access is enough to issue the FIDL call; release it before registering.
        let state = self.inner.read();
        state.proxy.enable_vsync(true)?;
    }

    let (tx, rx) = mpsc::unbounded::<VsyncEvent>();
    let mut state = self.inner.write();
    state.vsync_listeners.push((tx, id));
    Ok(rx)
}
pub async fn handle_events(&self) -> Result<()> {
let inner = self.inner.clone();
let mut events = inner.write().listener_requests.take().ok_or(Error::AlreadyRequested)?;
while let Some(msg) = events.try_next().await? {
match msg {
CoordinatorListenerRequest::OnDisplaysChanged {
added,
removed,
control_handle: _,
} => {
let removed =
removed.into_iter().map(|id| id.into()).collect::<Vec<DisplayId>>();
inner.read().handle_displays_changed(added, removed);
}
CoordinatorListenerRequest::OnVsync {
display_id,
timestamp,
applied_config_stamp,
cookie,
control_handle: _,
} => {
inner.write().handle_vsync(
display_id.into(),
zx::MonotonicTime::from_nanos(timestamp),
applied_config_stamp,
cookie,
)?;
}
_ => continue,
}
}
Ok(())
}
/// Asks the coordinator to create a new layer and returns its ID.
///
/// # Errors
///
/// Propagates FIDL transport errors and the status reported by the coordinator.
pub async fn create_layer(&self) -> Result<LayerId> {
    let fidl_layer_id = self.proxy().create_layer().await?.map_err(zx::Status::from_raw)?;
    Ok(fidl_layer_id.into())
}
/// Creates a Zircon event, imports a duplicate of it into the coordinator under a freshly
/// allocated ID, and returns the local event paired with that ID.
///
/// # Errors
///
/// Fails if the handle cannot be duplicated, if no more IDs are available, or if the
/// import request cannot be sent.
pub fn create_event(&self) -> Result<Event> {
    let local = zx::Event::create();
    let remote = local.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
    let event_id = self.inner.write().next_free_event_id()?;
    {
        let state = self.inner.read();
        state.proxy.import_event(zx::Event::from(remote), &event_id.into())?;
    }
    Ok(Event::new(event_id, local))
}
/// Stages the given display configurations, validates them, and applies them.
///
/// For every display the layer list is set first, then each layer is configured as either
/// a color layer or a primary (image) layer. Missing unblock/retirement events are replaced
/// by `INVALID_EVENT_ID`. The staged configuration is then checked and, only if the check
/// reports `ConfigResult::Ok`, applied.
///
/// # Errors
///
/// Returns `ConfigError::invalid` carrying the check result and reported operations when
/// validation fails, and converts FIDL errors from any request into `ConfigError`.
pub async fn apply_config(
    &self,
    configs: &[DisplayConfig],
) -> std::result::Result<(), ConfigError> {
    let proxy = self.proxy();

    for display_config in configs {
        let layer_ids: Vec<FidlLayerId> =
            display_config.layers.iter().map(|layer| layer.id.into()).collect();
        proxy.set_display_layers(&display_config.id.into(), &layer_ids)?;

        for layer in display_config.layers.iter() {
            match &layer.config {
                LayerConfig::Color { pixel_format, color_bytes } => {
                    proxy.set_layer_color_config(
                        &layer.id.into(),
                        pixel_format.into(),
                        &color_bytes,
                    )?;
                }
                LayerConfig::Primary {
                    image_id,
                    image_metadata,
                    unblock_event,
                    retirement_event,
                } => {
                    let unblock_id = unblock_event.unwrap_or(INVALID_EVENT_ID);
                    let retirement_id = retirement_event.unwrap_or(INVALID_EVENT_ID);
                    proxy.set_layer_primary_config(&layer.id.into(), &image_metadata)?;
                    proxy.set_layer_image(
                        &layer.id.into(),
                        &(*image_id).into(),
                        &unblock_id.into(),
                        &retirement_id.into(),
                    )?;
                }
            }
        }
    }

    // Validate the staged configuration before committing to it.
    let (result, ops) = proxy.check_config(false).await?;
    if result == display_types::ConfigResult::Ok {
        proxy.apply_config().map_err(ConfigError::from)
    } else {
        Err(ConfigError::invalid(result, ops))
    }
}
/// Queries the coordinator for its latest applied config stamp and returns its raw value.
///
/// # Errors
///
/// Propagates any FIDL error raised by the query.
pub async fn get_recent_applied_config_stamp(&self) -> std::result::Result<u64, Error> {
    let stamp = self.proxy().get_latest_applied_config_stamp().await?;
    Ok(stamp.value)
}
pub(crate) async fn import_buffer_collection(
&self,
token: ClientEnd<fidl_fuchsia_sysmem2::BufferCollectionTokenMarker>,
) -> Result<BufferCollectionId> {
let id = self.inner.write().next_free_collection_id()?;
let proxy = self.proxy();
proxy.import_buffer_collection(&id.into(), token).await?.map_err(zx::Status::from_raw)?;
proxy
.set_buffer_collection_constraints(
&id.into(),
&display_types::ImageBufferUsage { tiling_type: IMAGE_TILING_TYPE_LINEAR },
)
.await?
.map_err(zx::Status::from_raw)?;
Ok(id)
}
/// Sends a request to the coordinator to release the buffer collection identified by `id`.
///
/// # Errors
///
/// Returns an error if the request cannot be sent.
pub(crate) fn release_buffer_collection(&self, id: BufferCollectionId) -> Result<()> {
    let state = self.inner.read();
    state.proxy.release_buffer_collection(&id.into())?;
    Ok(())
}
/// Imports the buffer at index 0 of `collection_id` into the coordinator as an image with
/// the given `image_id` and `image_metadata`.
///
/// # Errors
///
/// Propagates FIDL transport errors and the status reported by the coordinator.
pub(crate) async fn import_image(
    &self,
    collection_id: BufferCollectionId,
    image_id: ImageId,
    image_metadata: display_types::ImageMetadata,
) -> Result<()> {
    let buffer_id = BufferId::new(collection_id, 0);
    let outcome = self
        .proxy()
        .import_image(&image_metadata, &buffer_id.into(), &image_id.into())
        .await?;
    outcome.map_err(zx::Status::from_raw)?;
    Ok(())
}
}
/// Debug output for a `Coordinator` shows only its current display list.
impl fmt::Debug for Coordinator {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let displays = self.displays();
        let mut builder = f.debug_struct("Coordinator");
        builder.field("displays", &displays);
        builder.finish()
    }
}
impl CoordinatorInner {
fn next_free_collection_id(&mut self) -> Result<BufferCollectionId> {
self.id_counter = self.id_counter.checked_add(1).ok_or(Error::IdsExhausted)?;
Ok(BufferCollectionId(self.id_counter))
}
fn next_free_event_id(&mut self) -> Result<EventId> {
self.id_counter = self.id_counter.checked_add(1).ok_or(Error::IdsExhausted)?;
Ok(EventId(self.id_counter))
}
/// Hook invoked for every `OnDisplaysChanged` request seen by `Coordinator::handle_events`.
///
/// The body is currently empty: both arguments are ignored, so `displays` keeps the value
/// it was given at initialization.
// NOTE(review): presumably a placeholder for updating `displays` — confirm the intent.
fn handle_displays_changed(&self, _added: Vec<display::Info>, _removed: Vec<DisplayId>) {
}
/// Acknowledges a vsync and forwards it to every matching registered listener.
///
/// A listener matches when it has no display filter or when its filter equals
/// `display_id`. Listeners whose receiving end has been dropped are unregistered.
///
/// # Errors
///
/// Returns an error if the acknowledgement cannot be sent, or if delivering the event to a
/// listener fails for a reason other than the listener being disconnected.
fn handle_vsync(
    &mut self,
    display_id: DisplayId,
    timestamp: zx::MonotonicTime,
    applied_config_stamp: display_types::ConfigStamp,
    cookie: display::VsyncAckCookie,
) -> Result<()> {
    self.proxy.acknowledge_vsync(cookie.value)?;

    // Positions are collected in ascending order.
    let mut listeners_to_remove = Vec::new();
    for (pos, (sender, filter)) in self.vsync_listeners.iter().enumerate() {
        // Skip listeners that are filtering for a different display.
        if filter.as_ref().map_or(false, |id| *id != display_id) {
            continue;
        }
        let payload = VsyncEvent { id: display_id, timestamp, config: applied_config_stamp };
        if let Err(e) = sender.unbounded_send(payload) {
            if e.is_disconnected() {
                listeners_to_remove.push(pos);
            } else {
                return Err(e.into());
            }
        }
    }

    // `swap_remove` moves the last element into the vacated slot. Removing from the highest
    // position down guarantees that the moved element never lands on (or comes from) a
    // position that is still pending removal; removing in ascending order could drop the
    // wrong listener or index past the end of the vector.
    for pos in listeners_to_remove.into_iter().rev() {
        self.vsync_listeners.swap_remove(pos);
    }
    Ok(())
}
}
/// Watches the directory at `path` and returns the full path of the first entry reported
/// as existing or newly added, ignoring the `"."` entry.
///
/// # Errors
///
/// Propagates errors from opening or watching the directory, and returns
/// `Error::DeviceNotFound` if the watcher stream ends without yielding an entry.
async fn watch_first_file(path: &str) -> Result<PathBuf> {
    let dir =
        fuchsia_fs::directory::open_in_namespace_deprecated(path, fio::OpenFlags::RIGHT_READABLE)?;
    let mut watcher = Watcher::new(&dir).await?;

    while let Some(entry) = watcher.try_next().await? {
        let is_present = matches!(entry.event, WatchEvent::EXISTING | WatchEvent::ADD_FILE);
        if is_present && entry.filename != Path::new(".") {
            return Ok(Path::new(path).join(entry.filename));
        }
    }
    Err(Error::DeviceNotFound)
}
async fn wait_for_initial_displays(
listener_requests: &mut display::CoordinatorListenerRequestStream,
) -> Result<Vec<display::Info>> {
let mut stream = listener_requests.try_filter_map(|event| match event {
CoordinatorListenerRequest::OnDisplaysChanged { added, removed: _, control_handle: _ } => {
future::ok(Some(added))
}
_ => future::ok(None),
});
stream.try_next().await?.ok_or(Error::NoDisplays)
}
#[cfg(test)]
mod tests {
use super::{Coordinator, DisplayId, VsyncEvent};
use anyhow::{format_err, Context, Result};
use assert_matches::assert_matches;
use display_mocks::{create_proxy_and_mock, MockCoordinator};
use fuchsia_async::TestExecutor;
use futures::task::Poll;
use futures::{pin_mut, select, FutureExt, StreamExt};
use {
fidl_fuchsia_hardware_display as display,
fidl_fuchsia_hardware_display_types as display_types,
};
// Test helper: initializes a `Coordinator` from the given endpoints and attaches context
// to any initialization failure.
async fn init_with_proxy_and_listener_requests(
    coordinator_proxy: display::CoordinatorProxy,
    listener_requests: display::CoordinatorListenerRequestStream,
) -> Result<Coordinator> {
    let outcome =
        Coordinator::init_with_proxy_and_listener_requests(coordinator_proxy, listener_requests)
            .await;
    outcome.context("failed to initialize Coordinator")
}
// Test helper: creates a mock coordinator that reports `displays` and returns it together
// with a `Coordinator` initialized against it.
async fn init_with_displays(
    displays: &[display::Info],
) -> Result<(Coordinator, MockCoordinator)> {
    let (proxy, requests, mut mock) = create_proxy_and_mock()?;
    mock.assign_displays(displays.to_vec())?;
    let coordinator = init_with_proxy_and_listener_requests(proxy, requests).await?;
    Ok((coordinator, mock))
}
#[fuchsia::test]
async fn test_init_fails_with_no_device_dir() {
    // `Coordinator::init` is expected to fail here; presumably the test environment does
    // not expose a display-coordinator device directory.
    assert_matches!(Coordinator::init().await, Err(_));
}
#[fuchsia::test]
async fn test_init_with_no_displays() -> Result<()> {
    // The mock reports an empty display list, so initialization succeeds with no displays.
    let (proxy, requests, mut mock) = create_proxy_and_mock()?;
    mock.assign_displays(Vec::new())?;

    let coordinator = init_with_proxy_and_listener_requests(proxy, requests).await?;
    let reported = coordinator.displays();
    assert!(reported.is_empty());
    Ok(())
}
#[fuchsia::test]
async fn test_init_with_displays() -> Result<()> {
    // Builds a display description that differs only in ID, manufacturer and monitor name.
    let make_info = |id: display_types::DisplayId, manufacturer: &str, monitor: &str| {
        display::Info {
            id,
            modes: Vec::new(),
            pixel_format: Vec::new(),
            manufacturer_name: manufacturer.to_string(),
            monitor_name: monitor.to_string(),
            monitor_serial: "".to_string(),
            horizontal_size_mm: 0,
            vertical_size_mm: 0,
            using_fallback_size: false,
        }
    };
    let expected = vec![
        make_info(display_types::DisplayId { value: 1 }, "Foo", "what"),
        make_info(display_types::DisplayId { value: 2 }, "Bar", "who"),
    ];

    let (proxy, requests, mut mock) = create_proxy_and_mock()?;
    mock.assign_displays(expected.clone())?;
    let coordinator = init_with_proxy_and_listener_requests(proxy, requests).await?;

    // The coordinator must report exactly the displays the mock announced, in order.
    let reported = coordinator.displays();
    assert_eq!(reported.len(), 2);
    assert_eq!(reported[0].0, expected[0]);
    assert_eq!(reported[1].0, expected[1]);
    Ok(())
}
#[test]
fn test_vsync_listener_single() -> Result<()> {
    const ID: DisplayId = DisplayId(1);
    const STAMP: display_types::ConfigStamp = display_types::ConfigStamp { value: 1 };

    let mut executor = TestExecutor::new();
    let (coordinator, mock) = executor.run_singlethreaded(init_with_displays(&[]))?;
    let mut vsync_receiver = coordinator.add_vsync_listener(None)?;

    // Drive the listener and the FIDL event handler together; the listener must win.
    let first_outcome = async {
        select! {
            event = vsync_receiver.next() => event.ok_or(format_err!("did not receive vsync event")),
            result = coordinator.handle_events().fuse() => {
                result.context("FIDL event handler failed")?;
                Err(format_err!("FIDL event handler completed before client vsync event"))
            },
        }
    };
    pin_mut!(first_outcome);

    // Emit a single vsync and check that it reaches the unfiltered listener.
    mock.emit_vsync_event(ID.0, STAMP)?;
    let polled = executor.run_until_stalled(&mut first_outcome);
    assert_matches!(polled, Poll::Ready(Ok(VsyncEvent { id: ID, config: STAMP, .. })));
    Ok(())
}
#[test]
fn test_vsync_listener_multiple() -> Result<()> {
    const ID1: DisplayId = DisplayId(1);
    const ID2: DisplayId = DisplayId(2);
    const STAMP: display_types::ConfigStamp = display_types::ConfigStamp { value: 1 };

    let mut executor = TestExecutor::new();
    let (coordinator, mock) = executor.run_singlethreaded(init_with_displays(&[]))?;
    let mut vsync_receiver = coordinator.add_vsync_listener(None)?;

    let fidl_server = coordinator.handle_events().fuse();
    pin_mut!(fidl_server);

    // Queue three vsync events before the event handler gets a chance to run.
    for emitted in [ID1, ID2, ID1] {
        mock.emit_vsync_event(emitted.0, STAMP)?;
    }

    // The handler processes everything that is queued and then waits for more requests.
    let server_state = executor.run_until_stalled(&mut fidl_server);
    assert_matches!(server_state, Poll::Pending);

    // The listener must observe the events in the order they were emitted.
    for expected in [ID1, ID2, ID1] {
        let polled =
            executor.run_until_stalled(&mut Box::pin(async { vsync_receiver.next().await }));
        assert_matches!(
            polled,
            Poll::Ready(Some(VsyncEvent { id, config: STAMP, .. })) if id == expected
        );
    }
    Ok(())
}
#[test]
fn test_vsync_listener_display_id_filter() -> Result<()> {
    const ID1: DisplayId = DisplayId(1);
    const ID2: DisplayId = DisplayId(2);
    const STAMP: display_types::ConfigStamp = display_types::ConfigStamp { value: 1 };

    let mut executor = TestExecutor::new();
    let (coordinator, mock) = executor.run_singlethreaded(init_with_displays(&[]))?;

    // Only events for `ID2` should reach this listener.
    let mut filtered_receiver = coordinator.add_vsync_listener(Some(ID2))?;
    let first_outcome = async {
        select! {
            event = filtered_receiver.next() => event.ok_or(format_err!("did not receive vsync event")),
            result = coordinator.handle_events().fuse() => {
                result.context("FIDL event handler failed")?;
                Err(format_err!("FIDL event handler completed before client vsync event"))
            },
        }
    };
    pin_mut!(first_outcome);

    // A vsync for the other display is filtered out, so nothing completes.
    mock.emit_vsync_event(ID1.0, STAMP)?;
    let polled = executor.run_until_stalled(&mut first_outcome);
    assert_matches!(polled, Poll::Pending);

    // A vsync for the filtered display is delivered.
    mock.emit_vsync_event(ID2.0, STAMP)?;
    let polled = executor.run_until_stalled(&mut first_outcome);
    assert_matches!(polled, Poll::Ready(Ok(VsyncEvent { id: ID2, config: STAMP, .. })));
    Ok(())
}
}