fidl_fuchsia_pkg_rewrite_ext/
transaction.rs
1use crate::errors::EditTransactionError;
6use crate::rule::Rule;
7use fidl_fuchsia_pkg_rewrite::{EditTransactionProxy, EngineProxy};
8use std::future::Future;
9use zx_status as zx;
10
/// Maximum number of edit transactions `do_transaction` starts before it gives up and reports
/// the commit as unavailable.
const RETRY_ATTEMPTS: usize = 100;
12
/// Wrapper around an [`EditTransactionProxy`] whose methods forward rule-editing requests to
/// the underlying transaction and translate failures into [`EditTransactionError`].
pub struct EditTransaction {
    /// FIDL proxy that every method on this type sends its requests through.
    transaction: EditTransactionProxy,
}
17
18impl EditTransaction {
19 pub fn reset_all(&self) -> Result<(), EditTransactionError> {
22 self.transaction.reset_all().map_err(EditTransactionError::Fidl)
23 }
24
25 pub async fn list_dynamic(&self) -> Result<Vec<Rule>, EditTransactionError> {
29 let (iter, iter_server_end) = fidl::endpoints::create_proxy();
30 self.transaction.list_dynamic(iter_server_end)?;
31
32 let mut rules = Vec::new();
33 loop {
34 let chunk = iter.next().await?;
35 if chunk.is_empty() {
36 break;
37 }
38
39 for rule in chunk {
40 rules.push(Rule::try_from(rule)?);
41 }
42 }
43
44 Ok(rules)
45 }
46
47 pub async fn add(&self, rule: Rule) -> Result<(), EditTransactionError> {
50 self.transaction
51 .add(&rule.into())
52 .await?
53 .map_err(|err| EditTransactionError::AddError(zx::Status::from_raw(err)))
54 }
55}
56
57pub async fn do_transaction<T, R>(engine: &EngineProxy, cb: T) -> Result<(), EditTransactionError>
64where
65 T: Fn(EditTransaction) -> R,
66 R: Future<Output = Result<EditTransaction, EditTransactionError>>,
67{
68 for _ in 0..RETRY_ATTEMPTS {
70 let (transaction, transaction_server_end) = fidl::endpoints::create_proxy();
71
72 let () = engine
73 .start_edit_transaction(transaction_server_end)
74 .map_err(EditTransactionError::Fidl)?;
75
76 let transaction = cb(EditTransaction { transaction }).await?;
77
78 let response =
79 transaction.transaction.commit().await.map_err(EditTransactionError::Fidl)?;
80
81 return match response.map_err(zx::Status::from_raw) {
83 Ok(()) => Ok(()),
84 Err(zx::Status::UNAVAILABLE) => {
85 continue;
86 }
87 Err(status) => Err(EditTransactionError::CommitError(status)),
88 };
89 }
90
91 Err(EditTransactionError::CommitError(zx::Status::UNAVAILABLE))
92}
93
94#[cfg(test)]
95mod tests {
96 use super::*;
97 use assert_matches::assert_matches;
98 use fidl::endpoints::create_proxy_and_stream;
99 use fidl_fuchsia_pkg_rewrite::{
100 EditTransactionRequest, EngineMarker, EngineRequest, RuleIteratorRequest,
101 };
102 use fuchsia_async as fasync;
103 use futures::TryStreamExt;
104 use std::sync::atomic::{AtomicUsize, Ordering};
105 use std::sync::{Arc, Mutex};
106
/// Record of a request handled by the fake engine task, pushed in the order of handling.
#[derive(Debug, PartialEq)]
enum Event {
    /// An `EditTransactionRequest::ResetAll` was received.
    ResetAll,
    /// An `EditTransactionRequest::ListDynamic` was received.
    ListDynamic,
    /// A `RuleIteratorRequest::Next` was received on a rule iterator.
    IteratorNext,
    /// An `EditTransactionRequest::Add` was received carrying this rule.
    Add(Rule),
    /// A commit request was answered with the configured failure status.
    CommitFailed,
    /// A commit request was answered with `Ok(())`.
    Commit,
}
116
/// Fake rewrite engine for tests: a client proxy plus a log of the requests the fake served.
struct Engine {
    /// Client end passed to the code under test.
    engine: EngineProxy,
    /// Events recorded by the serving task; shared with it through the `Arc`.
    events: Arc<Mutex<Vec<Event>>>,
}
121
/// Builds a [`Rule`] from `host => replacement_host, prefix => replacement_prefix` syntax,
/// unwrapping the result of `Rule::new` (so invalid arguments panic).
macro_rules! rule {
    ($from_host:expr => $to_host:expr, $from_prefix:expr => $to_prefix:expr) => {
        Rule::new($from_host, $to_host, $from_prefix, $to_prefix).unwrap()
    };
}
129
130 impl Engine {
131 fn new() -> Self {
132 Self::with_fail_attempts(0, zx::Status::OK)
133 }
134
135 fn with_fail_attempts(mut fail_attempts: usize, fail_status: zx::Status) -> Self {
136 let events = Arc::new(Mutex::new(Vec::new()));
137 let events_task = Arc::clone(&events);
138
139 let (engine, mut engine_stream) = create_proxy_and_stream::<EngineMarker>();
140
141 fasync::Task::local(async move {
142 while let Some(req) = engine_stream.try_next().await.unwrap() {
143 match req {
144 EngineRequest::StartEditTransaction { transaction, control_handle: _ } => {
145 let mut tx_stream = transaction.into_stream();
146
147 while let Some(req) = tx_stream.try_next().await.unwrap() {
148 match req {
149 EditTransactionRequest::ResetAll { control_handle: _ } => {
150 events_task.lock().unwrap().push(Event::ResetAll);
151 }
152 EditTransactionRequest::ListDynamic {
153 iterator,
154 control_handle: _,
155 } => {
156 events_task.lock().unwrap().push(Event::ListDynamic);
157 let mut stream = iterator.into_stream();
158
159 let mut rules = vec![
160 rule!("fuchsia.com" => "example.com", "/" => "/"),
161 rule!("fuchsia.com" => "mycorp.com", "/" => "/"),
162 ]
163 .into_iter();
164
165 while let Some(req) = stream.try_next().await.unwrap() {
166 let RuleIteratorRequest::Next { responder } = req;
167 events_task.lock().unwrap().push(Event::IteratorNext);
168
169 if let Some(rule) = rules.next() {
170 responder.send(&[rule.into()]).unwrap();
171 } else {
172 responder.send(&[]).unwrap();
173 }
174 }
175 }
176 EditTransactionRequest::Add { rule, responder } => {
177 events_task
178 .lock()
179 .unwrap()
180 .push(Event::Add(rule.try_into().unwrap()));
181 responder.send(Ok(())).unwrap();
182 }
183 EditTransactionRequest::Commit { responder } => {
184 if fail_attempts > 0 {
185 fail_attempts -= 1;
186 events_task.lock().unwrap().push(Event::CommitFailed);
187 responder.send(Err(fail_status.into_raw())).unwrap();
188 } else {
189 events_task.lock().unwrap().push(Event::Commit);
190 responder.send(Ok(())).unwrap();
191 }
192 }
193 }
194 }
195 }
196 _ => {
197 panic!("unexpected reqest: {:?}", req);
198 }
199 }
200 }
201 })
202 .detach();
203
204 Self { engine, events }
205 }
206
207 fn take_events(&self) -> Vec<Event> {
208 self.events.lock().unwrap().drain(..).collect()
209 }
210 }
211
212 #[fasync::run_singlethreaded(test)]
213 async fn test_do_transaction_empty_always_commits() {
214 let engine = Engine::new();
215
216 do_transaction(&engine.engine, |transaction| async { Ok(transaction) }).await.unwrap();
217
218 assert_eq!(engine.take_events(), vec![Event::Commit]);
219 }
220
221 #[fasync::run_singlethreaded(test)]
222 async fn test_do_transaction_reset_all() {
223 let engine = Engine::new();
224
225 do_transaction(&engine.engine, |transaction| async {
226 transaction.reset_all()?;
227 Ok(transaction)
228 })
229 .await
230 .unwrap();
231
232 assert_eq!(engine.take_events(), vec![Event::ResetAll, Event::Commit]);
233 }
234
235 #[fasync::run_singlethreaded(test)]
236 async fn test_do_transaction_list_dynamic() {
237 let engine = Engine::new();
238
239 do_transaction(&engine.engine, |transaction| async {
240 let rules = transaction.list_dynamic().await?;
241 assert_eq!(
242 rules,
243 vec![
244 rule!("fuchsia.com" => "example.com", "/" => "/"),
245 rule!("fuchsia.com" => "mycorp.com", "/" => "/"),
246 ]
247 );
248 Ok(transaction)
249 })
250 .await
251 .unwrap();
252
253 assert_eq!(
254 engine.take_events(),
255 vec![
257 Event::ListDynamic,
258 Event::IteratorNext,
259 Event::IteratorNext,
260 Event::IteratorNext,
261 Event::Commit
262 ]
263 );
264 }
265
266 #[fasync::run_singlethreaded(test)]
267 async fn test_do_transaction_add() {
268 let engine = Engine::new();
269
270 let attempts = Arc::new(AtomicUsize::new(0));
271 do_transaction(&engine.engine, |transaction| async {
272 attempts.fetch_add(1, Ordering::SeqCst);
273 transaction.add(rule!("foo.com" => "bar.com", "/" => "/")).await?;
274 transaction.add(rule!("baz.com" => "boo.com", "/" => "/")).await?;
275 Ok(transaction)
276 })
277 .await
278 .unwrap();
279
280 assert_eq!(attempts.load(Ordering::SeqCst), 1);
281 assert_eq!(
282 engine.take_events(),
283 vec![
284 Event::Add(rule!("foo.com" => "bar.com", "/" => "/")),
285 Event::Add(rule!("baz.com" => "boo.com", "/" => "/")),
286 Event::Commit,
287 ],
288 );
289 }
290
291 #[fasync::run_singlethreaded(test)]
292 async fn test_do_transaction_closure_error_does_not_commit() {
293 let engine = Engine::new();
294
295 let attempts = Arc::new(AtomicUsize::new(0));
296 let err = do_transaction(&engine.engine, |_transaction| async {
297 attempts.fetch_add(1, Ordering::SeqCst);
298 Err(EditTransactionError::AddError(zx::Status::INTERNAL))
299 })
300 .await
301 .unwrap_err();
302
303 assert_eq!(attempts.load(Ordering::SeqCst), 1);
304 assert_matches!(err, EditTransactionError::AddError(zx::Status::INTERNAL));
305 assert_eq!(engine.take_events(), vec![]);
306 }
307
308 #[fasync::run_singlethreaded(test)]
309 async fn test_do_transaction_retries_commit_errors() {
310 let engine = Engine::with_fail_attempts(5, zx::Status::UNAVAILABLE);
311
312 let attempts = Arc::new(AtomicUsize::new(0));
313 do_transaction(&engine.engine, |transaction| async {
314 attempts.fetch_add(1, Ordering::SeqCst);
315 Ok(transaction)
316 })
317 .await
318 .unwrap();
319
320 assert_eq!(attempts.load(Ordering::SeqCst), 6);
321 assert_eq!(
322 engine.take_events(),
323 vec![
324 Event::CommitFailed,
325 Event::CommitFailed,
326 Event::CommitFailed,
327 Event::CommitFailed,
328 Event::CommitFailed,
329 Event::Commit,
330 ]
331 );
332 }
333
334 #[fasync::run_singlethreaded(test)]
335 async fn test_do_transaction_eventually_gives_up() {
336 let engine = Engine::with_fail_attempts(RETRY_ATTEMPTS + 1, zx::Status::UNAVAILABLE);
337
338 let attempts = Arc::new(AtomicUsize::new(0));
339 let err = do_transaction(&engine.engine, |transaction| async {
340 attempts.fetch_add(1, Ordering::SeqCst);
341 Ok(transaction)
342 })
343 .await
344 .unwrap_err();
345
346 assert_eq!(attempts.load(Ordering::SeqCst), RETRY_ATTEMPTS);
347 assert_matches!(err, EditTransactionError::CommitError(zx::Status::UNAVAILABLE));
348 assert_eq!(
349 engine.take_events(),
350 (0..RETRY_ATTEMPTS).map(|_| Event::CommitFailed).collect::<Vec<_>>(),
351 );
352 }
353
354 #[fasync::run_singlethreaded(test)]
355 async fn test_do_transaction_does_not_retry_other_errors() {
356 let engine = Engine::with_fail_attempts(5, zx::Status::INTERNAL);
357
358 let attempts = Arc::new(AtomicUsize::new(0));
359 let err = do_transaction(&engine.engine, |transaction| async {
360 attempts.fetch_add(1, Ordering::SeqCst);
361 Ok(transaction)
362 })
363 .await
364 .unwrap_err();
365
366 assert_eq!(attempts.load(Ordering::SeqCst), 1);
367 assert_matches!(err, EditTransactionError::CommitError(zx::Status::INTERNAL));
368 assert_eq!(engine.take_events(), vec![Event::CommitFailed,]);
369 }
370}