Skip to main content

wlancfg_lib/client/connection_selection/
fut_manager.rs

1// Copyright 2026 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::client::connection_selection::ConnectionSelectionRequester;
6use crate::client::types;
7use crate::mode_management::iface_manager_api::ConnectAttemptRequest;
8use anyhow::format_err;
9use futures::channel::oneshot;
10use futures::future::{FusedFuture, Future, FutureExt, LocalBoxFuture};
11use futures::stream::{FuturesUnordered, Stream};
12use log::warn;
13use std::collections::HashMap;
14use std::pin::Pin;
15use std::task::{Context, Poll, Waker};
16
/// Manages the active connection selection futures, allowing for selective cancellation.
///
/// Every spawned selection is tracked in two places: the future itself is stored in `futures`,
/// and a cancellation sender keyed by the same `SelectionIdentifier` is stored in
/// `cancellation_handles`.
pub struct ConnectionSelectionManager {
    /// Collection of running connection selection futures, to be polled. Each future resolves
    /// to its identifier together with its outcome; a cancelled selection resolves to `Ok(None)`.
    #[allow(clippy::type_complexity)]
    futures: FuturesUnordered<
        LocalBoxFuture<
            'static,
            (SelectionIdentifier, Result<Option<ConnectionSelectionResponse>, anyhow::Error>),
        >,
    >,
    /// Maps a SelectionIdentifier to a sender that can cancel the corresponding future.
    cancellation_handles: HashMap<SelectionIdentifier, oneshot::Sender<()>>,
    /// Requester that is cloned into each selection future started by the `initiate_*` methods.
    requester: ConnectionSelectionRequester,
    /// Waker saved by the most recent poll that returned `Poll::Pending`. `spawn` wakes it so
    /// that the poller comes back and drives the newly added future.
    waker: Option<Waker>,
}
32
33impl ConnectionSelectionManager {
34    pub fn new(requester: ConnectionSelectionRequester) -> Self {
35        Self {
36            futures: FuturesUnordered::new(),
37            cancellation_handles: HashMap::new(),
38            requester,
39            waker: None,
40        }
41    }
42
43    /// Spawns a new connection selection future with an associated SelectionIdentifier.
44    /// Wraps the future in a cancellation logic.
45    pub fn spawn(
46        &mut self,
47        selection_id: SelectionIdentifier,
48        selection_future: LocalBoxFuture<
49            'static,
50            Result<ConnectionSelectionResponse, anyhow::Error>,
51        >,
52    ) {
53        if self.cancellation_handles.contains_key(&selection_id) {
54            warn!("{} already in progress. Ignoring new request.", selection_id);
55            return;
56        }
57
58        let (sender, receiver) = oneshot::channel();
59        let _ = self.cancellation_handles.insert(selection_id.clone(), sender);
60        let cancellable_future = async move {
61            futures::select! {
62                result = selection_future.fuse() => (selection_id.clone(), result.map(Some)),
63                _ = receiver.fuse() => (selection_id, Ok(None)),
64            }
65        };
66
67        self.futures.push(cancellable_future.boxed_local());
68        // If the manager was previously polled and returned Poll::Pending, it would have stored
69        // a waker, provided by the poller. That waker needs to be awakened to inform the poller
70        // that the manager now has more work to do.
71        if let Some(waker) = &self.waker {
72            waker.wake_by_ref();
73        }
74    }
75
76    /// Cancels the future associated with the given selection_id.
77    pub fn cancel(&mut self, selection_id: &SelectionIdentifier) {
78        if let Some(cancel_tx) = self.cancellation_handles.remove(selection_id) {
79            let _ = cancel_tx.send(());
80        }
81    }
82
83    /// Cancels all active connection selections.
84    pub fn cancel_all(&mut self) {
85        for selection_id in self.active_selections() {
86            self.cancel(&selection_id);
87        }
88    }
89
90    /// Returns a list of SelectionIdentifiers for connection selections currently being processed.
91    pub fn active_selections(&self) -> Vec<SelectionIdentifier> {
92        self.cancellation_handles.keys().cloned().collect()
93    }
94
95    pub fn initiate_automatic_connection_selection(&mut self) {
96        let mut requester = self.requester.clone();
97        let fut = async move {
98            requester
99                .do_connection_selection(None, types::ConnectReason::IdleInterfaceAutoconnect)
100                .await
101                .map_err(|e| format_err!("Error sending connection selection request: {}.", e))
102                .map(ConnectionSelectionResponse::Autoconnect)
103        };
104        self.spawn(SelectionIdentifier::Automatic, Box::pin(fut).boxed_local());
105    }
106
107    pub fn initiate_connection_selection_for_connect_request(
108        &mut self,
109        request: ConnectAttemptRequest,
110    ) {
111        let mut requester = self.requester.clone();
112        let network = request.network.clone();
113        let fut = async move {
114            requester
115                .do_connection_selection(Some(request.network.clone()), request.reason)
116                .await
117                .map_err(|e| format_err!("Error sending connection selection request: {}.", e))
118                .map(move |candidate| ConnectionSelectionResponse::ConnectRequest {
119                    candidate,
120                    request,
121                })
122        };
123        self.spawn(SelectionIdentifier::ConnectRequest(network), Box::pin(fut).boxed_local());
124    }
125
126    #[cfg(test)]
127    #[allow(clippy::type_complexity)]
128    pub fn get_futures(
129        &mut self,
130    ) -> &mut FuturesUnordered<
131        LocalBoxFuture<
132            'static,
133            (SelectionIdentifier, Result<Option<ConnectionSelectionResponse>, anyhow::Error>),
134        >,
135    > {
136        &mut self.futures
137    }
138
139    #[cfg(test)]
140    pub fn get_cancellation_handles(
141        &mut self,
142    ) -> &mut HashMap<SelectionIdentifier, oneshot::Sender<()>> {
143        &mut self.cancellation_handles
144    }
145}
146
/// A wrapper future to safely poll the ConnectionSelectionManager within a select! macro.
///
/// Polling the wrapper polls the manager's internal collection of selection futures and yields
/// one completed selection at a time.
pub struct ConnectionSelectionFutures<'a> {
    /// Manager whose selection futures are driven when this wrapper is polled.
    manager: &'a mut ConnectionSelectionManager,
}
151impl<'a> ConnectionSelectionFutures<'a> {
152    pub fn new(manager: &'a mut ConnectionSelectionManager) -> Self {
153        Self { manager }
154    }
155}
156impl<'a> Future for ConnectionSelectionFutures<'a> {
157    type Output = (SelectionIdentifier, Result<Option<ConnectionSelectionResponse>, anyhow::Error>);
158
159    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
160        // Clear any stored waker, in case its obsolete.
161        self.manager.waker = None;
162
163        match Pin::new(&mut self.manager.futures).poll_next(cx) {
164            Poll::Ready(Some((selection_id, result))) => {
165                // Clean up the cancellation map when a future completes.
166                let _ = self.manager.cancellation_handles.remove(&selection_id);
167                Poll::Ready((selection_id, result))
168            }
169            Poll::Ready(None) | Poll::Pending => {
170                // According to the docs, a FuturesUnordered must be polled at least once after a
171                // new future is added in order for that new future to get advanced. We want to
172                // achieve that by having the external caller poll ConnectionSelectionFutures, which
173                // will cause the internal FuturesUnordered to be polled. Therefore, this stores a
174                // copy of the external caller's waker, and use it to wake up that caller when a
175                // new future is spawned. Otherwise, there is no guarantee that the new futures get
176                // polled.
177                self.manager.waker = Some(cx.waker().clone());
178                Poll::Pending
179            }
180        }
181    }
182}
impl<'a> FusedFuture for ConnectionSelectionFutures<'a> {
    /// Always reports `false`, regardless of whether the manager currently holds any futures.
    fn is_terminated(&self) -> bool {
        // This tells select! that futures will continue to be added to the manager, so it should
        // never be considered terminated.
        false
    }
}
190
/// Identifier for cancellable connection selection futures.
///
/// Used as the key under which a selection's cancellation handle is stored, so at most one
/// selection per identifier can be registered at a time.
#[derive(Clone, PartialEq, Eq, Hash)]
#[cfg_attr(test, derive(Debug))]
pub enum SelectionIdentifier {
    /// Selection started on behalf of a connect request for the given network.
    ConnectRequest(types::NetworkIdentifier),
    /// Selection started by `initiate_automatic_connection_selection`, not tied to a network.
    Automatic,
}
198impl std::fmt::Display for SelectionIdentifier {
199    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
200        match self {
201            SelectionIdentifier::ConnectRequest(id) => {
202                write!(f, "Connection selection for network: {}", id)
203            }
204            SelectionIdentifier::Automatic => write!(f, "Automatic connection selection"),
205        }
206    }
207}
208
/// Successful outcome of a connection selection, tagged by how the selection was started.
///
/// NOTE(review): a `None` candidate presumably means the selection finished without finding a
/// suitable network; confirm against `do_connection_selection`.
#[cfg_attr(test, derive(Debug))]
pub enum ConnectionSelectionResponse {
    /// Outcome of a selection started for a connect request; carries the original request back
    /// to the caller together with the selected candidate.
    ConnectRequest { candidate: Option<types::ScannedCandidate>, request: ConnectAttemptRequest },
    /// Outcome of an automatic connection selection.
    Autoconnect(Option<types::ScannedCandidate>),
}
214
#[cfg(test)]
mod tests {
    use super::*;
    use crate::util::testing::generate_random_network_identifier;
    use assert_matches::assert_matches;
    use fuchsia_async::TestExecutor;
    use futures::task::Poll;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::task::{Wake, Waker};

    /// Builds a requester whose receiving end is dropped as soon as this function returns.
    /// None of the tests below drive a real request through it.
    fn fake_requester() -> ConnectionSelectionRequester {
        let (sender, _receiver) = futures::channel::mpsc::channel(1);
        ConnectionSelectionRequester::new(sender)
    }

    /// A spawned future that is immediately ready is returned by the first poll.
    #[test]
    fn test_spawn_and_poll() {
        let mut exec = TestExecutor::new();
        let mut manager = ConnectionSelectionManager::new(fake_requester());

        let selection_id = SelectionIdentifier::Automatic;
        let fut = async move { Ok(ConnectionSelectionResponse::Autoconnect(None)) };
        manager.spawn(selection_id.clone(), Box::pin(fut));

        let mut futures = ConnectionSelectionFutures::new(&mut manager);
        assert_matches!(
            exec.run_until_stalled(&mut futures),
            Poll::Ready((id, Ok(Some(ConnectionSelectionResponse::Autoconnect(None))))) if id == selection_id
        );
    }

    /// Spawning under an identifier that is already registered is ignored.
    #[test]
    fn test_spawn_duplicate() {
        let mut manager = ConnectionSelectionManager::new(fake_requester());

        let selection_id = SelectionIdentifier::Automatic;

        // Spawn first
        manager.spawn(
            selection_id.clone(),
            Box::pin(async { Ok(ConnectionSelectionResponse::Autoconnect(None)) }),
        );
        assert_eq!(manager.active_selections().len(), 1);

        // Spawn second (duplicate)
        manager.spawn(
            selection_id.clone(),
            Box::pin(async { Ok(ConnectionSelectionResponse::Autoconnect(None)) }),
        );

        // Should still be 1 because duplicate was ignored
        assert_eq!(manager.active_selections().len(), 1);
    }

    /// Cancelling one selection reports `Ok(None)` for it and leaves the other one active.
    #[test]
    fn test_cancel_specific() {
        let mut exec = TestExecutor::new();
        let mut manager = ConnectionSelectionManager::new(fake_requester());

        let id_auto = SelectionIdentifier::Automatic;
        let id_req = SelectionIdentifier::ConnectRequest(types::NetworkIdentifier {
            ssid: types::Ssid::from_bytes_unchecked(b"test".to_vec()),
            security_type: types::SecurityType::Wpa2,
        });

        // Spawn two futures that stay pending forever
        manager.spawn(id_auto.clone(), Box::pin(futures::future::pending()));
        manager.spawn(id_req.clone(), Box::pin(futures::future::pending()));

        assert_eq!(manager.active_selections().len(), 2);

        // Cancel one
        manager.cancel(&id_auto);

        // Polling should return cancelled result for id_auto
        let mut futures = ConnectionSelectionFutures::new(&mut manager);
        assert_matches!(
            exec.run_until_stalled(&mut futures),
            Poll::Ready((id, Ok(None))) if id == id_auto
        );

        // Check active selections - the cancelled handle should be removed immediately.
        assert_eq!(manager.active_selections().len(), 1);
        assert!(manager.active_selections().contains(&id_req));
    }

    /// `cancel_all` empties the active set at once and every future then reports `Ok(None)`.
    #[test]
    fn test_cancel_all() {
        let mut exec = TestExecutor::new();
        let mut manager = ConnectionSelectionManager::new(fake_requester());

        let id1 = SelectionIdentifier::Automatic;
        let id2 = SelectionIdentifier::ConnectRequest(types::NetworkIdentifier {
            ssid: types::Ssid::from_bytes_unchecked(b"test".to_vec()),
            security_type: types::SecurityType::Wpa2,
        });

        manager.spawn(id1.clone(), Box::pin(futures::future::pending()));
        manager.spawn(id2.clone(), Box::pin(futures::future::pending()));

        assert_eq!(manager.active_selections().len(), 2);

        manager.cancel_all();

        assert_eq!(manager.active_selections().len(), 0);

        // Both should return cancelled
        let mut futures = ConnectionSelectionFutures::new(&mut manager);

        // We expect two ready results of None
        let mut cancelled_count = 0;

        // Keep polling until the manager has nothing more to report.
        loop {
            match exec.run_until_stalled(&mut futures) {
                Poll::Ready((_, Ok(None))) => cancelled_count += 1,
                Poll::Pending => break,
                _ => panic!("Unexpected result"),
            }
        }

        assert_eq!(cancelled_count, 2);
    }

    /// The test-only accessors expose the state populated by `spawn`.
    #[test]
    fn test_internal_state_access() {
        let mut manager = ConnectionSelectionManager::new(fake_requester());
        let id = SelectionIdentifier::Automatic;
        manager.spawn(id.clone(), Box::pin(futures::future::pending()));

        assert_eq!(manager.get_cancellation_handles().len(), 1);
        assert!(!manager.get_futures().is_empty());
    }

    /// Polling a manager with no futures stays pending, even when polled repeatedly.
    #[test]
    fn test_poll_empty_twice_does_not_panic() {
        let mut exec = TestExecutor::new();
        let mut manager = ConnectionSelectionManager::new(fake_requester());

        let mut futures = ConnectionSelectionFutures::new(&mut manager);
        assert_matches!(exec.run_until_stalled(&mut futures), Poll::Pending);
        assert_matches!(exec.run_until_stalled(&mut futures), Poll::Pending);
    }

    /// After an empty poll, `spawn` wakes the waker that the poll left behind.
    #[test]
    fn test_spawn_wakes_after_empty() {
        // Waker that records whether it has been woken.
        struct FlagWaker(AtomicBool);
        impl Wake for FlagWaker {
            fn wake(self: Arc<Self>) {
                self.0.store(true, Ordering::SeqCst);
            }
        }

        let mut manager = ConnectionSelectionManager::new(fake_requester());

        let flag = Arc::new(FlagWaker(AtomicBool::new(false)));
        let waker = Waker::from(flag.clone());
        let mut cx = Context::from_waker(&waker);

        // Poll empty futures
        {
            let mut futures = ConnectionSelectionFutures::new(&mut manager);
            assert_matches!(Pin::new(&mut futures).poll(&mut cx), Poll::Pending);
        }

        // Verify the waker's flag is still false.
        assert!(!flag.0.load(Ordering::SeqCst));

        // Spawn a new future
        let selection_id = SelectionIdentifier::Automatic;
        let fut = async move { Ok(ConnectionSelectionResponse::Autoconnect(None)) };
        manager.spawn(selection_id, Box::pin(fut));

        // Verify waker was triggered
        assert!(
            flag.0.load(Ordering::SeqCst),
            "Waker should have been awakened by spawn() after empty poll"
        );
    }

    /// `spawn` also wakes the stored waker when another selection is still in progress.
    #[test]
    fn test_spawn_wakes_while_busy() {
        // Create a waker that increments a counter when awakened.
        struct CounterWaker(AtomicUsize);
        impl Wake for CounterWaker {
            fn wake(self: Arc<Self>) {
                let _ = self.0.fetch_add(1, Ordering::SeqCst);
            }
        }

        let mut manager = ConnectionSelectionManager::new(fake_requester());

        let flag = Arc::new(CounterWaker(AtomicUsize::new(0)));
        let waker = Waker::from(flag.clone());
        let mut cx = Context::from_waker(&waker);

        // Spawn a future that never completes. This represents an ongoing connection selection future.
        manager.spawn(SelectionIdentifier::Automatic, Box::pin(futures::future::pending()));

        {
            let mut futures = ConnectionSelectionFutures::new(&mut manager);
            assert_matches!(Pin::new(&mut futures).poll(&mut cx), Poll::Pending);
        }
        // The pending poll itself already woke the waker once, so the counter starts at 1.
        // NOTE(review): presumably this is FuturesUnordered waking the caller after it has
        // polled every child once; confirm against the futures crate.
        assert_eq!(flag.0.load(Ordering::SeqCst), 1);

        // Spawn a new future.
        let selection_id =
            SelectionIdentifier::ConnectRequest(generate_random_network_identifier());
        manager.spawn(
            selection_id.clone(),
            Box::pin(futures::future::ready(Ok(ConnectionSelectionResponse::Autoconnect(None)))),
        );
        assert_eq!(
            flag.0.load(Ordering::SeqCst),
            2,
            "Manager should wake stored waker on spawn even when busy"
        );
    }
}