fidl_fuchsia_pkg_rewrite_ext/
transaction.rs1use crate::errors::EditTransactionError;
6use crate::rule::Rule;
7use flex_client::ProxyHasDomain;
8use flex_fuchsia_pkg_rewrite::{EditTransactionProxy, EngineProxy};
9use std::future::Future;
10use {flex_fuchsia_pkg_rewrite as rewrite, zx_status as zx};
11
12const RETRY_ATTEMPTS: usize = 100;
13
14pub struct EditTransaction {
16 transaction: EditTransactionProxy,
17}
18
19impl EditTransaction {
20 pub fn reset_all(&self) -> Result<(), EditTransactionError> {
23 self.transaction.reset_all().map_err(EditTransactionError::from)
24 }
25
26 pub async fn list_dynamic(&self) -> Result<Vec<Rule>, EditTransactionError> {
30 let (iter, iter_server_end) =
31 self.transaction.domain().create_proxy::<rewrite::RuleIteratorMarker>();
32 self.transaction.list_dynamic(iter_server_end)?;
33
34 let mut rules = Vec::new();
35 loop {
36 let chunk = iter.next().await?;
37 if chunk.is_empty() {
38 break;
39 }
40
41 for rule in chunk {
42 rules.push(Rule::try_from(rule)?);
43 }
44 }
45
46 Ok(rules)
47 }
48
49 pub async fn add(&self, rule: Rule) -> Result<(), EditTransactionError> {
52 self.transaction
53 .add(&rule.into())
54 .await?
55 .map_err(|err| EditTransactionError::AddError(zx::Status::from_raw(err)))
56 }
57}
58
59pub async fn do_transaction<T, R>(engine: &EngineProxy, cb: T) -> Result<(), EditTransactionError>
66where
67 T: Fn(EditTransaction) -> R,
68 R: Future<Output = Result<EditTransaction, EditTransactionError>>,
69{
70 for _ in 0..RETRY_ATTEMPTS {
72 let (transaction, transaction_server_end) =
73 engine.domain().create_proxy::<rewrite::EditTransactionMarker>();
74
75 let () = engine
76 .start_edit_transaction(transaction_server_end)
77 .map_err(EditTransactionError::from)?;
78
79 let transaction = cb(EditTransaction { transaction }).await?;
80
81 let response =
82 transaction.transaction.commit().await.map_err(EditTransactionError::from)?;
83
84 return match response.map_err(zx::Status::from_raw) {
86 Ok(()) => Ok(()),
87 Err(zx::Status::UNAVAILABLE) => {
88 continue;
89 }
90 Err(status) => Err(EditTransactionError::CommitError(status)),
91 };
92 }
93
94 Err(EditTransactionError::CommitError(zx::Status::UNAVAILABLE))
95}
96
97#[cfg(test)]
98mod tests {
99 use super::*;
100 use assert_matches::assert_matches;
101 use flex_fuchsia_pkg_rewrite::{
102 EditTransactionRequest, EngineMarker, EngineRequest, RuleIteratorRequest,
103 };
104 use fuchsia_async as fasync;
105 use futures::TryStreamExt;
106 use std::sync::atomic::{AtomicUsize, Ordering};
107 use std::sync::{Arc, Mutex};
108
109 #[derive(Debug, PartialEq)]
110 enum Event {
111 ResetAll,
112 ListDynamic,
113 IteratorNext,
114 Add(Rule),
115 CommitFailed,
116 Commit,
117 }
118
119 struct Engine {
120 engine: EngineProxy,
121 events: Arc<Mutex<Vec<Event>>>,
122 #[cfg(feature = "fdomain")]
123 _client: Arc<flex_client::Client>,
124 }
125
126 macro_rules! rule {
127 ($host_match:expr => $host_replacement:expr,
128 $path_prefix_match:expr => $path_prefix_replacement:expr) => {
129 Rule::new($host_match, $host_replacement, $path_prefix_match, $path_prefix_replacement)
130 .unwrap()
131 };
132 }
133
134 impl Engine {
135 fn new() -> Self {
136 Self::with_fail_attempts(0, zx::Status::OK)
137 }
138
139 fn with_fail_attempts(mut fail_attempts: usize, fail_status: zx::Status) -> Self {
140 #[cfg(feature = "fdomain")]
141 let client = fdomain_local::local_client_empty();
142 #[cfg(not(feature = "fdomain"))]
143 let client = flex_client::fidl::ZirconClient;
144 let events = Arc::new(Mutex::new(Vec::new()));
145 let events_task = Arc::clone(&events);
146
147 let (engine, mut engine_stream) = client.create_proxy_and_stream::<EngineMarker>();
148
149 fasync::Task::local(async move {
150 while let Some(req) = engine_stream.try_next().await.unwrap() {
151 match req {
152 EngineRequest::StartEditTransaction { transaction, control_handle: _ } => {
153 let mut tx_stream = transaction.into_stream();
154
155 while let Some(req) = tx_stream.try_next().await.unwrap() {
156 match req {
157 EditTransactionRequest::ResetAll { control_handle: _ } => {
158 events_task.lock().unwrap().push(Event::ResetAll);
159 }
160 EditTransactionRequest::ListDynamic {
161 iterator,
162 control_handle: _,
163 } => {
164 events_task.lock().unwrap().push(Event::ListDynamic);
165 let mut stream = iterator.into_stream();
166
167 let mut rules = vec![
168 rule!("fuchsia.com" => "example.com", "/" => "/"),
169 rule!("fuchsia.com" => "mycorp.com", "/" => "/"),
170 ]
171 .into_iter();
172
173 while let Some(req) = stream.try_next().await.unwrap() {
174 let RuleIteratorRequest::Next { responder } = req;
175 events_task.lock().unwrap().push(Event::IteratorNext);
176
177 if let Some(rule) = rules.next() {
178 responder.send(&[rule.into()]).unwrap();
179 } else {
180 responder.send(&[]).unwrap();
181 }
182 }
183 }
184 EditTransactionRequest::Add { rule, responder } => {
185 events_task
186 .lock()
187 .unwrap()
188 .push(Event::Add(rule.try_into().unwrap()));
189 responder.send(Ok(())).unwrap();
190 }
191 EditTransactionRequest::Commit { responder } => {
192 if fail_attempts > 0 {
193 fail_attempts -= 1;
194 events_task.lock().unwrap().push(Event::CommitFailed);
195 responder.send(Err(fail_status.into_raw())).unwrap();
196 } else {
197 events_task.lock().unwrap().push(Event::Commit);
198 responder.send(Ok(())).unwrap();
199 }
200 }
201 }
202 }
203 }
204 _ => {
205 panic!("unexpected reqest: {:?}", req);
206 }
207 }
208 }
209 })
210 .detach();
211
212 Self {
213 engine,
214 events,
215 #[cfg(feature = "fdomain")]
216 _client: client,
217 }
218 }
219
220 fn take_events(&self) -> Vec<Event> {
221 self.events.lock().unwrap().drain(..).collect()
222 }
223 }
224
225 #[fasync::run_singlethreaded(test)]
226 async fn test_do_transaction_empty_always_commits() {
227 let engine = Engine::new();
228
229 do_transaction(&engine.engine, |transaction| async { Ok(transaction) }).await.unwrap();
230
231 assert_eq!(engine.take_events(), vec![Event::Commit]);
232 }
233
234 #[fasync::run_singlethreaded(test)]
235 async fn test_do_transaction_reset_all() {
236 let engine = Engine::new();
237
238 do_transaction(&engine.engine, |transaction| async {
239 transaction.reset_all()?;
240 Ok(transaction)
241 })
242 .await
243 .unwrap();
244
245 assert_eq!(engine.take_events(), vec![Event::ResetAll, Event::Commit]);
246 }
247
248 #[fasync::run_singlethreaded(test)]
249 async fn test_do_transaction_list_dynamic() {
250 let engine = Engine::new();
251
252 do_transaction(&engine.engine, |transaction| async {
253 let rules = transaction.list_dynamic().await?;
254 assert_eq!(
255 rules,
256 vec![
257 rule!("fuchsia.com" => "example.com", "/" => "/"),
258 rule!("fuchsia.com" => "mycorp.com", "/" => "/"),
259 ]
260 );
261 Ok(transaction)
262 })
263 .await
264 .unwrap();
265
266 assert_eq!(
267 engine.take_events(),
268 vec![
270 Event::ListDynamic,
271 Event::IteratorNext,
272 Event::IteratorNext,
273 Event::IteratorNext,
274 Event::Commit
275 ]
276 );
277 }
278
279 #[fasync::run_singlethreaded(test)]
280 async fn test_do_transaction_add() {
281 let engine = Engine::new();
282
283 let attempts = Arc::new(AtomicUsize::new(0));
284 do_transaction(&engine.engine, |transaction| async {
285 attempts.fetch_add(1, Ordering::SeqCst);
286 transaction.add(rule!("foo.com" => "bar.com", "/" => "/")).await?;
287 transaction.add(rule!("baz.com" => "boo.com", "/" => "/")).await?;
288 Ok(transaction)
289 })
290 .await
291 .unwrap();
292
293 assert_eq!(attempts.load(Ordering::SeqCst), 1);
294 assert_eq!(
295 engine.take_events(),
296 vec![
297 Event::Add(rule!("foo.com" => "bar.com", "/" => "/")),
298 Event::Add(rule!("baz.com" => "boo.com", "/" => "/")),
299 Event::Commit,
300 ],
301 );
302 }
303
304 #[fasync::run_singlethreaded(test)]
305 async fn test_do_transaction_closure_error_does_not_commit() {
306 let engine = Engine::new();
307
308 let attempts = Arc::new(AtomicUsize::new(0));
309 let err = do_transaction(&engine.engine, |_transaction| async {
310 attempts.fetch_add(1, Ordering::SeqCst);
311 Err(EditTransactionError::AddError(zx::Status::INTERNAL))
312 })
313 .await
314 .unwrap_err();
315
316 assert_eq!(attempts.load(Ordering::SeqCst), 1);
317 assert_matches!(err, EditTransactionError::AddError(zx::Status::INTERNAL));
318 assert_eq!(engine.take_events(), vec![]);
319 }
320
321 #[fasync::run_singlethreaded(test)]
322 async fn test_do_transaction_retries_commit_errors() {
323 let engine = Engine::with_fail_attempts(5, zx::Status::UNAVAILABLE);
324
325 let attempts = Arc::new(AtomicUsize::new(0));
326 do_transaction(&engine.engine, |transaction| async {
327 attempts.fetch_add(1, Ordering::SeqCst);
328 Ok(transaction)
329 })
330 .await
331 .unwrap();
332
333 assert_eq!(attempts.load(Ordering::SeqCst), 6);
334 assert_eq!(
335 engine.take_events(),
336 vec![
337 Event::CommitFailed,
338 Event::CommitFailed,
339 Event::CommitFailed,
340 Event::CommitFailed,
341 Event::CommitFailed,
342 Event::Commit,
343 ]
344 );
345 }
346
347 #[fasync::run_singlethreaded(test)]
348 async fn test_do_transaction_eventually_gives_up() {
349 let engine = Engine::with_fail_attempts(RETRY_ATTEMPTS + 1, zx::Status::UNAVAILABLE);
350
351 let attempts = Arc::new(AtomicUsize::new(0));
352 let err = do_transaction(&engine.engine, |transaction| async {
353 attempts.fetch_add(1, Ordering::SeqCst);
354 Ok(transaction)
355 })
356 .await
357 .unwrap_err();
358
359 assert_eq!(attempts.load(Ordering::SeqCst), RETRY_ATTEMPTS);
360 assert_matches!(err, EditTransactionError::CommitError(zx::Status::UNAVAILABLE));
361 assert_eq!(
362 engine.take_events(),
363 (0..RETRY_ATTEMPTS).map(|_| Event::CommitFailed).collect::<Vec<_>>(),
364 );
365 }
366
367 #[fasync::run_singlethreaded(test)]
368 async fn test_do_transaction_does_not_retry_other_errors() {
369 let engine = Engine::with_fail_attempts(5, zx::Status::INTERNAL);
370
371 let attempts = Arc::new(AtomicUsize::new(0));
372 let err = do_transaction(&engine.engine, |transaction| async {
373 attempts.fetch_add(1, Ordering::SeqCst);
374 Ok(transaction)
375 })
376 .await
377 .unwrap_err();
378
379 assert_eq!(attempts.load(Ordering::SeqCst), 1);
380 assert_matches!(err, EditTransactionError::CommitError(zx::Status::INTERNAL));
381 assert_eq!(engine.take_events(), vec![Event::CommitFailed,]);
382 }
383}