pub struct HangingGetBroker<S, O: Unpin + 'static, F: Fn(&S, O) -> bool> { /* private fields */ }
A Send wrapper for a HangingGet that can receive messages via an async channel.
The HangingGetBroker is the primary way of implementing server-side hanging gets using this library. It manages all state and reacts to inputs sent over channels.
§Example Usage:
Assuming a FIDL protocol with a hanging get method:
protocol SheepCounter {
    /// Returns the current number of sheep that have jumped the fence
    /// when that number changes.
    WatchCount() -> (uint64 count);
}
A server implementation might include the following:
let broker = HangingGetBroker::new(
    0u64, // Initial state
    |s, o: SheepCounterWatchCountResponder| {
        o.send(s.clone()).unwrap();
        true
    }, // notify function with fidl auto-generated responder
    DEFAULT_CHANNEL_SIZE, // Size of channels used by Publishers and Subscribers
);
// Create a new publisher that can be used to publish updates to the state
let mut publisher = broker.new_publisher();
// Create a new registrar that can be used to register subscribers
let mut registrar = broker.new_registrar();
// Spawn broker as an async task that will run until there are not any more
// `SubscriptionRegistrar`, `Publisher`, or `Subscriber` objects that can update the system.
fuchsia_async::Task::spawn(broker.run()).detach();
// Spawn a background task to count sheep
fuchsia_async::Task::spawn(async move {
    // `1.second()` assumes a duration extension trait (such as `DurationNum`) is in scope.
    let mut interval = fuchsia_async::Interval::new(1.second());
    loop {
        interval.next().await;
        publisher.update(|sheep_count| *sheep_count += 1);
    }
}).detach();
// Create a new `ServiceFs` and register SheepCounter fidl service
let mut fs = ServiceFs::new();
fs.dir("svc").add_fidl_service(|s: SheepCounterRequestStream| s);
// Handle new client connections sequentially
while let Some(mut stream) = fs.next().await {
    // Create a new subscriber associated with this client's request stream
    let mut subscriber = registrar.new_subscriber().await.unwrap();
    // Handle requests from this client by registering new observers
    fuchsia_async::Task::spawn(async move {
        while let Some(Ok(SheepCounterRequest::WatchCount { responder })) = stream.next().await {
            subscriber.register(responder).await.unwrap();
        }
    }).detach();
}
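The example above depends on Fuchsia-only crates. As a rough, std-only illustration of the hanging get behaviour it relies on, the sketch below models one state value, a notify function of the same shape (`Fn(&S, O) -> bool`), and per-subscriber bookkeeping. `ToyHangingGet` and all of its methods are hypothetical names invented for this sketch, and the reading of the `bool` result (true means the observer received the state) is an assumption rather than something this page states.

```rust
use std::collections::HashMap;
use std::sync::mpsc;

/// Toy, single-threaded model of a hanging get server.
/// Hypothetical type for illustration; not part of the library.
struct ToyHangingGet<S, O, F: Fn(&S, O) -> bool> {
    state: S,
    notify: F,
    // Per subscriber: (has unseen state?, parked observer if any).
    subscribers: HashMap<u32, (bool, Option<O>)>,
}

impl<S, O, F: Fn(&S, O) -> bool> ToyHangingGet<S, O, F> {
    fn new(state: S, notify: F) -> Self {
        Self { state, notify, subscribers: HashMap::new() }
    }

    /// Add a subscriber that has not yet seen the current state.
    fn subscribe(&mut self, id: u32) {
        self.subscribers.insert(id, (true, None));
    }

    /// Answer at once if the subscriber has unseen state; otherwise park
    /// the observer until the next update.
    fn register(&mut self, id: u32, observer: O) {
        let entry = self.subscribers.get_mut(&id).expect("unknown subscriber");
        if entry.0 {
            // Assumption: `true` from notify means the state was delivered.
            entry.0 = !(self.notify)(&self.state, observer);
        } else {
            entry.1 = Some(observer);
        }
    }

    /// Change the state, then answer every parked observer.
    fn update(&mut self, f: impl FnOnce(&mut S)) {
        f(&mut self.state);
        for entry in self.subscribers.values_mut() {
            entry.0 = match entry.1.take() {
                Some(observer) => !(self.notify)(&self.state, observer),
                None => true, // nobody waiting: remember there is unseen state
            };
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel::<u64>();
    let notify = |s: &u64, o: mpsc::Sender<u64>| o.send(*s).is_ok();
    let mut sheep = ToyHangingGet::new(0u64, notify);
    sheep.subscribe(1);

    sheep.register(1, tx.clone()); // first request is answered immediately
    assert_eq!(rx.try_recv(), Ok(0));

    sheep.register(1, tx.clone()); // second request hangs: nothing changed
    assert!(rx.try_recv().is_err());

    sheep.update(|count| *count += 1); // the update releases the parked observer
    assert_eq!(rx.try_recv(), Ok(1));
}
```

Here an `mpsc::Sender<u64>` stands in for the FIDL responder. The broker described on this page additionally moves these steps behind async channels so that publishers and subscribers can live in other tasks.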
Implementations§
impl<S, O, F> HangingGetBroker<S, O, F>
pub fn new(state: S, notify: F, channel_size: usize) -> Self
Create a new broker.
state is the initial state of the HangingGet.
notify is a Fn that is used to notify observers of state.
channel_size is the maximum queue size of unprocessed messages from an individual object.
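To make the effect of a bounded queue concrete, here is a minimal sketch that uses `std::sync::mpsc::sync_channel` as a stand-in. The channel type the broker actually uses is not shown on this page, so exact capacity accounting may differ; `accepted_before_full` is a hypothetical helper written for this illustration.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Push messages into a bounded queue that nobody drains and report how
/// many were accepted before the queue pushed back.
/// Hypothetical helper; std's `sync_channel` stands in for the real channel.
fn accepted_before_full(channel_size: usize, attempts: usize) -> usize {
    // `_rx` is kept alive so the channel is full rather than disconnected.
    let (tx, _rx) = sync_channel::<u64>(channel_size);
    let mut accepted = 0;
    for i in 0..attempts {
        match tx.try_send(i as u64) {
            Ok(()) => accepted += 1,
            // The queue already holds `channel_size` unprocessed messages.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}

fn main() {
    // With room for two unprocessed messages, the third attempt is refused.
    assert_eq!(accepted_before_full(2, 5), 2);
    println!("accepted {} of 5 messages", accepted_before_full(2, 5));
}
```

In an async setting a full queue typically makes the sender wait instead of failing, so a small channel_size throttles a fast sender while a larger one uses more memory per object.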
pub fn new_publisher(&self) -> Publisher<S>
Create a new Publisher that can be used to communicate state updates with this HangingGetBroker from another thread or async task.
pub fn new_registrar(&self) -> SubscriptionRegistrar<O>
Create a new SubscriptionRegistrar that can be used to register new subscribers with this HangingGetBroker from another thread or async task.
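The idea of handing out handles that talk to a single state owner from other threads can be sketched with std threads and channels. This is only an analogy under stated assumptions: `Update` and `run_toy_broker` are hypothetical names, the state is fixed to `u64`, and the real broker uses async channels rather than blocking ones.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical message type: a boxed state update sent to the state owner.
type Update = Box<dyn FnOnce(&mut u64) + Send>;

/// Spawn `publishers` threads that each send `updates_each` increments to a
/// single loop owning the state, and return the final state.
fn run_toy_broker(publishers: usize, updates_each: usize) -> u64 {
    let (tx, rx) = sync_channel::<Update>(4);

    // Each thread gets its own cloned handle, loosely like a publisher.
    let workers: Vec<_> = (0..publishers)
        .map(|_| {
            let tx = tx.clone();
            thread::spawn(move || {
                for _ in 0..updates_each {
                    tx.send(Box::new(|count: &mut u64| *count += 1)).unwrap();
                }
            })
        })
        .collect();

    // Drop the original handle so the loop below ends once every worker's
    // handle is gone.
    drop(tx);

    // Only this loop touches the state, so no lock is needed.
    let mut state = 0u64;
    for update in rx {
        update(&mut state);
    }

    for worker in workers {
        worker.join().unwrap();
    }
    state
}

fn main() {
    assert_eq!(run_toy_broker(3, 10), 30);
    println!("final count: {}", run_toy_broker(3, 10));
}
```

The loop stops on its own once every handle has been dropped, which is similar in spirit to the example's note that the broker task runs until no objects remain that can update the system.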