lease_management/sequence_server.rs
1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use async_trait::async_trait;
5use futures::lock::Mutex;
6use futures::StreamExt;
7use log::warn;
8use std::sync::Arc;
9use {
10 fidl_fuchsia_example_power as fexample, fidl_fuchsia_power_system as fsag,
11 fuchsia_trace as ftrace,
12};
13
/// Name of the trace duration that is opened when a flush starts acquiring
/// a baton and closed once that baton has been handed to a client.
const TRACE_NAME_FLUSH: &std::ffi::CStr = c"flush-count-dirty";
15
16/// You probably don't want to create one of these directly, but instead use
17/// |SequenceServer| to create one for you and then get a reference to it from
18/// |SequenceServer.get_message_tracker|.
19///
20/// Server components can use this to help manage sending batons to clients
21/// at appropriate times. Server components should call |message_sent| when
22/// they send a message to a client. Servers should call |set_requester|
23/// whenever server receives a hanging-GET for a baton from a client.
24pub struct MessageSendTracker {
25 sent_messages: u64,
26 // The value of |sent_messages| when we last passed a baton
27 last_baton_pass: u64,
28 baton_request: Option<fexample::MessageSourceReceiveBatonResponder>,
29 sag: fsag::ActivityGovernorProxy,
30 baton: Option<zx::EventPair>,
31 resumed: bool,
32 terminated: bool,
33}
34
/// Errors reported by |MessageSendTracker| when recording sent messages.
#[derive(Debug)]
pub enum MessageTrackerError {
    /// The tracker has been marked as terminated and no longer accepts
    /// message reports.
    Terminated,
}
39
40impl MessageSendTracker {
41 pub async fn new(sag: fsag::ActivityGovernorProxy) -> Self {
42 let me = Self {
43 sent_messages: 0,
44 last_baton_pass: 0,
45 baton_request: None,
46 sag,
47 baton: None,
48 resumed: false,
49 terminated: false,
50 };
51 me
52 }
53
54 /// Inform the tracker a message was sent, equivalent to |messages_sent(1)|.
55 /// If the system is suspended, a wake lease is taken to guarantee the
56 /// message can be processed. This is the behavior we probably want if we
57 /// this message came from a waking interrupt. This means after this
58 /// function returns, the caller can safely ack an interrupt related to
59 /// this event.
60 pub async fn message_sent(&mut self) -> Result<(), MessageTrackerError> {
61 self.messages_sent(1).await
62 }
63
64 /// Inform the tracker that |message_count| messages were sent. If the
65 /// system is suspended, a wake lease is taken to guarantee the message
66 /// can be processed. This is the behavior we probably want if we this
67 /// message came from a waking interrupt. This means after this function
68 /// returns, the caller can safely ack an interrupt related to this event.
69 pub async fn messages_sent(&mut self, message_count: u64) -> Result<(), MessageTrackerError> {
70 ftrace::instant!(
71 crate::TRACE_CATEGORY,
72 c"messages_sent",
73 ftrace::Scope::Process,
74 "count" => message_count
75 );
76
77 if self.terminated {
78 return Err(MessageTrackerError::Terminated);
79 }
80
81 self.sent_messages += message_count;
82
83 // If we're not resumed we want to resume the system so we can make
84 // sure the message will be processed.
85 if !self.resumed {
86 ftrace::instant!(
87 crate::TRACE_CATEGORY,
88 c"send-during-suspension",
89 ftrace::Scope::Process
90 );
91 self.flush().await;
92 // Flush acquires a wake lease, which guarantees we are or will be
93 // resumed at some point, mark the state as such.
94 self.resumed();
95 }
96 Ok(())
97 }
98
99 /// Deposits a request for a baton that a client made. This is a
100 /// hanging-GET-style communication pattern and so the client receives a
101 /// response the next time a flush is triggered. If a flush was triggered
102 /// before |set_requester| was called, the baton is sent now based on the
103 /// current message index.
104 pub fn set_requester(&mut self, requester: fexample::MessageSourceReceiveBatonResponder) {
105 {
106 // Drop any previous requester
107 let _previous_requester = self.baton_request.take();
108 }
109
110 if let Some(baton) = self.baton.take() {
111 let id = self.sent_messages;
112 self.send_baton(id, requester, baton);
113 } else {
114 self.baton_request = Some(requester);
115 }
116 }
117
118 pub fn suspended(&mut self) {
119 self.resumed = false;
120 }
121
122 pub fn resumed(&mut self) {
123 self.resumed = true;
124 }
125
126 fn terminated(&mut self) {
127 self.terminated = true;
128 }
129
130 pub fn get_message_count(&self) -> u64 {
131 self.sent_messages
132 }
133
134 fn send_baton(
135 &mut self,
136 msg_id: u64,
137 request: fexample::MessageSourceReceiveBatonResponder,
138 baton: zx::EventPair,
139 ) {
140 request
141 .send(fexample::LeaseBaton {
142 lease: Some(baton),
143 msg_index: Some(msg_id),
144 ..Default::default()
145 })
146 .expect("send failed");
147
148 ftrace::duration_end!(crate::TRACE_CATEGORY, TRACE_NAME_FLUSH, "msg_id" => msg_id);
149 self.last_baton_pass = msg_id;
150 }
151
152 /// Triggers a response the previous request passed to |set_requester|, if
153 /// any, when |message(s)_sent| has been called since the previous time a
154 /// response was set to a requester. If there is no waiting request for
155 /// a baton, the baton is created, but not passed. It will be passed when
156 /// |seq_requester| is called next.
157 async fn flush(&mut self) {
158 if self.last_baton_pass < self.sent_messages {
159 let curr_offset = self.sent_messages;
160
161 ftrace::duration_begin!(
162 crate::TRACE_CATEGORY,
163 TRACE_NAME_FLUSH,
164 "msg_id" => curr_offset,
165 "baton_delta" => self.sent_messages - self.last_baton_pass
166 );
167
168 let baton = self
169 .sag
170 .acquire_wake_lease("optimistic-lease-baton")
171 .await
172 .expect("FIDL failed")
173 .expect("SAG returned error");
174
175 // If there is a hanging-GET for a baton, return the send the baton
176 // immediately, otherwise store it for later.
177 if let Some(req) = self.baton_request.take() {
178 self.send_baton(curr_offset, req, baton);
179 } else {
180 self.baton = Some(baton);
181 return;
182 }
183 }
184 }
185}
186
187/// Server components should use class to guarantee clients see all messages
188/// sent by the server prior to system suspension. After creating a
189/// |SequenceServer| server components should call |manage| to start this
190/// management.
191///
192/// Then server components should use the |MessageSendTracker| returned from
193/// |get_message_tracker| to record when messages are sent and deposit
194/// hanging-GET requests for batons. Batons are sent to the client whenever the
195/// system starts to suspend (as indicated by a SuspendStarted callback from
196/// SystemActivityGovernor) *AND* messages count of sent messages is greater
197/// than the last time a baton was sent to a the client *AND* a hanging-GET
198/// request is pending.
199pub struct SequenceServer {
200 baton_sender: Arc<Mutex<MessageSendTracker>>,
201 flusher: Option<crate::flush_trigger::FlushTrigger>,
202 sag: fsag::ActivityGovernorProxy,
203}
204
205#[async_trait]
206impl crate::flush_trigger::FlushListener for SequenceServer {
207 async fn flush(&self) {
208 self.baton_sender.lock().await.flush().await;
209 }
210}
211
212impl SequenceServer {
213 /// Creates a SequenceServer, but does *not* kick off its logic. |manage|
214 /// MUST be called to start monitoring for suspend and managing baton hand-
215 /// offs.
216 pub async fn new(sag: fsag::ActivityGovernorProxy) -> Self {
217 let flusher = Some(crate::flush_trigger::FlushTrigger::new(sag.clone()));
218 let baton_sender = Arc::new(Mutex::new(MessageSendTracker::new(sag.clone()).await));
219 Self { flusher, baton_sender, sag }
220 }
221
222 /// Returns a future which manages baton passing and a reference to the
223 /// |MessageSendTracker| clients use to report message sends. The returned
224 /// future *must* be polled for as long as batons need to be delivered. The
225 /// future returns, yielding the |SequenceServer| if the channel to
226 /// ActivityGovernor passed to |new| closes.
227 ///
228 /// To stop managing batons, simply drop the future. The
229 /// |MessageSendTracker| should not be used after the future returns.
230 pub fn manage(
231 self,
232 ) -> (Arc<Mutex<MessageSendTracker>>, impl futures::Future<Output = Result<Self, fidl::Error>>)
233 {
234 let tracker = self.baton_sender.clone();
235 let fut = async move {
236 let mut sequence_server = self;
237 if let None = sequence_server.flusher {
238 warn!("No flusher available, aborting manage");
239 return Ok(sequence_server);
240 }
241
242 // Take the flush trigger because we can't borrow sequence_server
243 // twice to run the two futures.
244 let flusher = sequence_server.flusher.take().unwrap();
245
246 // Await the two futures for what we expect to be effectively
247 // forever.
248 let results = futures::future::join(
249 sequence_server.watch_system_state(),
250 flusher.run(&sequence_server),
251 )
252 .await;
253
254 // Both futures return errors only if we can't talk to SAG so probably if
255 // one has an error the other will have to same or would have the same
256 // error soon.
257 if let Err(e) = results.1 {
258 return Err(e);
259 }
260
261 if let Err(e) = results.0 {
262 return Err(e);
263 }
264
265 sequence_server.flusher = Some(flusher);
266 sequence_server.baton_sender.lock().await.terminated();
267 Ok(sequence_server)
268 };
269 (tracker, fut)
270 }
271
272 /// Watch the suspend/resume state of the system and
273 async fn watch_system_state(&self) -> Result<(), fidl::Error> {
274 let (client, server) =
275 fidl::endpoints::create_endpoints::<fsag::ActivityGovernorListenerMarker>();
276
277 self.sag
278 .register_listener(fsag::ActivityGovernorRegisterListenerRequest {
279 listener: Some(client),
280 ..Default::default()
281 })
282 .await?;
283
284 let mut request_stream = server.into_stream();
285 while let Some(req) = request_stream.next().await {
286 match req {
287 Ok(fsag::ActivityGovernorListenerRequest::OnSuspendStarted { responder }) => {
288 ftrace::instant!(crate::TRACE_CATEGORY, c"suspended", ftrace::Scope::Process);
289 self.baton_sender.lock().await.suspended();
290 let _ = responder.send();
291 }
292 Ok(fsag::ActivityGovernorListenerRequest::OnResume { responder }) => {
293 ftrace::instant!(crate::TRACE_CATEGORY, c"resumed", ftrace::Scope::Process);
294 self.baton_sender.lock().await.resumed();
295 let _ = responder.send();
296 }
297 Ok(fsag::ActivityGovernorListenerRequest::_UnknownMethod { .. }) => {
298 warn!("unrecognized listener method, ignoring");
299 }
300 Err(_) => {
301 warn!("Error receiving next items from stream");
302 }
303 }
304 }
305 Ok(())
306 }
307}