openthread/ot/
state.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5// for now, we allow this for this module because we can't apply it
6// specifically to the type `ChangedFlags`, due to a bug in `bitflags!`.
7#![allow(missing_docs)]
8
9use crate::prelude_internal::*;
10
11use bitflags::bitflags;
12use core::pin::Pin;
13use core::task::{Context, Poll};
14use fuchsia_sync::Mutex;
15use std::sync::Arc;
16use std::task::Waker;
17
18bitflags! {
19#[repr(C)]
20#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
21pub struct ChangedFlags : ::std::os::raw::c_uint {
22    const IP6_ADDRESS_ADDED = OT_CHANGED_IP6_ADDRESS_ADDED;
23    const IP6_ADDRESS_REMOVED = OT_CHANGED_IP6_ADDRESS_REMOVED;
24    const THREAD_ROLE = OT_CHANGED_THREAD_ROLE;
25    const THREAD_LL_ADDR = OT_CHANGED_THREAD_LL_ADDR;
26    const THREAD_ML_ADDR = OT_CHANGED_THREAD_ML_ADDR;
27    const THREAD_RLOC_ADDED = OT_CHANGED_THREAD_RLOC_ADDED;
28    const THREAD_RLOC_REMOVED = OT_CHANGED_THREAD_RLOC_REMOVED;
29    const THREAD_PARTITION_ID = OT_CHANGED_THREAD_PARTITION_ID;
30    const THREAD_KEY_SEQUENCE_COUNTER = OT_CHANGED_THREAD_KEY_SEQUENCE_COUNTER;
31    const THREAD_NETDATA = OT_CHANGED_THREAD_NETDATA;
32    const THREAD_CHILD_ADDED = OT_CHANGED_THREAD_CHILD_ADDED;
33    const THREAD_CHILD_REMOVED = OT_CHANGED_THREAD_CHILD_REMOVED;
34    const IP6_MULTICAST_SUBSCRIBED = OT_CHANGED_IP6_MULTICAST_SUBSCRIBED;
35    const IP6_MULTICAST_UNSUBSCRIBED = OT_CHANGED_IP6_MULTICAST_UNSUBSCRIBED;
36    const THREAD_CHANNEL = OT_CHANGED_THREAD_CHANNEL;
37    const THREAD_PANID = OT_CHANGED_THREAD_PANID;
38    const THREAD_NETWORK_NAME = OT_CHANGED_THREAD_NETWORK_NAME;
39    const THREAD_EXT_PANID = OT_CHANGED_THREAD_EXT_PANID;
40    const NETWORK_KEY = OT_CHANGED_NETWORK_KEY;
41    const PSKC = OT_CHANGED_PSKC;
42    const SECURITY_POLICY = OT_CHANGED_SECURITY_POLICY;
43    const CHANNEL_MANAGER_NEW_CHANNEL = OT_CHANGED_CHANNEL_MANAGER_NEW_CHANNEL;
44    const SUPPORTED_CHANNEL_MASK = OT_CHANGED_SUPPORTED_CHANNEL_MASK;
45    const COMMISSIONER_STATE = OT_CHANGED_COMMISSIONER_STATE;
46    const THREAD_NETIF_STATE = OT_CHANGED_THREAD_NETIF_STATE;
47    const THREAD_BACKBONE_ROUTER_STATE = OT_CHANGED_THREAD_BACKBONE_ROUTER_STATE;
48    const THREAD_BACKBONE_ROUTER_LOCAL = OT_CHANGED_THREAD_BACKBONE_ROUTER_LOCAL;
49    const JOINER_STATE = OT_CHANGED_JOINER_STATE;
50    const ACTIVE_DATASET = OT_CHANGED_ACTIVE_DATASET;
51    const PENDING_DATASET = OT_CHANGED_PENDING_DATASET;
52    const NAT64_TRANSLATOR_STATE = OT_CHANGED_NAT64_TRANSLATOR_STATE;
53}
54}
55
56/// State-change-related methods from the
57/// [OpenThread "Instance" Module](https://openthread.io/reference/group/api-instance).
58pub trait State {
59    /// Functional equivalent to
60    /// [`otsys::otSetStateChangedCallback`](crate::otsys::otSetStateChangedCallback).
61    fn set_state_changed_fn<F>(&self, f: Option<F>)
62    where
63        F: FnMut(ChangedFlags) + 'static;
64
65    /// Returns an asynchronous stream for state-change events.
66    fn state_changed_stream(&self) -> StateChangedStream;
67}
68
69impl<T: State + Boxable> State for ot::Box<T> {
70    fn set_state_changed_fn<F>(&self, f: Option<F>)
71    where
72        F: FnMut(ChangedFlags) + 'static,
73    {
74        self.as_ref().set_state_changed_fn(f);
75    }
76
77    fn state_changed_stream(&self) -> StateChangedStream {
78        self.as_ref().state_changed_stream()
79    }
80}
81
82/// Stream for getting state changed events.
83#[derive(Debug, Clone)]
84pub struct StateChangedStream(Arc<Mutex<(ChangedFlags, Waker)>>);
85
86impl Stream for StateChangedStream {
87    type Item = ChangedFlags;
88
89    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
90        let mut state = self.0.lock();
91
92        state.1 = cx.waker().clone();
93
94        if state.0 != ChangedFlags::empty() {
95            let flags = state.0;
96            state.0 = ChangedFlags::empty();
97            Poll::Ready(Some(flags))
98        } else {
99            Poll::Pending
100        }
101    }
102}
103
104impl State for Instance {
105    fn set_state_changed_fn<F>(&self, f: Option<F>)
106    where
107        F: FnMut(ChangedFlags) + 'static,
108    {
109        unsafe extern "C" fn _ot_state_changed_callback<F: FnMut(ChangedFlags)>(
110            flags: otChangedFlags,
111            context: *mut ::std::os::raw::c_void,
112        ) {
113            // Reconstitute a reference to our closure.
114            let sender = &mut *(context as *mut F);
115
116            sender(ChangedFlags::from_bits_truncate(flags))
117        }
118
119        let (fn_ptr, fn_box, cb): (_, _, otStateChangedCallback) = if let Some(f) = f {
120            let mut x = Box::new(f);
121
122            (
123                x.as_mut() as *mut F as *mut ::std::os::raw::c_void,
124                Some(x as Box<dyn FnMut(ChangedFlags)>),
125                Some(_ot_state_changed_callback::<F>),
126            )
127        } else {
128            (std::ptr::null_mut() as *mut ::std::os::raw::c_void, None, None)
129        };
130
131        unsafe {
132            otSetStateChangedCallback(self.as_ot_ptr(), cb, fn_ptr);
133        }
134
135        // Make sure our object eventually gets cleaned up.
136        self.borrow_backing().state_change_fn.set(fn_box);
137    }
138
139    fn state_changed_stream(&self) -> StateChangedStream {
140        let state = Arc::new(Mutex::new((ChangedFlags::empty(), futures::task::noop_waker())));
141
142        let state_copy = state.clone();
143
144        self.set_state_changed_fn(Some(move |flags: ChangedFlags| {
145            let mut borrowed = state_copy.lock();
146            borrowed.0 |= flags;
147            borrowed.1.wake_by_ref();
148        }));
149
150        StateChangedStream(state)
151    }
152}