1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use {
    fuchsia_async as fasync,
    fuchsia_inspect::{self as inspect, Node, NumericProperty, Property},
    fuchsia_inspect_contrib::nodes::NodeExt,
    fuchsia_inspect_derive::Inspect,
    fuchsia_zircon as zx,
    std::fmt,
};

const FALSE_VALUE: u64 = 0;
const TRUE_VALUE: u64 = 1;

/// Convert a type to the correct supported Inspect Property type. This is used in Bluetooth to
/// ensure consistent representations of values in Inspect.
///
/// Note: It represents them appropriately for Bluetooth but may not be the appropriate type
/// for other use cases. It shouldn't be used outside of the Bluetooth project.
pub trait ToProperty {
    type PropertyType;
    fn to_property(&self) -> Self::PropertyType;
}

impl ToProperty for bool {
    type PropertyType = u64;
    fn to_property(&self) -> Self::PropertyType {
        if *self {
            TRUE_VALUE
        } else {
            FALSE_VALUE
        }
    }
}

impl ToProperty for Option<bool> {
    type PropertyType = u64;
    fn to_property(&self) -> Self::PropertyType {
        self.as_ref().map(bool::to_property).unwrap_or(FALSE_VALUE)
    }
}

impl ToProperty for String {
    type PropertyType = String;
    fn to_property(&self) -> Self::PropertyType {
        self.to_string()
    }
}

impl<T, V> ToProperty for Vec<T>
where
    T: ToProperty<PropertyType = V>,
    V: ToString,
{
    type PropertyType = String;
    fn to_property(&self) -> Self::PropertyType {
        self.iter()
            .map(|t| <T as ToProperty>::to_property(t).to_string())
            .collect::<Vec<String>>()
            .join(", ")
    }
}

/// Vectors of T show up as a comma separated list string property. `None` types are
/// represented as an empty string.
impl<T, V> ToProperty for Option<Vec<T>>
where
    T: ToProperty<PropertyType = V>,
    V: ToString,
{
    type PropertyType = String;
    fn to_property(&self) -> Self::PropertyType {
        self.as_ref().map(ToProperty::to_property).unwrap_or_else(String::new)
    }
}

/// Convenience function to create a string containing the debug representation of an object that
/// implements `Debug`
pub trait DebugExt {
    fn debug(&self) -> String;
}

impl<T: fmt::Debug> DebugExt for T {
    fn debug(&self) -> String {
        format!("{:?}", self)
    }
}

/// Represents inspect data that is tied to a specific object. This inspect data and the object of
/// type T should always be bundled together.
pub trait InspectData<T> {
    fn new(object: &T, inspect: inspect::Node) -> Self;
}

pub trait IsInspectable
where
    Self: Sized + Send + Sync + 'static,
{
    type I: InspectData<Self>;
}

/// A wrapper around a type T that bundles some inspect data alongside instances of the type.
#[derive(Debug)]
pub struct Inspectable<T: IsInspectable> {
    pub(crate) inner: T,
    pub(crate) inspect: T::I,
}

impl<T: IsInspectable> Inspectable<T> {
    /// Create a new instance of an `Inspectable` wrapper type containing the T instance that
    /// it wraps along with populated inspect data.
    pub fn new(object: T, inspect: inspect::Node) -> Inspectable<T> {
        Inspectable { inspect: T::I::new(&object, inspect), inner: object }
    }
}

/// `Inspectable`s can always safely be immutably dereferenced as the type T that they wrap
/// because the data will not be mutated through this reference.
impl<T: IsInspectable> std::ops::Deref for Inspectable<T> {
    type Target = T;
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

/// A trait representing the inspect data for a type T that will never be mutated. This trait
/// allows for a simpler "fire and forget" representation of the inspect data associated with an
/// object. This is because inspect handles for the data will never need to be accessed after
/// creation.
pub trait ImmutableDataInspect<T> {
    fn new(data: &T, manager: Node) -> Self;
}

/// "Fire and forget" representation of some inspect data that does not allow access inspect
/// handles after they are created.
pub struct ImmutableDataInspectManager {
    pub(crate) _manager: Node,
}

impl<T, I: ImmutableDataInspect<T>> InspectData<T> for I {
    /// Create a new instance of some type `I` that represents the immutable inspect data for a type
    /// `T`. This is done by handing `I` a `Node` and calling into the
    /// monomorphized version of ImmutableDataInspect<T> for I.
    fn new(data: &T, inspect: inspect::Node) -> I {
        I::new(data, inspect)
    }
}

/// The values associated with a data transfer.
struct DataTransferStats {
    /// The time at which the data transfer was recorded.
    time: fasync::Time,
    /// The elapsed amount of time (nanos) the data transfer took place over.
    elapsed: std::num::NonZeroU64,
    /// The bytes transferred.
    bytes: usize,
}

impl DataTransferStats {
    /// Calculates and returns the throughput of the `bytes` received in the
    /// data transfer.
    fn calculate_throughput(&self) -> u64 {
        // NOTE: probably a better way to calculate the speed than using floats.
        let bytes_per_nano = self.bytes as f64 / self.elapsed.get() as f64;
        let bytes_per_second = zx::Duration::from_seconds(1).into_nanos() as f64 * bytes_per_nano;
        bytes_per_second as u64
    }
}

/// An inspect node that represents a stream of data, recording the total amount of data
/// transferred and an instantaneous rate.
#[derive(Inspect, Default)]
pub struct DataStreamInspect {
    /// The total number of bytes transferred in this stream
    total_bytes: inspect::UintProperty,
    /// Bytes per second, based on the most recent update.
    bytes_per_second_current: inspect::UintProperty,
    /// Time that this stream started.
    /// Managed manually.
    #[inspect(skip)]
    start_time_prop: Option<fuchsia_inspect_contrib::nodes::TimeProperty>,
    /// Time that we were last started.  Used to calculate seconds running.
    #[inspect(skip)]
    started: Option<fasync::Time>,
    /// Seconds that this stream has been active, measured from the start time to the last
    /// recorded transfer.
    streaming_secs: inspect::UintProperty,
    /// Used to calculate instantaneous bytes_per_second.
    #[inspect(skip)]
    last_update: Option<DataTransferStats>,
    inspect_node: inspect::Node,
}

impl DataStreamInspect {
    pub fn start(&mut self) {
        let now = fasync::Time::now();
        if let Some(prop) = &self.start_time_prop {
            prop.set_at(now.into());
        } else {
            self.start_time_prop = Some(self.inspect_node.create_time_at("start_time", now.into()));
        }
        self.started = Some(now);
        self.last_update = Some(DataTransferStats {
            time: now,
            elapsed: std::num::NonZeroU64::new(1).unwrap(), // Default smallest interval of 1 nano
            bytes: 0,
        });
    }

    /// Record that `bytes` have been transferred as of `at`.
    /// This is recorded since the `last_update` time or since `start` if it
    /// has never been called.
    /// Does nothing if this stream has never been started or if the provided `at` time
    /// is in the past relative to the `last_update` time.
    pub fn record_transferred(&mut self, bytes: usize, at: fasync::Time) {
        let (elapsed, current_bytes) = match self.last_update {
            Some(DataTransferStats { time: last, .. }) if at > last => {
                // A new data transfer - calculate the new elapsed time interval.
                let elapsed = (at - last).into_nanos() as u64;
                (std::num::NonZeroU64::new(elapsed).unwrap(), bytes)
            }
            Some(DataTransferStats { time: last, elapsed, bytes: last_bytes }) if at == last => {
                // An addition to the previous data transfer - use the previous elapsed time
                // interval and an updated byte count.
                (elapsed, last_bytes + bytes)
            }
            _ => return, // Otherwise, we haven't started or received an invalid `at` time.
        };

        let transfer = DataTransferStats { time: at, elapsed, bytes: current_bytes };
        self.total_bytes.add(bytes as u64);
        self.bytes_per_second_current.set(transfer.calculate_throughput());
        self.last_update = Some(transfer);
        if let Some(started) = &self.started {
            let secs: u64 = (at - *started).into_seconds().try_into().unwrap_or(0);
            self.streaming_secs.set(secs);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use diagnostics_assertions::assert_data_tree;
    use fuchsia_async::DurationExt;
    use fuchsia_inspect_derive::WithInspect;
    use fuchsia_zircon::DurationNum;

    #[test]
    fn bool_to_property() {
        let b = false.to_property();
        assert_eq!(b, FALSE_VALUE);
        let b = true.to_property();
        assert_eq!(b, TRUE_VALUE);
    }

    #[test]
    fn optional_bool_to_property() {
        let b: u64 = None::<bool>.to_property();
        assert_eq!(b, FALSE_VALUE);
        let b = Some(false).to_property();
        assert_eq!(b, FALSE_VALUE);
        let b = Some(true).to_property();
        assert_eq!(b, TRUE_VALUE);
    }

    #[test]
    fn string_vec_to_property() {
        let s = Vec::<String>::new().to_property();
        assert_eq!(s, "");
        let s = vec!["foo".to_string()].to_property();
        assert_eq!(s, "foo");
        let s = vec!["foo".to_string(), "bar".to_string(), "baz".to_string()].to_property();
        assert_eq!(s, "foo, bar, baz");
    }

    #[test]
    fn optional_string_vec_to_property() {
        let s = Some(vec!["foo".to_string(), "bar".to_string(), "baz".to_string()]).to_property();
        assert_eq!(s, "foo, bar, baz");
    }

    #[test]
    fn debug_string() {
        #[derive(Debug)]
        struct Foo {
            // TODO(https://fxbug.dev/42165549)
            #[allow(unused)]
            bar: u8,
            // TODO(https://fxbug.dev/42165549)
            #[allow(unused)]
            baz: &'static str,
        }
        let foo = Foo { bar: 1, baz: "baz value" };
        assert_eq!(format!("{:?}", foo), foo.debug());
    }

    /// Sets up an inspect test with an executor at timestamp `curr_time`.
    fn setup_inspect(
        curr_time: i64,
    ) -> (fasync::TestExecutor, fuchsia_inspect::Inspector, DataStreamInspect) {
        let exec = fasync::TestExecutor::new_with_fake_time();
        exec.set_fake_time(fasync::Time::from_nanos(curr_time));
        let inspector = fuchsia_inspect::Inspector::default();
        let d = DataStreamInspect::default()
            .with_inspect(inspector.root(), "data_stream")
            .expect("attach to tree");

        (exec, inspector, d)
    }

    #[test]
    fn data_stream_inspect_data_transfer_before_start_has_no_effect() {
        let (_exec, inspector, mut d) = setup_inspect(5_123400000);

        // Default inspect tree.
        assert_data_tree!(inspector, root: {
            data_stream: {
                total_bytes: 0 as u64,
                streaming_secs: 0 as u64,
                bytes_per_second_current: 0 as u64,
            }
        });

        // Recording a data transfer before start() has no effect.
        d.record_transferred(1, fasync::Time::now());
        assert_data_tree!(inspector, root: {
            data_stream: {
                total_bytes: 0 as u64,
                streaming_secs: 0 as u64,
                bytes_per_second_current: 0 as u64,
            }
        });
    }

    #[test]
    fn data_stream_inspect_record_past_time_has_no_effect() {
        let curr_time = 5_678900000;
        let (_exec, inspector, mut d) = setup_inspect(curr_time);

        d.start();
        assert_data_tree!(inspector, root: {
            data_stream: {
                start_time: 5_678900000i64,
                total_bytes: 0 as u64,
                streaming_secs: 0 as u64,
                bytes_per_second_current: 0 as u64,
            }
        });

        // Recording a data transfer with an older time has no effect.
        let time_from_past = curr_time - 10;
        d.record_transferred(1, fasync::Time::from_nanos(time_from_past));
        assert_data_tree!(inspector, root: {
            data_stream: {
                start_time: 5_678900000i64,
                total_bytes: 0 as u64,
                streaming_secs: 0 as u64,
                bytes_per_second_current: 0 as u64,
            }
        });
    }

    #[test]
    fn data_stream_inspect_data_transfer_immediately_after_start_is_ok() {
        let curr_time = 5_678900000;
        let (_exec, inspector, mut d) = setup_inspect(curr_time);

        d.start();
        assert_data_tree!(inspector, root: {
            data_stream: {
                start_time: 5_678900000i64,
                total_bytes: 0 as u64,
                streaming_secs: 0 as u64,
                bytes_per_second_current: 0 as u64,
            }
        });

        // Although unlikely, recording a data transfer at the same instantaneous moment as starting
        // is OK.
        d.record_transferred(5, fasync::Time::from_nanos(curr_time));
        assert_data_tree!(inspector, root: {
            data_stream: {
                start_time: 5_678900000i64,
                total_bytes: 5 as u64,
                streaming_secs: 0 as u64,
                bytes_per_second_current: 5_000_000_000 as u64,
            }
        });
    }

    #[test]
    fn data_stream_inspect_records_correct_throughput() {
        let (exec, inspector, mut d) = setup_inspect(5_678900000);

        d.start();

        assert_data_tree!(inspector, root: {
            data_stream: {
                start_time: 5_678900000i64,
                total_bytes: 0 as u64,
                streaming_secs: 0 as u64,
                bytes_per_second_current: 0 as u64,
            }
        });

        // A half second passes.
        exec.set_fake_time(500.millis().after_now());

        // If we transferred 500 bytes then, we should have 1000 bytes per second.
        d.record_transferred(500, fasync::Time::now());
        assert_data_tree!(inspector, root: {
            data_stream: {
                start_time: 5_678900000i64,
                total_bytes: 500 as u64,
                streaming_secs: 0 as u64,
                bytes_per_second_current: 1000 as u64,
            }
        });

        // In 5 seconds, we transfer 500 more bytes which is much slower.
        exec.set_fake_time(5.seconds().after_now());
        d.record_transferred(500, fasync::Time::now());
        assert_data_tree!(inspector, root: {
            data_stream: {
                start_time: 5_678900000i64,
                total_bytes: 1000 as u64,
                streaming_secs: 5 as u64,
                bytes_per_second_current: 100 as u64,
            }
        });

        // Receiving another update at the same time is OK.
        d.record_transferred(900, fasync::Time::now());
        assert_data_tree!(inspector, root: {
            data_stream: {
                start_time: 5_678900000i64,
                total_bytes: 1900 as u64,
                streaming_secs: 5 as u64,
                bytes_per_second_current: 280 as u64,
            }
        });
    }

    #[test]
    fn test_calculate_throughput() {
        let time = fasync::Time::from_nanos(1_000_000_000);
        // No throughput.
        let bytes = 0;
        let elapsed = std::num::NonZeroU64::new(1_000_000).unwrap();
        let transfer1 = DataTransferStats { time, elapsed, bytes };
        assert_eq!(transfer1.calculate_throughput(), 0);

        // Fractional throughput in bytes per nano.
        let bytes = 1;
        let elapsed = std::num::NonZeroU64::new(1_000_000).unwrap();
        let transfer2 = DataTransferStats { time, elapsed, bytes };
        assert_eq!(transfer2.calculate_throughput(), 1000);

        // Fractional throughput in bytes per nano.
        let bytes = 5;
        let elapsed = std::num::NonZeroU64::new(9_502_241).unwrap();
        let transfer3 = DataTransferStats { time, elapsed, bytes };
        let expected = 526; // Calculated using calculator.
        assert_eq!(transfer3.calculate_throughput(), expected);

        // Very small fractional throughput in bytes per nano. Should truncate to 0.
        let bytes = 19;
        let elapsed = std::num::NonZeroU64::new(5_213_999_642_004).unwrap();
        let transfer4 = DataTransferStats { time, elapsed, bytes };
        assert_eq!(transfer4.calculate_throughput(), 0);

        // Throughput of 1 in bytes per nano.
        let bytes = 100;
        let elapsed = std::num::NonZeroU64::new(100).unwrap();
        let transfer5 = DataTransferStats { time, elapsed, bytes };
        assert_eq!(transfer5.calculate_throughput(), 1_000_000_000);

        // Large throughput in bytes per nano.
        let bytes = 100;
        let elapsed = std::num::NonZeroU64::new(1).unwrap();
        let transfer6 = DataTransferStats { time, elapsed, bytes };
        assert_eq!(transfer6.calculate_throughput(), 100_000_000_000);

        // Large fractional throughput in bytes per nano.
        let bytes = 987_432_002_999;
        let elapsed = std::num::NonZeroU64::new(453).unwrap();
        let transfer7 = DataTransferStats { time, elapsed, bytes };
        let expected = 2_179_761_596_024_282_368; // Calculated using calculator.
        assert_eq!(transfer7.calculate_throughput(), expected);
    }
}