1use anyhow::{Result, anyhow};
5use fidl::endpoints::{ClientEnd, ServerEnd, create_endpoints, create_proxy};
6use fuchsia_inspect::{self, Property};
7use futures::TryStreamExt;
8use futures::future::LocalBoxFuture;
9use std::sync::Arc;
10use zx::{HandleBased, Rights};
11use {fidl_fuchsia_power_broker as fbroker, fuchsia_async as fasync};
12
/// The two power levels of a binary (off/on) power element, as raw
/// `fbroker::PowerLevel` primitives.
///
/// Index 0 is `BinaryPowerLevel::Off` and index 1 is `BinaryPowerLevel::On`.
pub const BINARY_POWER_LEVELS: [fbroker::PowerLevel; 2] = [
    fbroker::BinaryPowerLevel::Off.into_primitive(),
    fbroker::BinaryPowerLevel::On.into_primitive(),
];
19
/// Client-side handles and metadata for a single power element.
///
/// Instances are normally produced by `PowerElementContextBuilder::build`.
pub struct PowerElementContext {
    /// Proxy to the element's `ElementControl` channel.
    pub element_control: fbroker::ElementControlProxy,
    /// Proxy to the element's `Lessor` channel.
    pub lessor: fbroker::LessorProxy,
    /// Dependency token kept for this element, if one was registered.
    /// Duplicates are handed out by `assertive_dependency_token()`.
    assertive_dependency_token: Option<fbroker::DependencyToken>,
    /// Element name, used in log messages and returned by `name()`.
    name: String,
    /// Level that `run` treats as the "last required level" before any
    /// `SetLevel` request has been received.
    initial_level: fbroker::PowerLevel,
}
27
28impl PowerElementContext {
29 pub fn builder<'a>(
30 topology: &'a fbroker::TopologyProxy,
31 element_name: &'a str,
32 valid_levels: &'a [fbroker::PowerLevel],
33 element_runner_client: ClientEnd<fbroker::ElementRunnerMarker>,
34 ) -> PowerElementContextBuilder<'a> {
35 PowerElementContextBuilder::new(topology, element_name, valid_levels, element_runner_client)
36 }
37
38 pub fn assertive_dependency_token(&self) -> Option<fbroker::DependencyToken> {
39 self.assertive_dependency_token.as_ref().and_then(|token| {
40 Some(token.duplicate_handle(Rights::SAME_RIGHTS).expect("failed to duplicate token"))
41 })
42 }
43
44 pub fn name(&self) -> &str {
45 &self.name
46 }
47
48 pub async fn run<'a>(
52 &self,
53 element_runner: ServerEnd<fbroker::ElementRunnerMarker>,
54 inspect_node: Option<fuchsia_inspect::Node>,
55 update_fn: Option<Box<dyn Fn(fbroker::PowerLevel) -> LocalBoxFuture<'a, ()> + 'a>>,
56 ) {
57 let mut stream = element_runner.into_stream();
58
59 let mut last_required_level: fbroker::PowerLevel = self.initial_level;
60 let power_level_node = inspect_node
61 .as_ref()
62 .map(|node| node.create_uint("power_level", last_required_level.into()));
63
64 while let Ok(Some(request)) = stream.try_next().await {
65 match request {
66 fbroker::ElementRunnerRequest::SetLevel { level: required_level, responder } => {
67 log::debug!(
68 element_name:? = &self.name,
69 required_level:?,
70 last_required_level:?;
71 "PowerElementContext::run: new level requested"
72 );
73 if required_level != last_required_level {
74 if let Some(update_fn) = &update_fn {
75 update_fn(required_level).await;
76 }
77 if let Some(ref power_level_node) = power_level_node {
78 power_level_node.set(required_level.into());
79 }
80 last_required_level = required_level;
81 } else {
82 log::debug!(
83 element_name:? = &self.name,
84 required_level:?,
85 last_required_level:?;
86 "PowerElementContext::run: required level has not changed, skipping."
87 );
88 }
89 if let Some(err) = responder.send().err() {
90 log::warn!("PowerElementContext::run: SetLevel response failed: {err}");
91 }
92 }
93 fbroker::ElementRunnerRequest::_UnknownMethod { .. } => {}
94 }
95 }
96 }
97}
98
/// Builder that collects the parameters for adding a power element to a
/// topology and producing a `PowerElementContext`.
pub struct PowerElementContextBuilder<'a> {
    /// Topology proxy the element is added through.
    topology: &'a fbroker::TopologyProxy,
    /// Name given to the element.
    element_name: &'a str,
    /// Level reported as the element's initial current level.
    initial_current_level: fbroker::PowerLevel,
    /// Client end of the `ElementRunner` channel passed along in the schema.
    element_runner_client: ClientEnd<fbroker::ElementRunnerMarker>,
    /// Levels the element declares as valid.
    valid_levels: &'a [fbroker::PowerLevel],
    /// Level dependencies declared for the element.
    dependencies: Vec<fbroker::LevelDependency>,
    /// Whether `build` creates and registers a dependency token.
    register_dependency_tokens: bool,
}
108
109impl<'a> PowerElementContextBuilder<'a> {
110 pub fn new(
111 topology: &'a fbroker::TopologyProxy,
112 element_name: &'a str,
113 valid_levels: &'a [fbroker::PowerLevel],
114 element_runner_client: ClientEnd<fbroker::ElementRunnerMarker>,
115 ) -> Self {
116 Self {
117 topology,
118 element_name,
119 valid_levels,
120 element_runner_client,
121 initial_current_level: Default::default(),
122 dependencies: Default::default(),
123 register_dependency_tokens: true,
124 }
125 }
126
127 pub fn initial_current_level(mut self, value: fbroker::PowerLevel) -> Self {
128 self.initial_current_level = value;
129 self
130 }
131
132 pub fn dependencies(mut self, value: Vec<fbroker::LevelDependency>) -> Self {
133 self.dependencies = value;
134 self
135 }
136
137 pub fn register_dependency_tokens(mut self, enable: bool) -> Self {
138 self.register_dependency_tokens = enable;
139 self
140 }
141
142 pub async fn build(self) -> Result<PowerElementContext> {
143 let (lessor, lessor_server_end) = create_proxy::<fbroker::LessorMarker>();
144 let (element_control, element_control_server_end) =
145 create_proxy::<fbroker::ElementControlMarker>();
146 self.topology
147 .add_element(fbroker::ElementSchema {
148 element_name: Some(self.element_name.into()),
149 initial_current_level: Some(self.initial_current_level),
150 valid_levels: Some(self.valid_levels.to_vec()),
151 dependencies: Some(self.dependencies),
152 lessor_channel: Some(lessor_server_end),
153 element_control: Some(element_control_server_end),
154 element_runner: Some(self.element_runner_client),
155 ..Default::default()
156 })
157 .await?
158 .map_err(|d| anyhow::anyhow!("{d:?}"))?;
159
160 let assertive_dependency_token = match self.register_dependency_tokens {
161 true => {
162 let token = fbroker::DependencyToken::create();
163 let _ = element_control
164 .register_dependency_token(
165 token
166 .duplicate_handle(Rights::SAME_RIGHTS)
167 .expect("failed to duplicate token"),
168 )
169 .await?
170 .expect("register assertive dependency token");
171 Some(token)
172 }
173 false => None,
174 };
175
176 Ok(PowerElementContext {
177 element_control,
178 lessor,
179 assertive_dependency_token,
180 name: self.element_name.to_string(),
181 initial_level: self.initial_current_level,
182 })
183 }
184}
185
/// One dependency of a lease element created by `LeaseHelper::new`.
///
/// Each value is converted into an `fbroker::LevelDependency` whose dependent
/// level is the "on" entry of `BINARY_POWER_LEVELS`.
pub struct LeaseDependency {
    /// Token forwarded as the dependency's `requires_token`.
    pub requires_token: fbroker::DependencyToken,
    /// Levels forwarded as the dependency's `requires_level_by_preference`.
    pub requires_level_by_preference: Vec<fbroker::PowerLevel>,
}
192
/// Owns a binary power element whose only purpose is to take leases on it.
///
/// Created with `LeaseHelper::new`; leases are obtained with `create_lease` or
/// `create_lease_and_wait_until_satisfied`.
pub struct LeaseHelper {
    /// Lessor proxy of the underlying element, used to request leases.
    lessor: fbroker::LessorProxy,
    /// Local task running the element's `PowerElementContext::run` loop; held
    /// here so the task is owned by the helper.
    _run_task: fasync::Task<()>,
}
201
/// A lease obtained through a `LeaseHelper`.
pub struct Lease {
    /// Proxy to the lease's `LeaseControl` channel.
    pub control_proxy: fbroker::LeaseControlProxy,

    /// Shared reference to the originating helper, so the helper is not
    /// dropped while this lease still exists.
    _helper: Arc<LeaseHelper>,
}
211
212impl Lease {
213 pub async fn wait_until_satisfied(&self) -> Result<(), fidl::Error> {
214 let mut status = fbroker::LeaseStatus::Unknown;
215 loop {
216 match self.control_proxy.watch_status(status).await? {
217 fbroker::LeaseStatus::Satisfied => break Ok(()),
218 new_status @ _ => status = new_status,
219 }
220 }
221 }
222}
223
224impl LeaseHelper {
225 pub async fn new<'a>(
228 topology: &'a fbroker::TopologyProxy,
229 name: &'a str,
230 lease_dependencies: Vec<LeaseDependency>,
231 ) -> Result<Arc<Self>> {
232 let level_dependencies = lease_dependencies
233 .into_iter()
234 .map(|d| fbroker::LevelDependency {
235 dependent_level: BINARY_POWER_LEVELS[1],
236 requires_token: d.requires_token,
237 requires_level_by_preference: d.requires_level_by_preference,
238 })
239 .collect();
240
241 let (element_runner_client, element_runner) =
242 create_endpoints::<fbroker::ElementRunnerMarker>();
243 let element_context = PowerElementContext::builder(
244 topology,
245 name,
246 &BINARY_POWER_LEVELS,
247 element_runner_client,
248 )
249 .dependencies(level_dependencies)
250 .initial_current_level(BINARY_POWER_LEVELS[0])
251 .build()
252 .await?;
253
254 let lessor = element_context.lessor.clone();
255
256 let _run_task = fasync::Task::local(async move {
257 element_context.run(element_runner, None , None).await;
258 });
259
260 Ok(Arc::new(Self { lessor, _run_task }))
261 }
262
263 pub async fn create_lease_and_wait_until_satisfied(self: &Arc<Self>) -> Result<Lease> {
266 let lease = self.create_lease().await?;
267 lease.wait_until_satisfied().await?;
268 Ok(lease)
269 }
270
271 pub async fn create_lease(self: &Arc<Self>) -> Result<Lease> {
272 let lease = self
273 .lessor
274 .lease(BINARY_POWER_LEVELS[1])
275 .await?
276 .map_err(|e| anyhow!("PowerBroker::LeaseError({e:?})"))?;
277 Ok(Lease { control_proxy: lease.into_proxy(), _helper: self.clone() })
278 }
279}
280
/// Unit tests for `PowerElementContext::run` and `LeaseHelper`.
#[cfg(test)]
mod tests {
    use super::*;
    use diagnostics_assertions::assert_data_tree;
    use fidl::endpoints::ClientEnd;
    use fuchsia_async as fasync;
    use futures::channel::mpsc;
    use futures::{FutureExt, StreamExt};
    use std::cell::RefCell;
    use std::rc::Rc;

    /// Spawns a detached local task that plays the client side of
    /// `element_runner`, sending one `set_level` request per entry of
    /// `required_power_levels`.
    ///
    /// The levels are sent in REVERSE order of the vector (last entry first).
    /// The result of each call is ignored. The proxy is dropped when the task
    /// finishes.
    fn drive_element_runner(
        element_runner: ClientEnd<fbroker::ElementRunnerMarker>,
        required_power_levels: Vec<fbroker::PowerLevel>,
    ) {
        let proxy = element_runner.into_proxy();
        fasync::Task::local(async move {
            for level in required_power_levels.into_iter().rev() {
                let _ = proxy.set_level(level).await;
            }
        })
        .detach();
    }

    /// `run` forwards each newly required level to `update_fn`.
    #[fuchsia::test]
    async fn power_element_context_run_passes_required_level_to_update_fn() -> Result<()> {
        let (tx, mut rx) = mpsc::channel(5);

        let (element_control, _element_control_stream) =
            fidl::endpoints::create_proxy_and_stream::<fbroker::ElementControlMarker>();
        let (lessor, _lessor_stream) =
            fidl::endpoints::create_proxy_and_stream::<fbroker::LessorMarker>();
        let (element_runner_client, element_runner) =
            create_endpoints::<fbroker::ElementRunnerMarker>();
        // Sent in reverse: level 2 first, then level 1.
        drive_element_runner(element_runner_client, vec![1, 2]);
        let power_element = PowerElementContext {
            element_control,
            lessor,
            assertive_dependency_token: Some(fbroker::DependencyToken::create()),
            name: "test_element".to_string(),
            initial_level: 0,
        };

        // `update_fn` records every level it is called with on `tx`.
        power_element
            .run(
                element_runner,
                None,
                Some(Box::new(|power_level| {
                    let mut tx = tx.clone();
                    async move {
                        tx.start_send(power_level).unwrap();
                    }
                    .boxed_local()
                })),
            )
            .await;

        assert_eq!(2, rx.next().await.unwrap());
        assert_eq!(1, rx.next().await.unwrap());
        Ok(())
    }

    /// `run` does not call `update_fn` when the requested level equals the
    /// previously required level (including the initial level).
    #[fuchsia::test]
    async fn power_element_context_run_skips_update_on_same_level() -> Result<()> {
        let (tx, mut rx) = mpsc::channel(5);
        let initial_level = 5;

        let (element_control, _element_control_stream) =
            fidl::endpoints::create_proxy_and_stream::<fbroker::ElementControlMarker>();
        let (lessor, _lessor_stream) =
            fidl::endpoints::create_proxy_and_stream::<fbroker::LessorMarker>();
        let (element_runner_client, element_runner) =
            create_endpoints::<fbroker::ElementRunnerMarker>();
        // Sent in reverse: initial_level, 2, 2, 1, 1, 3. The first request and
        // each repeated level are expected to be skipped.
        drive_element_runner(element_runner_client, vec![3, 1, 1, 2, 2, initial_level]);

        let power_element = PowerElementContext {
            element_control,
            lessor,
            assertive_dependency_token: Some(fbroker::DependencyToken::create()),
            name: "test_element".to_string(),
            initial_level,
        };

        // `update_fn` records every level it is called with on `tx`.
        power_element
            .run(
                element_runner,
                None,
                Some(Box::new(|power_level| {
                    let mut tx = tx.clone();
                    async move {
                        tx.start_send(power_level).unwrap();
                    }
                    .boxed_local()
                })),
            )
            .await;

        // Only the level changes reach `update_fn`.
        assert_eq!(2, rx.next().await.unwrap());
        assert_eq!(1, rx.next().await.unwrap());
        assert_eq!(3, rx.next().await.unwrap());
        Ok(())
    }

    /// `run` updates the `power_level` inspect property after `update_fn`
    /// completes for each level change.
    #[fuchsia::test]
    async fn power_element_context_run_updates_inspect_node() -> Result<()> {
        let inspector = fuchsia_inspect::Inspector::default();
        // `tx`/`rx`: the test tells `update_fn` it may return.
        // `tx2`/`rx2`: `update_fn` tells the test it has been entered.
        let (mut tx, rx) = mpsc::channel(5);
        let (tx2, mut rx2) = mpsc::channel(5);
        let rx = Rc::new(RefCell::new(rx));

        let (element_control, _element_control_stream) =
            fidl::endpoints::create_proxy_and_stream::<fbroker::ElementControlMarker>();
        let (lessor, _lessor_stream) =
            fidl::endpoints::create_proxy_and_stream::<fbroker::LessorMarker>();
        let (element_runner_client, element_runner) =
            create_endpoints::<fbroker::ElementRunnerMarker>();
        // Sent in reverse: 3, 0, 4, 1.
        drive_element_runner(element_runner_client, vec![1, 4, 0, 3]);

        let power_element = PowerElementContext {
            element_control,
            lessor,
            assertive_dependency_token: Some(fbroker::DependencyToken::create()),
            name: "test_element".to_string(),
            initial_level: 0,
        };

        let root = inspector.root().clone_weak();
        fasync::Task::local(async move {
            power_element
                .run(
                    element_runner,
                    Some(root),
                    Some(Box::new(|_| {
                        let rx = rx.clone();
                        let mut tx2 = tx2.clone();
                        async move {
                            // Announce entry, then block until the test releases us.
                            tx2.start_send(()).unwrap();
                            rx.borrow_mut().next().await.unwrap();
                        }
                        .boxed_local()
                    })),
                )
                .await;
        })
        .detach();

        // `update_fn` entered for level 3; release it.
        rx2.next().await.unwrap();
        tx.start_send(()).unwrap();

        // `update_fn` entered for level 0, so the update to 3 has been applied.
        rx2.next().await.unwrap();
        assert_data_tree!(inspector, root: {
            power_level: 3u64
        });
        tx.start_send(()).unwrap();

        // `update_fn` entered for level 4, so the update to 0 has been applied.
        rx2.next().await.unwrap();
        assert_data_tree!(inspector, root: {
            power_level: 0u64
        });
        tx.start_send(()).unwrap();

        // `update_fn` entered for level 1, so the update to 4 has been applied.
        rx2.next().await.unwrap();
        assert_data_tree!(inspector, root: {
            power_level: 4u64
        });
        Ok(())
    }

    /// Exercises `LeaseHelper` against a hand-written fake topology server:
    /// element creation, token registration, lease creation, status watch, and
    /// channel closure once the lease and helper are dropped.
    #[fuchsia::test]
    async fn test_lease_helper() -> Result<()> {
        let (topology, topology_server) = create_proxy::<fbroker::TopologyMarker>();
        let mut topology_stream = topology_server.into_stream();
        // Fake server: expects exactly the request sequence below and fails on
        // anything else.
        let topology_task: fasync::Task<Result<()>> = fasync::Task::local(async move {
            log::debug!("Waiting for add_element_request...");
            let add_element_request = topology_stream.next().await.unwrap()?;
            let mut element_control;
            let lessor;
            match add_element_request {
                fbroker::TopologyRequest::AddElement { payload, responder } => {
                    // Keep the server ends so later requests can be observed.
                    element_control =
                        payload.element_control.expect("element_control not set").into_stream();
                    lessor = payload.lessor_channel.expect("lessor_channel not set");
                    responder.send(Ok(()))?
                }
                request => {
                    return Err(anyhow!("Unexpected method called: {request:?}"));
                }
            }
            // The builder registers a dependency token right after AddElement.
            let element_control_request = element_control.next().await.unwrap()?;
            match element_control_request {
                fbroker::ElementControlRequest::RegisterDependencyToken { responder, .. } => {
                    responder.send(Ok(()))?
                }
                request => {
                    return Err(anyhow!("Unexpected method called: {request:?}"));
                }
            }
            let (lease_control, lease_control_server) =
                create_endpoints::<fbroker::LeaseControlMarker>();
            log::debug!("Waiting for lease_request...");
            let lease_request = lessor.into_stream().next().await.unwrap()?;
            match lease_request {
                fbroker::LessorRequest::Lease { responder, .. } => {
                    responder.send(Ok(lease_control))?
                }
                request => {
                    return Err(anyhow!("Unexpected method called: {request:?}"));
                }
            }
            let mut lease_control_stream = lease_control_server.into_stream();
            log::debug!("Waiting for lease_control_request...");
            let lease_control_request = lease_control_stream.next().await.unwrap()?;
            match lease_control_request {
                fbroker::LeaseControlRequest::WatchStatus { responder, .. } => {
                    // Report the lease as satisfied on the first watch.
                    responder.send(fbroker::LeaseStatus::Satisfied)?
                }
                request => {
                    return Err(anyhow!("Unexpected method called: {request:?}"));
                }
            }
            log::debug!("Waiting for lease_control to be closed...");
            assert!(lease_control_stream.next().await.is_none());
            log::debug!("Waiting for element_control to be closed...");
            // NOTE(review): `element_control` is already a request stream here, so
            // this `into_stream()` presumably resolves to `TryStreamExt::into_stream`
            // (in scope via `super::*`) — confirm whether a plain `.next()` was intended.
            assert!(element_control.into_stream().next().await.is_none());
            Ok(())
        });

        log::debug!("Creating LeaseHelper...");
        let helper = LeaseHelper::new(&topology, "Lease", vec![]).await?;
        log::debug!("create_lease_and_wait_until_satisfied...");
        let lease = helper.create_lease_and_wait_until_satisfied().await?;

        // The fake server waits for both channels to close before finishing.
        log::debug!("Dropping lease...");
        drop(lease);
        log::debug!("Dropping LeaseHelper...");
        drop(helper);

        log::debug!("Waiting for topology_task to complete...");
        assert!(topology_task.await.is_ok());
        Ok(())
    }
}