lease_management/
sequence_server.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use async_trait::async_trait;
5use futures::lock::Mutex;
6use futures::StreamExt;
7use log::warn;
8use std::sync::Arc;
9use {
10    fidl_fuchsia_example_power as fexample, fidl_fuchsia_power_system as fsag,
11    fuchsia_trace as ftrace,
12};
13
14const TRACE_NAME_FLUSH: &std::ffi::CStr = c"flush-count-dirty";
15
16/// You probably don't want to create one of these directly, but instead use
17/// |SequenceServer| to create one for you and then get a reference to it from
18/// |SequenceServer.get_message_tracker|.
19///
20/// Server components can use this to help manage sending batons to clients
21/// at appropriate times. Server components should call |message_sent| when
22/// they send a message to a client. Servers should call |set_requester|
23/// whenever server receives a hanging-GET for a baton from a client.
24pub struct MessageSendTracker {
25    sent_messages: u64,
26    // The value of |sent_messages| when we last passed a baton
27    last_baton_pass: u64,
28    baton_request: Option<fexample::MessageSourceReceiveBatonResponder>,
29    sag: fsag::ActivityGovernorProxy,
30    baton: Option<zx::EventPair>,
31    resumed: bool,
32    terminated: bool,
33}
34
35#[derive(Debug)]
36pub enum MessageTrackerError {
37    Terminated,
38}
39
40impl MessageSendTracker {
41    pub async fn new(sag: fsag::ActivityGovernorProxy) -> Self {
42        let me = Self {
43            sent_messages: 0,
44            last_baton_pass: 0,
45            baton_request: None,
46            sag,
47            baton: None,
48            resumed: false,
49            terminated: false,
50        };
51        me
52    }
53
54    /// Inform the tracker a message was sent, equivalent to |messages_sent(1)|.
55    /// If the system is suspended, a wake lease is taken to guarantee the
56    /// message can be processed. This is the behavior we probably want if we
57    /// this message came from a waking interrupt. This means after this
58    /// function returns, the caller can safely ack an interrupt related to
59    /// this event.
60    pub async fn message_sent(&mut self) -> Result<(), MessageTrackerError> {
61        self.messages_sent(1).await
62    }
63
64    /// Inform the tracker that |message_count| messages were sent. If the
65    /// system is suspended, a wake lease is taken to guarantee the message
66    /// can be processed. This is the behavior we probably want if we this
67    /// message came from a waking interrupt. This means after this function
68    /// returns, the caller can safely ack an interrupt related to this event.
69    pub async fn messages_sent(&mut self, message_count: u64) -> Result<(), MessageTrackerError> {
70        ftrace::instant!(
71            crate::TRACE_CATEGORY,
72            c"messages_sent",
73            ftrace::Scope::Process,
74            "count" => message_count
75        );
76
77        if self.terminated {
78            return Err(MessageTrackerError::Terminated);
79        }
80
81        self.sent_messages += message_count;
82
83        // If we're not resumed we want to resume the system so we can make
84        // sure the message will be processed.
85        if !self.resumed {
86            ftrace::instant!(
87                crate::TRACE_CATEGORY,
88                c"send-during-suspension",
89                ftrace::Scope::Process
90            );
91            self.flush().await;
92            // Flush acquires a wake lease, which guarantees we are or will be
93            // resumed at some point, mark the state as such.
94            self.resumed();
95        }
96        Ok(())
97    }
98
99    /// Deposits a request for a baton that a client made. This is a
100    /// hanging-GET-style communication pattern and so the client receives a
101    /// response the next time a flush is triggered. If a flush was triggered
102    /// before |set_requester| was called, the baton is sent now based on the
103    /// current message index.
104    pub fn set_requester(&mut self, requester: fexample::MessageSourceReceiveBatonResponder) {
105        {
106            // Drop any previous requester
107            let _previous_requester = self.baton_request.take();
108        }
109
110        if let Some(baton) = self.baton.take() {
111            let id = self.sent_messages;
112            self.send_baton(id, requester, baton);
113        } else {
114            self.baton_request = Some(requester);
115        }
116    }
117
118    pub fn suspended(&mut self) {
119        self.resumed = false;
120    }
121
122    pub fn resumed(&mut self) {
123        self.resumed = true;
124    }
125
126    fn terminated(&mut self) {
127        self.terminated = true;
128    }
129
130    pub fn get_message_count(&self) -> u64 {
131        self.sent_messages
132    }
133
134    fn send_baton(
135        &mut self,
136        msg_id: u64,
137        request: fexample::MessageSourceReceiveBatonResponder,
138        baton: zx::EventPair,
139    ) {
140        request
141            .send(fexample::LeaseBaton {
142                lease: Some(baton),
143                msg_index: Some(msg_id),
144                ..Default::default()
145            })
146            .expect("send failed");
147
148        ftrace::duration_end!(crate::TRACE_CATEGORY, TRACE_NAME_FLUSH, "msg_id" => msg_id);
149        self.last_baton_pass = msg_id;
150    }
151
152    /// Triggers a response the previous request passed to |set_requester|, if
153    /// any, when |message(s)_sent| has been called since the previous time a
154    /// response was set to a requester. If there is no waiting request for
155    /// a baton, the baton is created, but not passed. It will be passed when
156    /// |seq_requester| is called next.
157    async fn flush(&mut self) {
158        if self.last_baton_pass < self.sent_messages {
159            let curr_offset = self.sent_messages;
160
161            ftrace::duration_begin!(
162                crate::TRACE_CATEGORY,
163                TRACE_NAME_FLUSH,
164                "msg_id" => curr_offset,
165                "baton_delta" => self.sent_messages - self.last_baton_pass
166            );
167
168            let baton = self
169                .sag
170                .acquire_wake_lease("optimistic-lease-baton")
171                .await
172                .expect("FIDL failed")
173                .expect("SAG returned error");
174
175            // If there is a hanging-GET for a baton, return the send the baton
176            // immediately, otherwise store it for later.
177            if let Some(req) = self.baton_request.take() {
178                self.send_baton(curr_offset, req, baton);
179            } else {
180                self.baton = Some(baton);
181                return;
182            }
183        }
184    }
185}
186
187/// Server components should use class to guarantee clients see all messages
188/// sent by the server prior to system suspension. After creating a
189/// |SequenceServer| server components should call |manage| to start this
190/// management.
191///
192/// Then server components should use the |MessageSendTracker| returned from
193/// |get_message_tracker| to record when messages are sent and deposit
194/// hanging-GET requests for batons. Batons are sent to the client whenever the
195/// system starts to suspend (as indicated by a SuspendStarted callback from
196/// SystemActivityGovernor) *AND* messages count of sent messages is greater
197/// than the last time a baton was sent to a the client *AND* a hanging-GET
198/// request is pending.
199pub struct SequenceServer {
200    baton_sender: Arc<Mutex<MessageSendTracker>>,
201    flusher: Option<crate::flush_trigger::FlushTrigger>,
202    sag: fsag::ActivityGovernorProxy,
203}
204
205#[async_trait]
206impl crate::flush_trigger::FlushListener for SequenceServer {
207    async fn flush(&self) {
208        self.baton_sender.lock().await.flush().await;
209    }
210}
211
212impl SequenceServer {
213    /// Creates a SequenceServer, but does *not* kick off its logic. |manage|
214    /// MUST be called to start monitoring for suspend and managing baton hand-
215    /// offs.
216    pub async fn new(sag: fsag::ActivityGovernorProxy) -> Self {
217        let flusher = Some(crate::flush_trigger::FlushTrigger::new(sag.clone()));
218        let baton_sender = Arc::new(Mutex::new(MessageSendTracker::new(sag.clone()).await));
219        Self { flusher, baton_sender, sag }
220    }
221
222    /// Returns a future which manages baton passing and a reference to the
223    /// |MessageSendTracker| clients use to report message sends. The returned
224    /// future *must* be polled for as long as batons need to be delivered. The
225    /// future returns, yielding the |SequenceServer| if the channel to
226    /// ActivityGovernor passed to |new| closes.
227    ///
228    /// To stop managing batons, simply drop the future. The
229    /// |MessageSendTracker| should not be used after the future returns.
230    pub fn manage(
231        self,
232    ) -> (Arc<Mutex<MessageSendTracker>>, impl futures::Future<Output = Result<Self, fidl::Error>>)
233    {
234        let tracker = self.baton_sender.clone();
235        let fut = async move {
236            let mut sequence_server = self;
237            if let None = sequence_server.flusher {
238                warn!("No flusher available, aborting manage");
239                return Ok(sequence_server);
240            }
241
242            // Take the flush trigger because we can't borrow sequence_server
243            // twice to run the two futures.
244            let flusher = sequence_server.flusher.take().unwrap();
245
246            // Await the two futures for what we expect to be effectively
247            // forever.
248            let results = futures::future::join(
249                sequence_server.watch_system_state(),
250                flusher.run(&sequence_server),
251            )
252            .await;
253
254            // Both futures return errors only if we can't talk to SAG so probably if
255            // one has an error the other will have to same or would have the same
256            // error soon.
257            if let Err(e) = results.1 {
258                return Err(e);
259            }
260
261            if let Err(e) = results.0 {
262                return Err(e);
263            }
264
265            sequence_server.flusher = Some(flusher);
266            sequence_server.baton_sender.lock().await.terminated();
267            Ok(sequence_server)
268        };
269        (tracker, fut)
270    }
271
272    /// Watch the suspend/resume state of the system and
273    async fn watch_system_state(&self) -> Result<(), fidl::Error> {
274        let (client, server) =
275            fidl::endpoints::create_endpoints::<fsag::ActivityGovernorListenerMarker>();
276
277        self.sag
278            .register_listener(fsag::ActivityGovernorRegisterListenerRequest {
279                listener: Some(client),
280                ..Default::default()
281            })
282            .await?;
283
284        let mut request_stream = server.into_stream();
285        while let Some(req) = request_stream.next().await {
286            match req {
287                Ok(fsag::ActivityGovernorListenerRequest::OnSuspendStarted { responder }) => {
288                    ftrace::instant!(crate::TRACE_CATEGORY, c"suspended", ftrace::Scope::Process);
289                    self.baton_sender.lock().await.suspended();
290                    let _ = responder.send();
291                }
292                Ok(fsag::ActivityGovernorListenerRequest::OnResume { responder }) => {
293                    ftrace::instant!(crate::TRACE_CATEGORY, c"resumed", ftrace::Scope::Process);
294                    self.baton_sender.lock().await.resumed();
295                    let _ = responder.send();
296                }
297                Ok(fsag::ActivityGovernorListenerRequest::_UnknownMethod { .. }) => {
298                    warn!("unrecognized listener method, ignoring");
299                }
300                Err(_) => {
301                    warn!("Error receiving next items from stream");
302                }
303            }
304        }
305        Ok(())
306    }
307}