wlancfg_lib/client/connection_selection/
fut_manager.rs1use crate::client::connection_selection::ConnectionSelectionRequester;
6use crate::client::types;
7use crate::mode_management::iface_manager_api::ConnectAttemptRequest;
8use anyhow::format_err;
9use futures::channel::oneshot;
10use futures::future::{FusedFuture, Future, FutureExt, LocalBoxFuture};
11use futures::stream::{FuturesUnordered, Stream};
12use log::warn;
13use std::collections::HashMap;
14use std::pin::Pin;
15use std::task::{Context, Poll, Waker};
16
17pub struct ConnectionSelectionManager {
19 #[allow(clippy::type_complexity)]
21 futures: FuturesUnordered<
22 LocalBoxFuture<
23 'static,
24 (SelectionIdentifier, Result<Option<ConnectionSelectionResponse>, anyhow::Error>),
25 >,
26 >,
27 cancellation_handles: HashMap<SelectionIdentifier, oneshot::Sender<()>>,
29 requester: ConnectionSelectionRequester,
30 waker: Option<Waker>,
31}
32
33impl ConnectionSelectionManager {
34 pub fn new(requester: ConnectionSelectionRequester) -> Self {
35 Self {
36 futures: FuturesUnordered::new(),
37 cancellation_handles: HashMap::new(),
38 requester,
39 waker: None,
40 }
41 }
42
43 pub fn spawn(
46 &mut self,
47 selection_id: SelectionIdentifier,
48 selection_future: LocalBoxFuture<
49 'static,
50 Result<ConnectionSelectionResponse, anyhow::Error>,
51 >,
52 ) {
53 if self.cancellation_handles.contains_key(&selection_id) {
54 warn!("{} already in progress. Ignoring new request.", selection_id);
55 return;
56 }
57
58 let (sender, receiver) = oneshot::channel();
59 let _ = self.cancellation_handles.insert(selection_id.clone(), sender);
60 let cancellable_future = async move {
61 futures::select! {
62 result = selection_future.fuse() => (selection_id.clone(), result.map(Some)),
63 _ = receiver.fuse() => (selection_id, Ok(None)),
64 }
65 };
66
67 self.futures.push(cancellable_future.boxed_local());
68 if let Some(waker) = &self.waker {
72 waker.wake_by_ref();
73 }
74 }
75
76 pub fn cancel(&mut self, selection_id: &SelectionIdentifier) {
78 if let Some(cancel_tx) = self.cancellation_handles.remove(selection_id) {
79 let _ = cancel_tx.send(());
80 }
81 }
82
83 pub fn cancel_all(&mut self) {
85 for selection_id in self.active_selections() {
86 self.cancel(&selection_id);
87 }
88 }
89
90 pub fn active_selections(&self) -> Vec<SelectionIdentifier> {
92 self.cancellation_handles.keys().cloned().collect()
93 }
94
95 pub fn initiate_automatic_connection_selection(&mut self) {
96 let mut requester = self.requester.clone();
97 let fut = async move {
98 requester
99 .do_connection_selection(None, types::ConnectReason::IdleInterfaceAutoconnect)
100 .await
101 .map_err(|e| format_err!("Error sending connection selection request: {}.", e))
102 .map(ConnectionSelectionResponse::Autoconnect)
103 };
104 self.spawn(SelectionIdentifier::Automatic, Box::pin(fut).boxed_local());
105 }
106
107 pub fn initiate_connection_selection_for_connect_request(
108 &mut self,
109 request: ConnectAttemptRequest,
110 ) {
111 let mut requester = self.requester.clone();
112 let network = request.network.clone();
113 let fut = async move {
114 requester
115 .do_connection_selection(Some(request.network.clone()), request.reason)
116 .await
117 .map_err(|e| format_err!("Error sending connection selection request: {}.", e))
118 .map(move |candidate| ConnectionSelectionResponse::ConnectRequest {
119 candidate,
120 request,
121 })
122 };
123 self.spawn(SelectionIdentifier::ConnectRequest(network), Box::pin(fut).boxed_local());
124 }
125
126 #[cfg(test)]
127 #[allow(clippy::type_complexity)]
128 pub fn get_futures(
129 &mut self,
130 ) -> &mut FuturesUnordered<
131 LocalBoxFuture<
132 'static,
133 (SelectionIdentifier, Result<Option<ConnectionSelectionResponse>, anyhow::Error>),
134 >,
135 > {
136 &mut self.futures
137 }
138
139 #[cfg(test)]
140 pub fn get_cancellation_handles(
141 &mut self,
142 ) -> &mut HashMap<SelectionIdentifier, oneshot::Sender<()>> {
143 &mut self.cancellation_handles
144 }
145}
146
147pub struct ConnectionSelectionFutures<'a> {
149 manager: &'a mut ConnectionSelectionManager,
150}
151impl<'a> ConnectionSelectionFutures<'a> {
152 pub fn new(manager: &'a mut ConnectionSelectionManager) -> Self {
153 Self { manager }
154 }
155}
156impl<'a> Future for ConnectionSelectionFutures<'a> {
157 type Output = (SelectionIdentifier, Result<Option<ConnectionSelectionResponse>, anyhow::Error>);
158
159 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
160 self.manager.waker = None;
162
163 match Pin::new(&mut self.manager.futures).poll_next(cx) {
164 Poll::Ready(Some((selection_id, result))) => {
165 let _ = self.manager.cancellation_handles.remove(&selection_id);
167 Poll::Ready((selection_id, result))
168 }
169 Poll::Ready(None) | Poll::Pending => {
170 self.manager.waker = Some(cx.waker().clone());
178 Poll::Pending
179 }
180 }
181 }
182}
183impl<'a> FusedFuture for ConnectionSelectionFutures<'a> {
184 fn is_terminated(&self) -> bool {
185 false
188 }
189}
190
191#[derive(Clone, PartialEq, Eq, Hash)]
193#[cfg_attr(test, derive(Debug))]
194pub enum SelectionIdentifier {
195 ConnectRequest(types::NetworkIdentifier),
196 Automatic,
197}
198impl std::fmt::Display for SelectionIdentifier {
199 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200 match self {
201 SelectionIdentifier::ConnectRequest(id) => {
202 write!(f, "Connection selection for network: {}", id)
203 }
204 SelectionIdentifier::Automatic => write!(f, "Automatic connection selection"),
205 }
206 }
207}
208
209#[cfg_attr(test, derive(Debug))]
210pub enum ConnectionSelectionResponse {
211 ConnectRequest { candidate: Option<types::ScannedCandidate>, request: ConnectAttemptRequest },
212 Autoconnect(Option<types::ScannedCandidate>),
213}
214
#[cfg(test)]
mod tests {
    use super::*;
    use crate::util::testing::generate_random_network_identifier;
    use assert_matches::assert_matches;
    use fuchsia_async::TestExecutor;
    use futures::task::Poll;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::task::{Wake, Waker};

    /// Builds a requester whose receiving end is dropped as soon as this helper returns.
    fn fake_requester() -> ConnectionSelectionRequester {
        let (request_tx, _request_rx) = futures::channel::mpsc::channel(1);
        ConnectionSelectionRequester::new(request_tx)
    }

    /// Identifier for a connect request targeting a fixed WPA2 network named "test".
    fn wpa2_connect_request_id() -> SelectionIdentifier {
        SelectionIdentifier::ConnectRequest(types::NetworkIdentifier {
            ssid: types::Ssid::from_bytes_unchecked(b"test".to_vec()),
            security_type: types::SecurityType::Wpa2,
        })
    }

    #[test]
    fn test_spawn_and_poll() {
        let mut exec = TestExecutor::new();
        let mut manager = ConnectionSelectionManager::new(fake_requester());

        let selection_id = SelectionIdentifier::Automatic;
        let selection = async move { Ok(ConnectionSelectionResponse::Autoconnect(None)) };
        manager.spawn(selection_id.clone(), Box::pin(selection));

        // The spawned selection is immediately ready, so the first poll yields its result.
        let mut futures = ConnectionSelectionFutures::new(&mut manager);
        assert_matches!(
            exec.run_until_stalled(&mut futures),
            Poll::Ready((id, Ok(Some(ConnectionSelectionResponse::Autoconnect(None))))) if id == selection_id
        );
    }

    #[test]
    fn test_spawn_duplicate() {
        let mut manager = ConnectionSelectionManager::new(fake_requester());
        let selection_id = SelectionIdentifier::Automatic;

        // First spawn registers the selection.
        manager.spawn(
            selection_id.clone(),
            Box::pin(async { Ok(ConnectionSelectionResponse::Autoconnect(None)) }),
        );
        assert_eq!(manager.active_selections().len(), 1);

        // Second spawn with the same identifier is ignored.
        manager.spawn(
            selection_id.clone(),
            Box::pin(async { Ok(ConnectionSelectionResponse::Autoconnect(None)) }),
        );
        assert_eq!(manager.active_selections().len(), 1);
    }

    #[test]
    fn test_cancel_specific() {
        let mut exec = TestExecutor::new();
        let mut manager = ConnectionSelectionManager::new(fake_requester());

        let id_auto = SelectionIdentifier::Automatic;
        let id_req = wpa2_connect_request_id();

        // Neither selection ever completes by itself.
        manager.spawn(id_auto.clone(), Box::pin(futures::future::pending()));
        manager.spawn(id_req.clone(), Box::pin(futures::future::pending()));
        assert_eq!(manager.active_selections().len(), 2);

        manager.cancel(&id_auto);

        // Only the cancelled selection resolves, and it resolves as cancelled.
        let mut futures = ConnectionSelectionFutures::new(&mut manager);
        assert_matches!(
            exec.run_until_stalled(&mut futures),
            Poll::Ready((id, Ok(None))) if id == id_auto
        );

        // The other selection is still registered.
        let remaining = manager.active_selections();
        assert_eq!(remaining.len(), 1);
        assert!(remaining.contains(&id_req));
    }

    #[test]
    fn test_cancel_all() {
        let mut exec = TestExecutor::new();
        let mut manager = ConnectionSelectionManager::new(fake_requester());

        let id1 = SelectionIdentifier::Automatic;
        let id2 = wpa2_connect_request_id();

        manager.spawn(id1.clone(), Box::pin(futures::future::pending()));
        manager.spawn(id2.clone(), Box::pin(futures::future::pending()));
        assert_eq!(manager.active_selections().len(), 2);

        manager.cancel_all();
        assert_eq!(manager.active_selections().len(), 0);

        // Drain results until the future stalls; each one must be a cancellation.
        let mut futures = ConnectionSelectionFutures::new(&mut manager);
        let mut cancelled_count = 0;
        while let Poll::Ready(output) = exec.run_until_stalled(&mut futures) {
            match output {
                (_, Ok(None)) => cancelled_count += 1,
                _ => panic!("Unexpected result"),
            }
        }

        assert_eq!(cancelled_count, 2);
    }

    #[test]
    fn test_internal_state_access() {
        let mut manager = ConnectionSelectionManager::new(fake_requester());
        let id = SelectionIdentifier::Automatic;
        manager.spawn(id.clone(), Box::pin(futures::future::pending()));

        assert_eq!(manager.get_cancellation_handles().len(), 1);
        assert!(!manager.get_futures().is_empty());
    }

    #[test]
    fn test_poll_empty_twice_does_not_panic() {
        let mut exec = TestExecutor::new();
        let mut manager = ConnectionSelectionManager::new(fake_requester());

        // Polling with nothing spawned must stay pending, however often it is repeated.
        let mut futures = ConnectionSelectionFutures::new(&mut manager);
        assert_matches!(exec.run_until_stalled(&mut futures), Poll::Pending);
        assert_matches!(exec.run_until_stalled(&mut futures), Poll::Pending);
    }

    #[test]
    fn test_spawn_wakes_after_empty() {
        /// Records whether it has been woken at least once.
        struct FlagWaker(AtomicBool);
        impl Wake for FlagWaker {
            fn wake(self: Arc<Self>) {
                self.0.store(true, Ordering::SeqCst);
            }
        }

        let mut manager = ConnectionSelectionManager::new(fake_requester());

        let woken = Arc::new(FlagWaker(AtomicBool::new(false)));
        let waker = Waker::from(woken.clone());
        let mut cx = Context::from_waker(&waker);

        {
            // Poll while empty so that the manager stores the waker.
            let mut futures = ConnectionSelectionFutures::new(&mut manager);
            assert_matches!(Pin::new(&mut futures).poll(&mut cx), Poll::Pending);
        }

        // Polling an empty manager must not wake anything by itself.
        assert!(!woken.0.load(Ordering::SeqCst));

        let selection = async move { Ok(ConnectionSelectionResponse::Autoconnect(None)) };
        manager.spawn(SelectionIdentifier::Automatic, Box::pin(selection));

        assert!(
            woken.0.load(Ordering::SeqCst),
            "Waker should have been awakened by spawn() after empty poll"
        );
    }

    #[test]
    fn test_spawn_wakes_while_busy() {
        /// Counts how many times it has been woken.
        struct CounterWaker(AtomicUsize);
        impl Wake for CounterWaker {
            fn wake(self: Arc<Self>) {
                let _ = self.0.fetch_add(1, Ordering::SeqCst);
            }
        }

        let mut manager = ConnectionSelectionManager::new(fake_requester());

        let wake_count = Arc::new(CounterWaker(AtomicUsize::new(0)));
        let waker = Waker::from(wake_count.clone());
        let mut cx = Context::from_waker(&waker);

        // Keep the manager busy with a selection that never completes.
        manager.spawn(SelectionIdentifier::Automatic, Box::pin(futures::future::pending()));

        {
            let mut futures = ConnectionSelectionFutures::new(&mut manager);
            assert_matches!(Pin::new(&mut futures).poll(&mut cx), Poll::Pending);
        }
        // The poll itself accounts for one wake. Presumably `FuturesUnordered` wakes the task
        // after it has polled every future once; TODO confirm against the futures crate.
        assert_eq!(wake_count.0.load(Ordering::SeqCst), 1);

        let selection_id =
            SelectionIdentifier::ConnectRequest(generate_random_network_identifier());
        manager.spawn(
            selection_id,
            Box::pin(futures::future::ready(Ok(ConnectionSelectionResponse::Autoconnect(None)))),
        );
        assert_eq!(
            wake_count.0.load(Ordering::SeqCst),
            2,
            "Manager should wake stored waker on spawn even when busy"
        );
    }
}