// fidl_fuchsia_pkg_rewrite_ext/transaction.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::errors::EditTransactionError;
use crate::rule::Rule;
use flex_client::ProxyHasDomain;
use flex_fuchsia_pkg_rewrite::{EditTransactionProxy, EngineProxy};
use std::future::Future;
use {flex_fuchsia_pkg_rewrite as rewrite, zx_status as zx};

/// Maximum number of times `do_transaction` runs the edit callback and attempts to commit
/// before giving up.
const RETRY_ATTEMPTS: usize = 100;

14/// A helper for managing the editing of rewrite rules.
15pub struct EditTransaction {
16    transaction: EditTransactionProxy,
17}
18
19impl EditTransaction {
20    /// Removes all dynamically configured rewrite rules, leaving only any
21    /// statically configured rules.
22    pub fn reset_all(&self) -> Result<(), EditTransactionError> {
23        self.transaction.reset_all().map_err(EditTransactionError::from)
24    }
25
26    /// Returns a vector of all dynamic (editable) rewrite rules. The
27    /// vector will reflect any changes made to the rewrite rules so far in
28    /// this transaction.
29    pub async fn list_dynamic(&self) -> Result<Vec<Rule>, EditTransactionError> {
30        let (iter, iter_server_end) =
31            self.transaction.domain().create_proxy::<rewrite::RuleIteratorMarker>();
32        self.transaction.list_dynamic(iter_server_end)?;
33
34        let mut rules = Vec::new();
35        loop {
36            let chunk = iter.next().await?;
37            if chunk.is_empty() {
38                break;
39            }
40
41            for rule in chunk {
42                rules.push(Rule::try_from(rule)?);
43            }
44        }
45
46        Ok(rules)
47    }
48
49    /// Adds a rewrite rule with highest priority. If `rule` already exists, this
50    /// API will prioritize it over other rules.
51    pub async fn add(&self, rule: Rule) -> Result<(), EditTransactionError> {
52        self.transaction
53            .add(&rule.into())
54            .await?
55            .map_err(|err| EditTransactionError::AddError(zx::Status::from_raw(err)))
56    }
57}
58
59/// Perform a rewrite rule edit transaction, retrying as necessary if another edit transaction runs
60/// concurrently.
61///
62/// The given callback `cb` should perform the needed edits to the state of the rewrite rules but
63/// not attempt to `commit()` the transaction. `do_transaction` will internally attempt to commit
64/// the transaction and trigger a retry if necessary.
65pub async fn do_transaction<T, R>(engine: &EngineProxy, cb: T) -> Result<(), EditTransactionError>
66where
67    T: Fn(EditTransaction) -> R,
68    R: Future<Output = Result<EditTransaction, EditTransactionError>>,
69{
70    // Make a reasonable effort to retry the edit after a concurrent edit, but don't retry forever.
71    for _ in 0..RETRY_ATTEMPTS {
72        let (transaction, transaction_server_end) =
73            engine.domain().create_proxy::<rewrite::EditTransactionMarker>();
74
75        let () = engine
76            .start_edit_transaction(transaction_server_end)
77            .map_err(EditTransactionError::from)?;
78
79        let transaction = cb(EditTransaction { transaction }).await?;
80
81        let response =
82            transaction.transaction.commit().await.map_err(EditTransactionError::from)?;
83
84        // Retry edit transaction on concurrent edit
85        return match response.map_err(zx::Status::from_raw) {
86            Ok(()) => Ok(()),
87            Err(zx::Status::UNAVAILABLE) => {
88                continue;
89            }
90            Err(status) => Err(EditTransactionError::CommitError(status)),
91        };
92    }
93
94    Err(EditTransactionError::CommitError(zx::Status::UNAVAILABLE))
95}
96
#[cfg(test)]
mod tests {
    use super::*;
    use assert_matches::assert_matches;
    use flex_fuchsia_pkg_rewrite::{
        EditTransactionRequest, EngineMarker, EngineRequest, RuleIteratorRequest,
    };
    use fuchsia_async as fasync;
    use futures::TryStreamExt;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};

    /// A request observed by the fake engine, recorded in the order it was handled.
    #[derive(Debug, PartialEq)]
    enum Event {
        ResetAll,
        ListDynamic,
        IteratorNext,
        Add(Rule),
        CommitFailed,
        Commit,
    }

    /// A fake rewrite engine: a proxy connected to a local task that serves requests and logs
    /// an [`Event`] for each one it handles.
    struct Engine {
        engine: EngineProxy,
        events: Arc<Mutex<Vec<Event>>>,
        #[cfg(feature = "fdomain")]
        _client: Arc<flex_client::Client>,
    }

    /// Builds a [`Rule`] from `host => host, path_prefix => path_prefix` pairs, panicking if the
    /// rule is invalid.
    macro_rules! rule {
        ($host_match:expr => $host_replacement:expr,
         $path_prefix_match:expr => $path_prefix_replacement:expr) => {
            Rule::new($host_match, $host_replacement, $path_prefix_match, $path_prefix_replacement)
                .unwrap()
        };
    }

    impl Engine {
        /// Creates a fake engine whose commits always succeed.
        fn new() -> Self {
            Self::with_fail_attempts(0, zx::Status::OK)
        }

        /// Creates a fake engine that answers the first `fail_attempts` commit requests with
        /// `fail_status` and every later commit request with success.
        fn with_fail_attempts(mut fail_attempts: usize, fail_status: zx::Status) -> Self {
            #[cfg(feature = "fdomain")]
            let client = fdomain_local::local_client_empty();
            #[cfg(not(feature = "fdomain"))]
            let client = flex_client::fidl::ZirconClient;
            let events = Arc::new(Mutex::new(Vec::new()));
            let events_task = Arc::clone(&events);

            let (engine, mut engine_stream) = client.create_proxy_and_stream::<EngineMarker>();

            fasync::Task::local(async move {
                // Appends one event to the shared log.
                let record = |event: Event| events_task.lock().unwrap().push(event);

                while let Some(req) = engine_stream.try_next().await.unwrap() {
                    match req {
                        EngineRequest::StartEditTransaction { transaction, control_handle: _ } => {
                            let mut tx_stream = transaction.into_stream();

                            // Serve this transaction until the client closes its end.
                            while let Some(req) = tx_stream.try_next().await.unwrap() {
                                match req {
                                    EditTransactionRequest::ResetAll { control_handle: _ } => {
                                        record(Event::ResetAll);
                                    }
                                    EditTransactionRequest::ListDynamic {
                                        iterator,
                                        control_handle: _,
                                    } => {
                                        record(Event::ListDynamic);
                                        let mut stream = iterator.into_stream();

                                        let mut rules = vec![
                                            rule!("fuchsia.com" => "example.com", "/" => "/"),
                                            rule!("fuchsia.com" => "mycorp.com", "/" => "/"),
                                        ]
                                        .into_iter();

                                        // Hand out one rule per `Next` call, then empty chunks.
                                        while let Some(req) = stream.try_next().await.unwrap() {
                                            let RuleIteratorRequest::Next { responder } = req;
                                            record(Event::IteratorNext);

                                            if let Some(rule) = rules.next() {
                                                responder.send(&[rule.into()]).unwrap();
                                            } else {
                                                responder.send(&[]).unwrap();
                                            }
                                        }
                                    }
                                    EditTransactionRequest::Add { rule, responder } => {
                                        record(Event::Add(rule.try_into().unwrap()));
                                        responder.send(Ok(())).unwrap();
                                    }
                                    EditTransactionRequest::Commit { responder } => {
                                        if fail_attempts > 0 {
                                            fail_attempts -= 1;
                                            record(Event::CommitFailed);
                                            responder.send(Err(fail_status.into_raw())).unwrap();
                                        } else {
                                            record(Event::Commit);
                                            responder.send(Ok(())).unwrap();
                                        }
                                    }
                                }
                            }
                        }
                        _ => {
                            panic!("unexpected request: {:?}", req);
                        }
                    }
                }
            })
            .detach();

            Self {
                engine,
                events,
                #[cfg(feature = "fdomain")]
                _client: client,
            }
        }

        /// Returns every event recorded so far and clears the log.
        fn take_events(&self) -> Vec<Event> {
            std::mem::take(&mut *self.events.lock().unwrap())
        }
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_do_transaction_empty_always_commits() {
        let engine = Engine::new();

        do_transaction(&engine.engine, |transaction| async { Ok(transaction) }).await.unwrap();

        assert_eq!(engine.take_events(), vec![Event::Commit]);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_do_transaction_reset_all() {
        let engine = Engine::new();

        do_transaction(&engine.engine, |transaction| async {
            transaction.reset_all()?;
            Ok(transaction)
        })
        .await
        .unwrap();

        assert_eq!(engine.take_events(), vec![Event::ResetAll, Event::Commit]);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_do_transaction_list_dynamic() {
        let engine = Engine::new();

        do_transaction(&engine.engine, |transaction| async {
            let rules = transaction.list_dynamic().await?;
            assert_eq!(
                rules,
                vec![
                    rule!("fuchsia.com" => "example.com", "/" => "/"),
                    rule!("fuchsia.com" => "mycorp.com", "/" => "/"),
                ]
            );
            Ok(transaction)
        })
        .await
        .unwrap();

        assert_eq!(
            engine.take_events(),
            // We should see three `next()` calls on the iterator. The first two each return a
            // rule, the last returns nothing.
            vec![
                Event::ListDynamic,
                Event::IteratorNext,
                Event::IteratorNext,
                Event::IteratorNext,
                Event::Commit
            ]
        );
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_do_transaction_add() {
        let engine = Engine::new();

        let attempts = Arc::new(AtomicUsize::new(0));
        do_transaction(&engine.engine, |transaction| async {
            attempts.fetch_add(1, Ordering::SeqCst);
            transaction.add(rule!("foo.com" => "bar.com", "/" => "/")).await?;
            transaction.add(rule!("baz.com" => "boo.com", "/" => "/")).await?;
            Ok(transaction)
        })
        .await
        .unwrap();

        assert_eq!(attempts.load(Ordering::SeqCst), 1);
        assert_eq!(
            engine.take_events(),
            vec![
                Event::Add(rule!("foo.com" => "bar.com", "/" => "/")),
                Event::Add(rule!("baz.com" => "boo.com", "/" => "/")),
                Event::Commit,
            ],
        );
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_do_transaction_closure_error_does_not_commit() {
        let engine = Engine::new();

        let attempts = Arc::new(AtomicUsize::new(0));
        let err = do_transaction(&engine.engine, |_transaction| async {
            attempts.fetch_add(1, Ordering::SeqCst);
            Err(EditTransactionError::AddError(zx::Status::INTERNAL))
        })
        .await
        .unwrap_err();

        assert_eq!(attempts.load(Ordering::SeqCst), 1);
        assert_matches!(err, EditTransactionError::AddError(zx::Status::INTERNAL));
        assert_eq!(engine.take_events(), vec![]);
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_do_transaction_retries_commit_errors() {
        let engine = Engine::with_fail_attempts(5, zx::Status::UNAVAILABLE);

        let attempts = Arc::new(AtomicUsize::new(0));
        do_transaction(&engine.engine, |transaction| async {
            attempts.fetch_add(1, Ordering::SeqCst);
            Ok(transaction)
        })
        .await
        .unwrap();

        // Five failed commits followed by the one that succeeds.
        assert_eq!(attempts.load(Ordering::SeqCst), 6);
        assert_eq!(
            engine.take_events(),
            vec![
                Event::CommitFailed,
                Event::CommitFailed,
                Event::CommitFailed,
                Event::CommitFailed,
                Event::CommitFailed,
                Event::Commit,
            ]
        );
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_do_transaction_eventually_gives_up() {
        // One more failure than there are attempts, so no commit can ever succeed.
        let engine = Engine::with_fail_attempts(RETRY_ATTEMPTS + 1, zx::Status::UNAVAILABLE);

        let attempts = Arc::new(AtomicUsize::new(0));
        let err = do_transaction(&engine.engine, |transaction| async {
            attempts.fetch_add(1, Ordering::SeqCst);
            Ok(transaction)
        })
        .await
        .unwrap_err();

        assert_eq!(attempts.load(Ordering::SeqCst), RETRY_ATTEMPTS);
        assert_matches!(err, EditTransactionError::CommitError(zx::Status::UNAVAILABLE));
        assert_eq!(
            engine.take_events(),
            (0..RETRY_ATTEMPTS).map(|_| Event::CommitFailed).collect::<Vec<_>>(),
        );
    }

    #[fasync::run_singlethreaded(test)]
    async fn test_do_transaction_does_not_retry_other_errors() {
        let engine = Engine::with_fail_attempts(5, zx::Status::INTERNAL);

        let attempts = Arc::new(AtomicUsize::new(0));
        let err = do_transaction(&engine.engine, |transaction| async {
            attempts.fetch_add(1, Ordering::SeqCst);
            Ok(transaction)
        })
        .await
        .unwrap_err();

        assert_eq!(attempts.load(Ordering::SeqCst), 1);
        assert_matches!(err, EditTransactionError::CommitError(zx::Status::INTERNAL));
        assert_eq!(engine.take_events(), vec![Event::CommitFailed,]);
    }
}