fidl_fuchsia_pkg_rewrite_ext/
transaction.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::errors::EditTransactionError;
6use crate::rule::Rule;
7use fidl_fuchsia_pkg_rewrite::{EditTransactionProxy, EngineProxy};
8use std::future::Future;
9use zx_status as zx;
10
/// Upper bound on how many edit transactions `do_transaction` starts before it gives up.
const RETRY_ATTEMPTS: usize = 100;
12
/// A helper for managing the editing of rewrite rules.
///
/// Wraps the client end of a single edit transaction and exposes the editing operations on it.
/// This type has no commit operation of its own; `do_transaction` commits on the caller's behalf.
pub struct EditTransaction {
    // Proxy over which every operation of this transaction is sent.
    transaction: EditTransactionProxy,
}
17
18impl EditTransaction {
19    /// Removes all dynamically configured rewrite rules, leaving only any
20    /// statically configured rules.
21    pub fn reset_all(&self) -> Result<(), EditTransactionError> {
22        self.transaction.reset_all().map_err(EditTransactionError::Fidl)
23    }
24
25    /// Returns a vector of all dynamic (editable) rewrite rules. The
26    /// vector will reflect any changes made to the rewrite rules so far in
27    /// this transaction.
28    pub async fn list_dynamic(&self) -> Result<Vec<Rule>, EditTransactionError> {
29        let (iter, iter_server_end) = fidl::endpoints::create_proxy();
30        self.transaction.list_dynamic(iter_server_end)?;
31
32        let mut rules = Vec::new();
33        loop {
34            let chunk = iter.next().await?;
35            if chunk.is_empty() {
36                break;
37            }
38
39            for rule in chunk {
40                rules.push(Rule::try_from(rule)?);
41            }
42        }
43
44        Ok(rules)
45    }
46
47    /// Adds a rewrite rule with highest priority. If `rule` already exists, this
48    /// API will prioritize it over other rules.
49    pub async fn add(&self, rule: Rule) -> Result<(), EditTransactionError> {
50        self.transaction
51            .add(&rule.into())
52            .await?
53            .map_err(|err| EditTransactionError::AddError(zx::Status::from_raw(err)))
54    }
55}
56
57/// Perform a rewrite rule edit transaction, retrying as necessary if another edit transaction runs
58/// concurrently.
59///
60/// The given callback `cb` should perform the needed edits to the state of the rewrite rules but
61/// not attempt to `commit()` the transaction. `do_transaction` will internally attempt to commit
62/// the transaction and trigger a retry if necessary.
63pub async fn do_transaction<T, R>(engine: &EngineProxy, cb: T) -> Result<(), EditTransactionError>
64where
65    T: Fn(EditTransaction) -> R,
66    R: Future<Output = Result<EditTransaction, EditTransactionError>>,
67{
68    // Make a reasonable effort to retry the edit after a concurrent edit, but don't retry forever.
69    for _ in 0..RETRY_ATTEMPTS {
70        let (transaction, transaction_server_end) = fidl::endpoints::create_proxy();
71
72        let () = engine
73            .start_edit_transaction(transaction_server_end)
74            .map_err(EditTransactionError::Fidl)?;
75
76        let transaction = cb(EditTransaction { transaction }).await?;
77
78        let response =
79            transaction.transaction.commit().await.map_err(EditTransactionError::Fidl)?;
80
81        // Retry edit transaction on concurrent edit
82        return match response.map_err(zx::Status::from_raw) {
83            Ok(()) => Ok(()),
84            Err(zx::Status::UNAVAILABLE) => {
85                continue;
86            }
87            Err(status) => Err(EditTransactionError::CommitError(status)),
88        };
89    }
90
91    Err(EditTransactionError::CommitError(zx::Status::UNAVAILABLE))
92}
93
94#[cfg(test)]
95mod tests {
96    use super::*;
97    use assert_matches::assert_matches;
98    use fidl::endpoints::create_proxy_and_stream;
99    use fidl_fuchsia_pkg_rewrite::{
100        EditTransactionRequest, EngineMarker, EngineRequest, RuleIteratorRequest,
101    };
102    use fuchsia_async as fasync;
103    use futures::TryStreamExt;
104    use std::sync::atomic::{AtomicUsize, Ordering};
105    use std::sync::{Arc, Mutex};
106
    /// A request handled by the fake engine's server task, logged in the order it was handled.
    #[derive(Debug, PartialEq)]
    enum Event {
        /// An `EditTransaction` `ResetAll` request was received.
        ResetAll,
        /// An `EditTransaction` `ListDynamic` request was received.
        ListDynamic,
        /// A `Next` request was received on a rule iterator.
        IteratorNext,
        /// An `EditTransaction` `Add` request was received for the contained rule.
        Add(Rule),
        /// A `Commit` request was received and answered with the configured failure status.
        CommitFailed,
        /// A `Commit` request was received and answered with success.
        Commit,
    }
116
    /// Test fixture pairing an `EngineProxy` with the log of requests its fake server handled.
    struct Engine {
        // Client end passed to `do_transaction`; its server side runs in a detached local task.
        engine: EngineProxy,
        // Log of handled requests, shared with the server task.
        events: Arc<Mutex<Vec<Event>>>,
    }
121
122    macro_rules! rule {
123        ($host_match:expr => $host_replacement:expr,
124         $path_prefix_match:expr => $path_prefix_replacement:expr) => {
125            Rule::new($host_match, $host_replacement, $path_prefix_match, $path_prefix_replacement)
126                .unwrap()
127        };
128    }
129
130    impl Engine {
131        fn new() -> Self {
132            Self::with_fail_attempts(0, zx::Status::OK)
133        }
134
135        fn with_fail_attempts(mut fail_attempts: usize, fail_status: zx::Status) -> Self {
136            let events = Arc::new(Mutex::new(Vec::new()));
137            let events_task = Arc::clone(&events);
138
139            let (engine, mut engine_stream) = create_proxy_and_stream::<EngineMarker>();
140
141            fasync::Task::local(async move {
142                while let Some(req) = engine_stream.try_next().await.unwrap() {
143                    match req {
144                        EngineRequest::StartEditTransaction { transaction, control_handle: _ } => {
145                            let mut tx_stream = transaction.into_stream();
146
147                            while let Some(req) = tx_stream.try_next().await.unwrap() {
148                                match req {
149                                    EditTransactionRequest::ResetAll { control_handle: _ } => {
150                                        events_task.lock().unwrap().push(Event::ResetAll);
151                                    }
152                                    EditTransactionRequest::ListDynamic {
153                                        iterator,
154                                        control_handle: _,
155                                    } => {
156                                        events_task.lock().unwrap().push(Event::ListDynamic);
157                                        let mut stream = iterator.into_stream();
158
159                                        let mut rules = vec![
160                                            rule!("fuchsia.com" => "example.com", "/" => "/"),
161                                            rule!("fuchsia.com" => "mycorp.com", "/" => "/"),
162                                        ]
163                                        .into_iter();
164
165                                        while let Some(req) = stream.try_next().await.unwrap() {
166                                            let RuleIteratorRequest::Next { responder } = req;
167                                            events_task.lock().unwrap().push(Event::IteratorNext);
168
169                                            if let Some(rule) = rules.next() {
170                                                responder.send(&[rule.into()]).unwrap();
171                                            } else {
172                                                responder.send(&[]).unwrap();
173                                            }
174                                        }
175                                    }
176                                    EditTransactionRequest::Add { rule, responder } => {
177                                        events_task
178                                            .lock()
179                                            .unwrap()
180                                            .push(Event::Add(rule.try_into().unwrap()));
181                                        responder.send(Ok(())).unwrap();
182                                    }
183                                    EditTransactionRequest::Commit { responder } => {
184                                        if fail_attempts > 0 {
185                                            fail_attempts -= 1;
186                                            events_task.lock().unwrap().push(Event::CommitFailed);
187                                            responder.send(Err(fail_status.into_raw())).unwrap();
188                                        } else {
189                                            events_task.lock().unwrap().push(Event::Commit);
190                                            responder.send(Ok(())).unwrap();
191                                        }
192                                    }
193                                }
194                            }
195                        }
196                        _ => {
197                            panic!("unexpected reqest: {:?}", req);
198                        }
199                    }
200                }
201            })
202            .detach();
203
204            Self { engine, events }
205        }
206
207        fn take_events(&self) -> Vec<Event> {
208            self.events.lock().unwrap().drain(..).collect()
209        }
210    }
211
212    #[fasync::run_singlethreaded(test)]
213    async fn test_do_transaction_empty_always_commits() {
214        let engine = Engine::new();
215
216        do_transaction(&engine.engine, |transaction| async { Ok(transaction) }).await.unwrap();
217
218        assert_eq!(engine.take_events(), vec![Event::Commit]);
219    }
220
221    #[fasync::run_singlethreaded(test)]
222    async fn test_do_transaction_reset_all() {
223        let engine = Engine::new();
224
225        do_transaction(&engine.engine, |transaction| async {
226            transaction.reset_all()?;
227            Ok(transaction)
228        })
229        .await
230        .unwrap();
231
232        assert_eq!(engine.take_events(), vec![Event::ResetAll, Event::Commit]);
233    }
234
235    #[fasync::run_singlethreaded(test)]
236    async fn test_do_transaction_list_dynamic() {
237        let engine = Engine::new();
238
239        do_transaction(&engine.engine, |transaction| async {
240            let rules = transaction.list_dynamic().await?;
241            assert_eq!(
242                rules,
243                vec![
244                    rule!("fuchsia.com" => "example.com", "/" => "/"),
245                    rule!("fuchsia.com" => "mycorp.com", "/" => "/"),
246                ]
247            );
248            Ok(transaction)
249        })
250        .await
251        .unwrap();
252
253        assert_eq!(
254            engine.take_events(),
255            // We should get three iterators. The first two get the rules, the last gets nothing.
256            vec![
257                Event::ListDynamic,
258                Event::IteratorNext,
259                Event::IteratorNext,
260                Event::IteratorNext,
261                Event::Commit
262            ]
263        );
264    }
265
266    #[fasync::run_singlethreaded(test)]
267    async fn test_do_transaction_add() {
268        let engine = Engine::new();
269
270        let attempts = Arc::new(AtomicUsize::new(0));
271        do_transaction(&engine.engine, |transaction| async {
272            attempts.fetch_add(1, Ordering::SeqCst);
273            transaction.add(rule!("foo.com" => "bar.com", "/" => "/")).await?;
274            transaction.add(rule!("baz.com" => "boo.com", "/" => "/")).await?;
275            Ok(transaction)
276        })
277        .await
278        .unwrap();
279
280        assert_eq!(attempts.load(Ordering::SeqCst), 1);
281        assert_eq!(
282            engine.take_events(),
283            vec![
284                Event::Add(rule!("foo.com" => "bar.com", "/" => "/")),
285                Event::Add(rule!("baz.com" => "boo.com", "/" => "/")),
286                Event::Commit,
287            ],
288        );
289    }
290
291    #[fasync::run_singlethreaded(test)]
292    async fn test_do_transaction_closure_error_does_not_commit() {
293        let engine = Engine::new();
294
295        let attempts = Arc::new(AtomicUsize::new(0));
296        let err = do_transaction(&engine.engine, |_transaction| async {
297            attempts.fetch_add(1, Ordering::SeqCst);
298            Err(EditTransactionError::AddError(zx::Status::INTERNAL))
299        })
300        .await
301        .unwrap_err();
302
303        assert_eq!(attempts.load(Ordering::SeqCst), 1);
304        assert_matches!(err, EditTransactionError::AddError(zx::Status::INTERNAL));
305        assert_eq!(engine.take_events(), vec![]);
306    }
307
308    #[fasync::run_singlethreaded(test)]
309    async fn test_do_transaction_retries_commit_errors() {
310        let engine = Engine::with_fail_attempts(5, zx::Status::UNAVAILABLE);
311
312        let attempts = Arc::new(AtomicUsize::new(0));
313        do_transaction(&engine.engine, |transaction| async {
314            attempts.fetch_add(1, Ordering::SeqCst);
315            Ok(transaction)
316        })
317        .await
318        .unwrap();
319
320        assert_eq!(attempts.load(Ordering::SeqCst), 6);
321        assert_eq!(
322            engine.take_events(),
323            vec![
324                Event::CommitFailed,
325                Event::CommitFailed,
326                Event::CommitFailed,
327                Event::CommitFailed,
328                Event::CommitFailed,
329                Event::Commit,
330            ]
331        );
332    }
333
334    #[fasync::run_singlethreaded(test)]
335    async fn test_do_transaction_eventually_gives_up() {
336        let engine = Engine::with_fail_attempts(RETRY_ATTEMPTS + 1, zx::Status::UNAVAILABLE);
337
338        let attempts = Arc::new(AtomicUsize::new(0));
339        let err = do_transaction(&engine.engine, |transaction| async {
340            attempts.fetch_add(1, Ordering::SeqCst);
341            Ok(transaction)
342        })
343        .await
344        .unwrap_err();
345
346        assert_eq!(attempts.load(Ordering::SeqCst), RETRY_ATTEMPTS);
347        assert_matches!(err, EditTransactionError::CommitError(zx::Status::UNAVAILABLE));
348        assert_eq!(
349            engine.take_events(),
350            (0..RETRY_ATTEMPTS).map(|_| Event::CommitFailed).collect::<Vec<_>>(),
351        );
352    }
353
354    #[fasync::run_singlethreaded(test)]
355    async fn test_do_transaction_does_not_retry_other_errors() {
356        let engine = Engine::with_fail_attempts(5, zx::Status::INTERNAL);
357
358        let attempts = Arc::new(AtomicUsize::new(0));
359        let err = do_transaction(&engine.engine, |transaction| async {
360            attempts.fetch_add(1, Ordering::SeqCst);
361            Ok(transaction)
362        })
363        .await
364        .unwrap_err();
365
366        assert_eq!(attempts.load(Ordering::SeqCst), 1);
367        assert_matches!(err, EditTransactionError::CommitError(zx::Status::INTERNAL));
368        assert_eq!(engine.take_events(), vec![Event::CommitFailed,]);
369    }
370}