lease_management/
sequence_client.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use futures::lock::Mutex;
5use {fidl_fuchsia_example_power as fexample, fuchsia_trace as ftrace};
6
7/// Client components should use |SequenceClient| to automatically manage
8/// interaction with server components. Clients should use |process_message(s)|
9/// to tell |SequenceClient| when they've processed a message.
10/// |SequenceClient| will then automatically discard batons at the appropriate
11/// times.
12pub struct SequenceClient {
13    baton_source: fexample::MessageSourceProxy,
14    baton: Mutex<Option<LeaseBaton>>,
15    msg_index: Mutex<u64>,
16}
17
18pub enum BatonError {
19    InvalidBatonMessage,
20    Internal,
21}
22
23pub struct LeaseBaton {
24    pub msg_index: u64,
25    pub lease: zx::EventPair,
26}
27
28impl TryFrom<fexample::LeaseBaton> for LeaseBaton {
29    type Error = BatonError;
30
31    fn try_from(value: fexample::LeaseBaton) -> Result<Self, Self::Error> {
32        match (value.lease, value.msg_index) {
33            (Some(lease), Some(msg_index)) => Ok(Self { msg_index, lease }),
34            _ => Err(BatonError::InvalidBatonMessage),
35        }
36    }
37}
38
39impl SequenceClient {
40    /// Creates a new |SequenceClient|, but does not start its management.
41    /// Clients should call |run| to start baton management.
42    pub fn new(message_source: fexample::MessageSourceProxy) -> Self {
43        Self { baton_source: message_source, baton: Mutex::new(None), msg_index: Mutex::new(0) }
44    }
45
46    /// Start baton management. This function will not return until the
47    /// |MessageSourceProxy| passed to |new| closes, returns an error when
48    /// read, or receives an invalid baton message.
49    pub async fn run(&self) -> Result<(), BatonError> {
50        loop {
51            let baton_result = self.baton_source.receive_baton().await;
52
53            if let Err(e) = baton_result {
54                if e.is_closed() {
55                    break;
56                } else {
57                    return Err(BatonError::Internal);
58                }
59            }
60
61            ftrace::instant!(crate::TRACE_CATEGORY, c"receive-baton", ftrace::Scope::Process);
62            let baton = baton_result.unwrap();
63            {
64                let new_baton: LeaseBaton = baton.try_into()?;
65
66                let current_index = self.msg_index.lock().await;
67                if *current_index < new_baton.msg_index {
68                    let mut current_baton = self.baton.lock().await;
69                    *current_baton = Some(new_baton);
70                }
71            }
72        }
73        Ok(())
74    }
75
76    /// Tell |SequenceClient| that a message was processed. If a baton is held
77    /// corresponding to this message, it is returned. Likely the only valid
78    /// reason for the caller to hold on to the baton is to pass it to it's
79    /// client(s).
80    pub async fn process_message(&self) -> Option<LeaseBaton> {
81        self.process_messages(1).await
82    }
83
84    /// Tell |SequenceClient| that |message_count| messages were processed. If
85    /// a baton is held corresponding to one of the messages, it is returned.
86    /// Likely the only valid reason for the caller to hold on to the baton is
87    /// to pass the baton to it's client(s).
88    pub async fn process_messages(&self, message_count: u64) -> Option<LeaseBaton> {
89        ftrace::duration!(crate::TRACE_CATEGORY, c"process-message", "count" => message_count);
90        let mut current_index = self.msg_index.lock().await;
91        *current_index += message_count;
92        let mut baton_ref = self.baton.lock().await;
93
94        // If we have a baton check to see if we should return it to the caller
95        if let Some(baton) = baton_ref.take() {
96            let baton_index = baton.msg_index;
97
98            // We've seen the message this baton corresponds to, so take it and
99            // return to the caller so they can do with it as they will.
100            if baton_index <= *current_index {
101                ftrace::instant!(crate::TRACE_CATEGORY, c"dropping-baton", ftrace::Scope::Process);
102                return Some(baton);
103            }
104            *baton_ref = Some(baton);
105        }
106        None
107    }
108
109    pub async fn get_receieved_count(&self) -> u64 {
110        *self.msg_index.lock().await
111    }
112}