1use anyhow::{Error, format_err};
6use fidl::endpoints::{ProtocolMarker, ServerEnd};
7use fuchsia_component::client::connect_to_protocol_at_path;
8use futures::task::{Context, Poll};
9use futures::{TryFuture, ready};
10use pin_project_lite::pin_project;
11use std::collections::VecDeque;
12use std::sync::LazyLock;
13use thiserror::Error;
14use {fidl_fuchsia_component as fcomponent, fidl_fuchsia_io as fio};
15
/// Lazily initialized, owned copy of the string `"StartComponentTree"`.
pub static START_COMPONENT_TREE_STREAM: LazyLock<String> =
    LazyLock::new(|| String::from("StartComponentTree"));
20
21pub fn event_name(event_type: &fcomponent::EventType) -> String {
23 match event_type {
24 fcomponent::EventType::CapabilityRequested => "capability_requested",
25 fcomponent::EventType::Discovered => unreachable!("This isn't used anymore"),
26 fcomponent::EventType::Destroyed => "destroyed",
27 fcomponent::EventType::Resolved => "resolved",
28 fcomponent::EventType::Unresolved => "unresolved",
29 fcomponent::EventType::Started => "started",
30 fcomponent::EventType::Stopped => "stopped",
31 fcomponent::EventType::DebugStarted => "debug_started",
32 }
33 .to_string()
34}
35
pin_project! {
    /// Wraps an `fcomponent::EventStreamProxy` and exposes the events it returns through the
    /// `futures::Stream` implementation for this type.
    pub struct EventStream {
        // Proxy on which `get_next` is called to request more events.
        stream: fcomponent::EventStreamProxy,
        // Events that were received from `get_next` but have not been yielded yet.
        buffer: VecDeque<fcomponent::Event>,
        // The `get_next` call currently being polled; `None` when no call is outstanding.
        #[pin]
        fut: Option<<fcomponent::EventStreamProxy as fcomponent::EventStreamProxyInterface>::GetNextResponseFut>,
    }
}
44
45#[derive(Debug, Error, Clone)]
46pub enum EventStreamError {
47 #[error("Stream terminated unexpectedly")]
48 StreamClosed,
49}
50
51impl EventStream {
52 pub fn new(stream: fcomponent::EventStreamProxy) -> Self {
53 Self { stream, buffer: VecDeque::new(), fut: None }
54 }
55
56 pub fn open_at_path_pipelined(path: impl Into<String>) -> Result<Self, Error> {
57 Ok(Self::new(connect_to_protocol_at_path::<fcomponent::EventStreamMarker>(&path.into())?))
58 }
59
60 pub async fn open_at_path(path: impl Into<String>) -> Result<Self, Error> {
61 let event_stream =
62 connect_to_protocol_at_path::<fcomponent::EventStreamMarker>(&path.into())?;
63 event_stream.wait_for_ready().await?;
64 Ok(Self::new(event_stream))
65 }
66
67 pub async fn open() -> Result<Self, Error> {
68 let event_stream = connect_to_protocol_at_path::<fcomponent::EventStreamMarker>(
69 "/svc/fuchsia.component.EventStream",
70 )?;
71 event_stream.wait_for_ready().await?;
72 Ok(Self::new(event_stream))
73 }
74
75 pub fn open_pipelined() -> Result<Self, Error> {
76 Ok(Self::new(connect_to_protocol_at_path::<fcomponent::EventStreamMarker>(
77 "/svc/fuchsia.component.EventStream",
78 )?))
79 }
80}
81
82impl futures::Stream for EventStream {
83 type Item = fcomponent::Event;
84
85 fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
86 let mut this = self.project();
87
88 if let Some(event) = this.buffer.pop_front() {
90 return Poll::Ready(Some(event));
91 }
92
93 if let None = this.fut.as_mut().as_pin_mut() {
95 this.fut.set(Some(this.stream.get_next()));
96 }
97
98 let step = ready!(this.fut.as_mut().as_pin_mut().unwrap().try_poll(cx));
99 this.fut.set(None);
100
101 match step {
102 Ok(events) => {
103 let mut iter = events.into_iter();
104 let ret = iter.next().unwrap();
105 while let Some(leftover) = iter.next() {
107 this.buffer.push_back(leftover);
108 }
109 Poll::Ready(Some(ret))
110 }
111 Err(_) => Poll::Ready(None),
112 }
113 }
114}
115
116pub trait Event: TryFrom<fcomponent::Event, Error = anyhow::Error> {
118 const TYPE: fcomponent::EventType;
119 const NAME: &'static str;
120
121 fn target_moniker(&self) -> &str;
122 fn component_url(&self) -> &str;
123 fn timestamp(&self) -> zx::BootInstant;
124 fn is_ok(&self) -> bool;
125 fn is_err(&self) -> bool;
126}
127
/// Outcome derived from a numeric exit status.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum ExitStatus {
    /// The exit status was `0`.
    Clean,
    /// The exit status was non-zero; the original value is kept.
    Crash(i32),
}

impl From<i32> for ExitStatus {
    /// Maps `0` to [`ExitStatus::Clean`] and every other value to [`ExitStatus::Crash`].
    fn from(exit_status: i32) -> Self {
        if exit_status == 0 {
            ExitStatus::Clean
        } else {
            ExitStatus::Crash(exit_status)
        }
    }
}
144
/// Validated form of `fcomponent::EventHeader` in which every field is present.
///
/// Built through the `TryFrom<fcomponent::EventHeader>` implementation, which fails if any
/// of these fields is missing from the raw header.
#[derive(Debug)]
struct EventHeader {
    /// Type of the event this header belongs to.
    event_type: fcomponent::EventType,
    /// Value of the raw header's `component_url` field.
    component_url: String,
    /// Value of the raw header's `moniker` field.
    moniker: String,
    /// Value of the raw header's `timestamp` field.
    timestamp: zx::BootInstant,
}
152
153impl TryFrom<fcomponent::EventHeader> for EventHeader {
154 type Error = anyhow::Error;
155
156 fn try_from(header: fcomponent::EventHeader) -> Result<Self, Self::Error> {
157 let event_type = header.event_type.ok_or_else(|| format_err!("No event type"))?;
158 let component_url = header.component_url.ok_or_else(|| format_err!("No component url"))?;
159 let moniker = header.moniker.ok_or_else(|| format_err!("No moniker"))?;
160 let timestamp = header
161 .timestamp
162 .ok_or_else(|| format_err!("Missing timestamp from the Event object"))?;
163 Ok(EventHeader { event_type, component_url, moniker, timestamp })
164 }
165}
166
/// Error value consisting only of a textual description.
#[derive(Debug, Eq, PartialEq)]
pub struct EventError {
    /// Text describing the error.
    pub description: String,
}
171
// Generates a strongly typed wrapper for one `fcomponent::EventPayload` variant.
//
// For an event type `Foo` the macro emits:
//   * `FooPayload`: the fields extracted from the raw payload,
//   * `FooError`: the listed error fields plus a `description`,
//   * `Foo`: a validated header together with `Result<FooPayload, FooError>`,
//   * `impl Event for Foo` and `impl TryFrom<fcomponent::Event> for Foo`.
//
// Two invocation forms are accepted:
//   * the full form, listing `data`, `client_protocols`, `server_protocols` and
//     `error_payload` fields;
//   * the short form `create_event!(Type, name)`, which expands to the full form with every
//     list empty.
//
// `data` fields are read through `Coerce`, so a field whose declared type is `Option<_>` may
// be absent from the raw payload, while any other declared type makes the field mandatory.
macro_rules! create_event {
    (
        event_type: $event_type:ident,
        event_name: $event_name:ident,
        payload: {
            data: {$(
                {
                    name: $data_name:ident,
                    ty: $data_ty:ty,
                }
            )*},
            client_protocols: {$(
                {
                    name: $client_protocol_name:ident,
                    ty: $client_protocol_ty:ty,
                }
            )*},
            server_protocols: {$(
                {
                    name: $server_protocol_name:ident,
                }
            )*},
        },
        error_payload: {
            $(
                {
                    name: $error_data_name:ident,
                    ty: $error_data_ty:ty,
                }
            )*
        }
    ) => {
        paste::paste! {
            #[derive(Debug)]
            pub struct [<$event_type Payload>] {
                $(pub $client_protocol_name: $client_protocol_ty,)*
                $(pub $server_protocol_name: Option<zx::Channel>,)*
                $(pub $data_name: $data_ty,)*
            }

            #[derive(Debug)]
            pub struct [<$event_type Error>] {
                $(pub $error_data_name: $error_data_ty,)*
                pub description: String,
            }

            #[derive(Debug)]
            pub struct $event_type {
                header: EventHeader,
                result: Result<[<$event_type Payload>], [<$event_type Error>]>,
            }

            impl $event_type {
                // Borrows the payload on success or the error on failure.
                pub fn result<'a>(&'a self) -> Result<&'a [<$event_type Payload>], &'a [<$event_type Error>]> {
                    self.result.as_ref()
                }

                $(
                    // Moves the server channel out of the payload and turns it into a request
                    // stream for protocol `T`. Returns `None` if the event holds an error or
                    // the channel was already taken.
                    pub fn [<take_ $server_protocol_name>]<T: ProtocolMarker>(&mut self)
                            -> Option<T::RequestStream> {
                        self.result.as_mut()
                            .ok()
                            .and_then(|payload| payload.$server_protocol_name.take())
                            .map(|channel| {
                                let server_end = ServerEnd::<T>::new(channel);
                                server_end.into_stream()
                            })
                    }
                )*
            }

            impl Event for $event_type {
                const TYPE: fcomponent::EventType = fcomponent::EventType::$event_type;
                const NAME: &'static str = stringify!($event_name);

                fn target_moniker(&self) -> &str {
                    &self.header.moniker
                }

                fn component_url(&self) -> &str {
                    &self.header.component_url
                }

                fn timestamp(&self) -> zx::BootInstant {
                    self.header.timestamp
                }

                fn is_ok(&self) -> bool {
                    self.result.is_ok()
                }

                fn is_err(&self) -> bool {
                    self.result.is_err()
                }
            }

            impl TryFrom<fcomponent::Event> for $event_type {
                type Error = anyhow::Error;

                // Validates the payload first and the header second, so a payload problem is
                // reported in preference to a header problem. This conversion only ever
                // stores an `Ok` payload in `result`.
                fn try_from(event: fcomponent::Event) -> Result<Self, Self::Error> {
                    let result = match event.payload {
                        Some(payload) => {
                            // Events declared without any fields never read `payload` again.
                            #[allow(unused)]
                            let payload = match payload {
                                fcomponent::EventPayload::$event_type(payload) => Ok(payload),
                                _ => Err(format_err!("Incorrect payload type, {:?}", payload)),
                            }?;

                            // Error values are built lazily (`ok_or_else`) so the success
                            // path does not construct and drop an `anyhow::Error` per field.
                            $(
                                let $data_name: $data_ty = payload.$data_name.coerce().ok_or_else(||
                                    format_err!("Missing {} from {} object",
                                        stringify!($data_name), stringify!($event_type))
                                )?;
                            )*

                            $(
                                let $client_protocol_name: $client_protocol_ty = payload.$client_protocol_name.ok_or_else(||
                                    format_err!("Missing {} from {} object",
                                        stringify!($client_protocol_name), stringify!($event_type))
                                )?.into_proxy();
                            )*
                            $(
                                let $server_protocol_name: Option<zx::Channel> =
                                    Some(payload.$server_protocol_name.ok_or_else(||
                                        format_err!("Missing {} from {} object",
                                            stringify!($server_protocol_name), stringify!($event_type))
                                    )?);
                            )*

                            #[allow(dead_code)]
                            let payload = paste::paste! {
                                [<$event_type Payload>] {
                                    $($data_name,)*
                                    $($client_protocol_name,)*
                                    $($server_protocol_name,)*
                                }
                            };

                            Ok(Ok(payload))
                        },
                        None => Err(format_err!("Missing event_result from Event object")),
                    }?;

                    let event = {
                        let header = event.header
                            .ok_or_else(|| format_err!("Missing Event header"))
                            .and_then(EventHeader::try_from)?;

                        // The header must describe the same event type as this wrapper.
                        if header.event_type != Self::TYPE {
                            return Err(format_err!("Incorrect event type"));
                        }

                        $event_type { header, result }
                    };
                    Ok(event)
                }
            }
        }
    };
    ($event_type:ident, $event_name:ident) => {
        create_event!(event_type: $event_type, event_name: $event_name,
                      payload: {
                          data: {},
                          client_protocols: {},
                          server_protocols: {},
                      },
                      error_payload: {});
    };
}
368
// Events declared with the short form: no data fields, no protocols, no error fields.
create_event!(Destroyed, destroyed);
create_event!(Resolved, resolved);
create_event!(Unresolved, unresolved);
create_event!(Started, started);
// `Stopped` exposes `status` as an `ExitStatus` and `exit_code` as an `Option<i64>`.
// Because `exit_code` is declared as an `Option`, the `Coerce` lookup accepts its absence.
create_event!(
    event_type: Stopped,
    event_name: stopped,
    payload: {
        data: {
            {
                name: status,
                ty: ExitStatus,
            }
            {
                name: exit_code,
                ty: Option<i64>,
            }
        },
        client_protocols: {},
        server_protocols: {},
    },
    error_payload: {}
);
// `CapabilityRequested` carries a `name` string and a server channel `capability`, which
// callers retrieve through the generated `take_capability` method. Its error struct also
// has a `name` field.
create_event!(
    event_type: CapabilityRequested,
    event_name: capability_requested,
    payload: {
        data: {
            {
                name: name,
                ty: String,
            }
        },
        client_protocols: {},
        server_protocols: {
            {
                name: capability,
            }
        },
    },
    error_payload: {
        {
            name: name,
            ty: String,
        }
    }
);
// `DebugStarted` carries a `break_on_start` event pair and a `runtime_dir` client end that
// is converted into a `fio::DirectoryProxy`.
create_event!(
    event_type: DebugStarted,
    event_name: debug_started,
    payload: {
        data: {
            {
                name: break_on_start,
                ty: zx::EventPair,
            }
        },
        client_protocols: {
            {
                name: runtime_dir,
                ty: fio::DirectoryProxy,
            }
        },
        server_protocols: {},
    },
    error_payload: {}
);
437
/// Conversion of an optional raw field into `Option<T>`, where `None` means "the required
/// value is missing".
trait Coerce<T> {
    /// Returns `Some(value)` when a `T` can be produced, `None` otherwise.
    fn coerce(self) -> Option<T>;
}

/// Identity case: the target type is the wrapped type, so a missing value stays missing.
impl<T> Coerce<T> for Option<T> {
    fn coerce(self) -> Self {
        self
    }
}

/// Optional target: the target type is itself an `Option`, so the result is always `Some`
/// and a missing value is passed through as `Some(None)`.
impl<T> Coerce<Option<T>> for Option<T> {
    fn coerce(self) -> Option<Self> {
        Some(self)
    }
}
453
454impl Coerce<ExitStatus> for Option<i32> {
455 fn coerce(self) -> Option<ExitStatus> {
456 self.map(Into::into)
457 }
458}