1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use futures::lock::Mutex;
5use {fidl_fuchsia_example_power as fexample, fuchsia_trace as ftrace};
67/// Client components should use |SequenceClient| to automatically manage
8/// interaction with server components. Clients should use |process_message(s)|
9/// to tell |SequenceClient| when they've processed a message.
10/// |SequenceClient| will then automatically discard batons at the appropriate
11/// times.
12pub struct SequenceClient {
13 baton_source: fexample::MessageSourceProxy,
14 baton: Mutex<Option<LeaseBaton>>,
15 msg_index: Mutex<u64>,
16}
1718pub enum BatonError {
19 InvalidBatonMessage,
20 Internal,
21}
2223pub struct LeaseBaton {
24pub msg_index: u64,
25pub lease: zx::EventPair,
26}
2728impl TryFrom<fexample::LeaseBaton> for LeaseBaton {
29type Error = BatonError;
3031fn try_from(value: fexample::LeaseBaton) -> Result<Self, Self::Error> {
32match (value.lease, value.msg_index) {
33 (Some(lease), Some(msg_index)) => Ok(Self { msg_index, lease }),
34_ => Err(BatonError::InvalidBatonMessage),
35 }
36 }
37}
3839impl SequenceClient {
40/// Creates a new |SequenceClient|, but does not start its management.
41 /// Clients should call |run| to start baton management.
42pub fn new(message_source: fexample::MessageSourceProxy) -> Self {
43Self { baton_source: message_source, baton: Mutex::new(None), msg_index: Mutex::new(0) }
44 }
4546/// Start baton management. This function will not return until the
47 /// |MessageSourceProxy| passed to |new| closes, returns an error when
48 /// read, or receives an invalid baton message.
49pub async fn run(&self) -> Result<(), BatonError> {
50loop {
51let baton_result = self.baton_source.receive_baton().await;
5253if let Err(e) = baton_result {
54if e.is_closed() {
55break;
56 } else {
57return Err(BatonError::Internal);
58 }
59 }
6061ftrace::instant!(crate::TRACE_CATEGORY, c"receive-baton", ftrace::Scope::Process);
62let baton = baton_result.unwrap();
63 {
64let new_baton: LeaseBaton = baton.try_into()?;
6566let current_index = self.msg_index.lock().await;
67if *current_index < new_baton.msg_index {
68let mut current_baton = self.baton.lock().await;
69*current_baton = Some(new_baton);
70 }
71 }
72 }
73Ok(())
74 }
7576/// Tell |SequenceClient| that a message was processed. If a baton is held
77 /// corresponding to this message, it is returned. Likely the only valid
78 /// reason for the caller to hold on to the baton is to pass it to it's
79 /// client(s).
80pub async fn process_message(&self) -> Option<LeaseBaton> {
81self.process_messages(1).await
82}
8384/// Tell |SequenceClient| that |message_count| messages were processed. If
85 /// a baton is held corresponding to one of the messages, it is returned.
86 /// Likely the only valid reason for the caller to hold on to the baton is
87 /// to pass the baton to it's client(s).
88pub async fn process_messages(&self, message_count: u64) -> Option<LeaseBaton> {
89ftrace::duration!(crate::TRACE_CATEGORY, c"process-message", "count" => message_count);
90let mut current_index = self.msg_index.lock().await;
91*current_index += message_count;
92let mut baton_ref = self.baton.lock().await;
9394// If we have a baton check to see if we should return it to the caller
95if let Some(baton) = baton_ref.take() {
96let baton_index = baton.msg_index;
9798// We've seen the message this baton corresponds to, so take it and
99 // return to the caller so they can do with it as they will.
100if baton_index <= *current_index {
101ftrace::instant!(crate::TRACE_CATEGORY, c"dropping-baton", ftrace::Scope::Process);
102return Some(baton);
103 }
104*baton_ref = Some(baton);
105 }
106None
107}
108109pub async fn get_receieved_count(&self) -> u64 {
110*self.msg_index.lock().await
111}
112}