fake_log_sink/
lib.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use diagnostics_message::MonikerWithUrl;
6use fidl::endpoints::{RequestStream, ServerEnd};
7use fidl_fuchsia_diagnostics_types::{Interest, Severity};
8use fidl_fuchsia_logger::{
9    LogSinkMarker, LogSinkOnInitRequest, LogSinkRequest, LogSinkWaitForInterestChangeResponder,
10};
11use fuchsia_async::{self as fasync, EHandle};
12use fuchsia_sync::{Condvar, Mutex};
13use futures::StreamExt;
14use futures::future::{AbortHandle, Abortable};
15use ring_buffer::RingBuffer;
16use std::sync::Arc;
17
18/// FakeLogSink serves LogSink connections and forward any messages logged to it.
19pub struct FakeLogSink {
20    ring_buffer: ring_buffer::Reader,
21    min_severity: Arc<Mutex<MinSeverity>>,
22    ehandle: EHandle,
23    // This must be dropped after the ring buffer to avoid the executor complaining about
24    // receivers outliving their executor.
25    _abort_handle: DropAbortHandle,
26}
27
28struct DropAbortHandle(AbortHandle);
29
30impl Drop for DropAbortHandle {
31    fn drop(&mut self) {
32        self.0.abort();
33    }
34}
35
36struct MinSeverity {
37    severity: Severity,
38    updates: MinSeverityUpdates,
39}
40
41#[derive(Default)]
42enum MinSeverityUpdates {
43    #[default]
44    None,
45    Pending,
46    Listeners(Vec<LogSinkWaitForInterestChangeResponder>),
47}
48
49impl Default for FakeLogSink {
50    fn default() -> Self {
51        FakeLogSink::new()
52    }
53}
54
55impl FakeLogSink {
56    /// Returns a new FakeLogSink and receiver that will have messages delivered to it.
57    pub fn new() -> Self {
58        Self::new_impl(false)
59    }
60
61    fn new_impl(sync: bool) -> Self {
62        let (abort_handle, abort_registration) = AbortHandle::new_pair();
63        let (ehandle, ring_buffer) = if sync {
64            let ehandle_and_ring_buffer = Arc::new((Mutex::new(None), Condvar::new()));
65            let eh_and_rb = ehandle_and_ring_buffer.clone();
66            std::thread::spawn(|| {
67                let _ = fasync::LocalExecutor::default().run_singlethreaded(Abortable::new(
68                    async move {
69                        let reader = RingBuffer::create(ring_buffer::MAX_MESSAGE_SIZE);
70                        *eh_and_rb.0.lock() = Some((EHandle::local(), reader));
71                        eh_and_rb.1.notify_all();
72                        let () = std::future::pending().await;
73                    },
74                    abort_registration,
75                ));
76            });
77            let mut ehandle_buffer_guard = ehandle_and_ring_buffer.0.lock();
78            ehandle_and_ring_buffer.1.wait_while(&mut ehandle_buffer_guard, |rb| rb.is_none());
79            ehandle_buffer_guard.take().unwrap()
80        } else {
81            let reader = RingBuffer::create(ring_buffer::MAX_MESSAGE_SIZE);
82            fasync::Task::spawn(async {
83                let _ = Abortable::new(std::future::pending::<()>(), abort_registration).await;
84            })
85            .detach();
86            (EHandle::local(), reader)
87        };
88
89        Self {
90            ring_buffer,
91            min_severity: Arc::new(Mutex::new(MinSeverity {
92                severity: Severity::Info,
93                updates: MinSeverityUpdates::default(),
94            })),
95            ehandle,
96            _abort_handle: DropAbortHandle(abort_handle),
97        }
98    }
99
100    /// Returns a message sent to the sink.
101    pub async fn read_message(&mut self) -> String {
102        let (_tag, bytes) = self.ring_buffer.read_message().await.unwrap();
103        diagnostics_message::from_structured(
104            MonikerWithUrl { url: "".into(), moniker: "fake-log-sink".try_into().unwrap() },
105            &bytes,
106        )
107        .unwrap()
108        .msg()
109        .unwrap()
110        .into()
111    }
112
113    /// Handles the server end of the LogSink connection.
114    pub fn serve(&self, server_end: ServerEnd<LogSinkMarker>) {
115        let (iob, _) = self.ring_buffer.new_iob_writer(1).unwrap();
116        let min_severity = self.min_severity.clone();
117        self.ehandle.spawn_detached(async move {
118            let mut requests = server_end.into_stream();
119            {
120                let mut min_severity = min_severity.lock();
121                requests
122                    .control_handle()
123                    .send_on_init(LogSinkOnInitRequest {
124                        buffer: Some(iob),
125                        interest: Some(Interest {
126                            min_severity: Some(min_severity.severity),
127                            ..Default::default()
128                        }),
129                        ..Default::default()
130                    })
131                    .unwrap();
132                min_severity.updates = MinSeverityUpdates::None;
133            }
134
135            while let Some(Ok(request)) = requests.next().await {
136                match request {
137                    LogSinkRequest::WaitForInterestChange { responder } => {
138                        let mut min_severity = min_severity.lock();
139                        match &mut min_severity.updates {
140                            MinSeverityUpdates::None => {
141                                min_severity.updates =
142                                    MinSeverityUpdates::Listeners(vec![responder]);
143                            }
144                            MinSeverityUpdates::Pending => {
145                                let _ = responder.send(Ok(&Interest {
146                                    min_severity: Some(min_severity.severity),
147                                    ..Default::default()
148                                }));
149                                min_severity.updates = MinSeverityUpdates::None;
150                            }
151                            MinSeverityUpdates::Listeners(l) => l.push(responder),
152                        }
153                    }
154                    _ => unreachable!(),
155                }
156            }
157        });
158    }
159
160    /// Sets the minimum severity and notifies all listeners.
161    pub fn set_min_severity(&self, severity: Severity) {
162        let mut min_severity = self.min_severity.lock();
163        if severity == min_severity.severity {
164            return;
165        }
166        min_severity.severity = severity;
167        match &mut min_severity.updates {
168            MinSeverityUpdates::Listeners(l) => {
169                for responder in l.drain(..) {
170                    let _ = responder
171                        .send(Ok(&Interest { min_severity: Some(severity), ..Default::default() }));
172                }
173                min_severity.updates = MinSeverityUpdates::None;
174            }
175            _ => min_severity.updates = MinSeverityUpdates::Pending,
176        }
177    }
178}
179
180#[cfg(feature = "ffi")]
181pub mod ffi {
182    // NOTE: It isn't currently possible to link to more than one rustc_staticlib; it results in
183    // duplicate definition linker errors on LTO builds. To workaround this, we import
184    // log_decoder_c_bindings here so that it gets pulled in as part of this static library.
185    use log_decoder_c_bindings as _;
186
187    use super::FakeLogSink;
188    use fidl::endpoints::ServerEnd;
189    use fidl_fuchsia_diagnostics_types::Severity;
190    use fuchsia_async::TimeoutExt;
191    use futures::executor::block_on;
192
193    /// Creates a new fake log sink.
194    #[unsafe(no_mangle)]
195    pub extern "C" fn fake_log_sink_new() -> *mut FakeLogSink {
196        Box::into_raw(Box::new(super::FakeLogSink::new_impl(true)))
197    }
198
199    /// Deletes a log sink.
200    ///
201    /// # Safety
202    ///
203    /// `fake` must be from `fake_log_sink_new()`.
204    #[unsafe(no_mangle)]
205    pub unsafe extern "C" fn fake_log_sink_delete(fake: *mut FakeLogSink) {
206        drop(unsafe { Box::from_raw(fake) });
207    }
208
209    /// Serves a new connection
210    ///
211    /// # Safety
212    ///
213    /// `fake` must be from `fake_log_sink_new()` and `handle` must be valid.
214    #[unsafe(no_mangle)]
215    pub unsafe extern "C" fn fake_log_sink_serve(fake: *mut FakeLogSink, handle: u32) {
216        unsafe { &*fake }
217            .serve(ServerEnd::new(unsafe { zx::NullableHandle::from_raw(handle) }.into()));
218    }
219
220    /// Reads a new record.
221    ///
222    ///
223    /// # Safety
224    ///
225    /// `fake` must be from `fake_log_sink_new()` and `dest` and `capacity` must be valid.
226    #[unsafe(no_mangle)]
227    pub unsafe extern "C" fn fake_log_sink_read_record(
228        fake: *mut FakeLogSink,
229        dest: *mut u8,
230        capacity: usize,
231    ) -> usize {
232        let fake = unsafe { &mut *fake };
233        let buf = block_on(fake.ring_buffer.read_message()).unwrap().1;
234        assert!(buf.len() <= capacity, "{} {capacity}", buf.len());
235        unsafe {
236            std::slice::from_raw_parts_mut(dest, buf.len()).copy_from_slice(&buf);
237        }
238        buf.len()
239    }
240
241    /// Sets the minimum severity and notifies listeners.
242    ///
243    /// # Panics
244    ///
245    /// This will panic if `severity` is invalid.
246    ///
247    /// # Safety
248    ///
249    /// `fake` must be from `fake_log_sink_new()`.
250    #[unsafe(no_mangle)]
251    pub unsafe extern "C" fn fake_log_sink_set_min_severity(fake: *mut FakeLogSink, severity: u8) {
252        unsafe { &*fake }.set_min_severity(Severity::from_primitive(severity).unwrap());
253    }
254
255    /// Waits for a record to be ready and returns its size, or zero if timed out.
256    ///
257    /// # Safety
258    ///
259    /// `fake` must be from `fake_log_sink_new()`.
260    #[unsafe(no_mangle)]
261    pub unsafe extern "C" fn fake_log_sink_wait_for_record(
262        fake: *mut FakeLogSink,
263        deadline_nanos: i64,
264    ) -> usize {
265        unsafe fn erase_lifetime(sink: &mut FakeLogSink) -> &'static mut FakeLogSink {
266            unsafe { std::mem::transmute(sink) }
267        }
268
269        let fake = unsafe { &mut *fake };
270
271        let tail = fake.ring_buffer.tail();
272        let scope = fake.ehandle.global_scope().clone();
273
274        {
275            let fake = unsafe { erase_lifetime(fake) };
276            block_on(scope.compute(async move {
277                fake.ring_buffer
278                    .wait(tail)
279                    .on_timeout(zx::MonotonicInstant::from_nanos(deadline_nanos), || 0)
280                    .await
281            }));
282        }
283
284        let head = fake.ring_buffer.head();
285        if head == tail {
286            0
287        } else {
288            unsafe { fake.ring_buffer.first_message_in(tail..head) }.unwrap().1.len()
289        }
290    }
291}
292
293#[cfg(test)]
294mod tests {
295    use super::FakeLogSink;
296    use diagnostics_log_encoding::encode::{
297        Encoder, EncoderOpts, LogEvent, MutableBuffer, WriteEventParams,
298    };
299    use fidl::endpoints::create_proxy;
300    use fidl_fuchsia_logger::{LogSinkEvent, LogSinkOnInitRequest, MAX_DATAGRAM_LEN_BYTES};
301    use futures::StreamExt;
302    use std::io::Cursor;
303
304    #[fuchsia::test(logging = false)]
305    async fn log() {
306        let (proxy, server) = create_proxy();
307        let mut fake_sink = FakeLogSink::new();
308        fake_sink.serve(server);
309
310        // NOTE: This can be changed to use the diagnostics client library when support for the
311        // IOBuffer has been added.
312        let Some(Ok(LogSinkEvent::OnInit {
313            payload: LogSinkOnInitRequest { buffer: Some(iob), .. },
314        })) = proxy.take_event_stream().next().await
315        else {
316            panic!("Expected OnInit")
317        };
318
319        let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
320        let mut encoder = Encoder::new(Cursor::new(&mut buf[..]), EncoderOpts::default());
321        let tags: &[&str] = &[];
322        const MSG: &str = "The quick brown fox jumps over the lazy dog";
323        encoder
324            .write_event(WriteEventParams {
325                event: LogEvent::new(&log::Record::builder().args(format_args!("{MSG}")).build()),
326                tags,
327                metatags: std::iter::empty(),
328                pid: zx::Koid::from_raw(1),
329                tid: zx::Koid::from_raw(2),
330                dropped: 0,
331            })
332            .unwrap();
333        let end = encoder.inner().cursor();
334        iob.write(Default::default(), 0, &encoder.inner().get_ref()[..end]).unwrap();
335
336        assert_eq!(fake_sink.read_message().await, MSG);
337    }
338}