fake_log_sink/
lib.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use diagnostics_message::MonikerWithUrl;
6use fidl::endpoints::{RequestStream, ServerEnd};
7use fidl_fuchsia_diagnostics_types::{Interest, Severity};
8use fidl_fuchsia_logger::{
9    LogSinkMarker, LogSinkOnInitRequest, LogSinkRequest, LogSinkWaitForInterestChangeResponder,
10};
11use fuchsia_async::{self as fasync, EHandle};
12use futures::StreamExt;
13use futures::future::{AbortHandle, Abortable};
14use ring_buffer::RingBuffer;
15use std::sync::{Arc, Condvar, Mutex};
16
17/// FakeLogSink serves LogSink connections and forward any messages logged to it.
18pub struct FakeLogSink {
19    ring_buffer: ring_buffer::Reader,
20    min_severity: Arc<Mutex<MinSeverity>>,
21    ehandle: EHandle,
22    // This must be dropped after the ring buffer to avoid the executor complaining about
23    // receivers outliving their executor.
24    _abort_handle: DropAbortHandle,
25}
26
27struct DropAbortHandle(AbortHandle);
28
29impl Drop for DropAbortHandle {
30    fn drop(&mut self) {
31        self.0.abort();
32    }
33}
34
35struct MinSeverity {
36    severity: Severity,
37    updates: MinSeverityUpdates,
38}
39
40#[derive(Default)]
41enum MinSeverityUpdates {
42    #[default]
43    None,
44    Pending,
45    Listeners(Vec<LogSinkWaitForInterestChangeResponder>),
46}
47
48impl Default for FakeLogSink {
49    fn default() -> Self {
50        FakeLogSink::new()
51    }
52}
53
54impl FakeLogSink {
55    /// Returns a new FakeLogSink and receiver that will have messages delivered to it.
56    pub fn new() -> Self {
57        Self::new_impl(false)
58    }
59
60    fn new_impl(sync: bool) -> Self {
61        let (abort_handle, abort_registration) = AbortHandle::new_pair();
62        let (ehandle, ring_buffer) = if sync {
63            let ehandle_and_ring_buffer = Arc::new((Mutex::new(None), Condvar::new()));
64            let eh_and_rb = ehandle_and_ring_buffer.clone();
65            std::thread::spawn(|| {
66                let _ = fasync::LocalExecutor::new().run_singlethreaded(Abortable::new(
67                    async move {
68                        let reader = RingBuffer::create(ring_buffer::MAX_MESSAGE_SIZE);
69                        *eh_and_rb.0.lock().unwrap() = Some((EHandle::local(), reader));
70                        eh_and_rb.1.notify_all();
71                        let () = std::future::pending().await;
72                    },
73                    abort_registration,
74                ));
75            });
76            let (ehandle, reader) = ehandle_and_ring_buffer
77                .1
78                .wait_while(ehandle_and_ring_buffer.0.lock().unwrap(), |rb| rb.is_none())
79                .unwrap()
80                .take()
81                .unwrap();
82            (ehandle, reader)
83        } else {
84            let reader = RingBuffer::create(ring_buffer::MAX_MESSAGE_SIZE);
85            fasync::Task::spawn(async {
86                let _ = Abortable::new(std::future::pending::<()>(), abort_registration).await;
87            })
88            .detach();
89            (EHandle::local(), reader)
90        };
91
92        Self {
93            ring_buffer,
94            min_severity: Arc::new(Mutex::new(MinSeverity {
95                severity: Severity::Info,
96                updates: MinSeverityUpdates::default(),
97            })),
98            ehandle,
99            _abort_handle: DropAbortHandle(abort_handle),
100        }
101    }
102
103    /// Returns a message sent to the sink.
104    pub async fn read_message(&mut self) -> String {
105        let (_tag, bytes) = self.ring_buffer.read_message().await.unwrap();
106        diagnostics_message::from_structured(
107            MonikerWithUrl { url: "".into(), moniker: "fake-log-sink".try_into().unwrap() },
108            &bytes,
109        )
110        .unwrap()
111        .msg()
112        .unwrap()
113        .into()
114    }
115
116    /// Handles the server end of the LogSink connection.
117    pub fn serve(&self, server_end: ServerEnd<LogSinkMarker>) {
118        let (iob, _) = self.ring_buffer.new_iob_writer(1).unwrap();
119        let min_severity = self.min_severity.clone();
120        self.ehandle.spawn_detached(async move {
121            let mut requests = server_end.into_stream();
122            {
123                let mut min_severity = min_severity.lock().unwrap();
124                requests
125                    .control_handle()
126                    .send_on_init(LogSinkOnInitRequest {
127                        buffer: Some(iob),
128                        interest: Some(Interest {
129                            min_severity: Some(min_severity.severity),
130                            ..Default::default()
131                        }),
132                        ..Default::default()
133                    })
134                    .unwrap();
135                min_severity.updates = MinSeverityUpdates::None;
136            }
137
138            while let Some(Ok(request)) = requests.next().await {
139                match request {
140                    LogSinkRequest::WaitForInterestChange { responder } => {
141                        let mut min_severity = min_severity.lock().unwrap();
142                        match &mut min_severity.updates {
143                            MinSeverityUpdates::None => {
144                                min_severity.updates =
145                                    MinSeverityUpdates::Listeners(vec![responder]);
146                            }
147                            MinSeverityUpdates::Pending => {
148                                let _ = responder.send(Ok(&Interest {
149                                    min_severity: Some(min_severity.severity),
150                                    ..Default::default()
151                                }));
152                                min_severity.updates = MinSeverityUpdates::None;
153                            }
154                            MinSeverityUpdates::Listeners(l) => l.push(responder),
155                        }
156                    }
157                    _ => unreachable!(),
158                }
159            }
160        });
161    }
162
163    /// Sets the minimum severity and notifies all listeners.
164    pub fn set_min_severity(&self, severity: Severity) {
165        let mut min_severity = self.min_severity.lock().unwrap();
166        if severity == min_severity.severity {
167            return;
168        }
169        min_severity.severity = severity;
170        match &mut min_severity.updates {
171            MinSeverityUpdates::Listeners(l) => {
172                for responder in l.drain(..) {
173                    let _ = responder
174                        .send(Ok(&Interest { min_severity: Some(severity), ..Default::default() }));
175                }
176                min_severity.updates = MinSeverityUpdates::None;
177            }
178            _ => min_severity.updates = MinSeverityUpdates::Pending,
179        }
180    }
181}
182
183#[cfg(feature = "ffi")]
184pub mod ffi {
185    // NOTE: It isn't currently possible to link to more than one rustc_staticlib; it results in
186    // duplicate definition linker errors on LTO builds. To workaround this, we import
187    // log_decoder_c_bindings here so that it gets pulled in as part of this static library.
188    use log_decoder_c_bindings as _;
189
190    use super::FakeLogSink;
191    use fidl::endpoints::ServerEnd;
192    use fidl_fuchsia_diagnostics_types::Severity;
193    use fuchsia_async::TimeoutExt;
194    use futures::executor::block_on;
195
196    /// Creates a new fake log sink.
197    #[unsafe(no_mangle)]
198    pub extern "C" fn fake_log_sink_new() -> *mut FakeLogSink {
199        Box::into_raw(Box::new(super::FakeLogSink::new_impl(true)))
200    }
201
202    /// Deletes a log sink.
203    ///
204    /// # Safety
205    ///
206    /// `fake` must be from `fake_log_sink_new()`.
207    #[unsafe(no_mangle)]
208    pub unsafe extern "C" fn fake_log_sink_delete(fake: *mut FakeLogSink) {
209        drop(unsafe { Box::from_raw(fake) });
210    }
211
212    /// Serves a new connection
213    ///
214    /// # Safety
215    ///
216    /// `fake` must be from `fake_log_sink_new()` and `handle` must be valid.
217    #[unsafe(no_mangle)]
218    pub unsafe extern "C" fn fake_log_sink_serve(fake: *mut FakeLogSink, handle: u32) {
219        unsafe { &*fake }.serve(ServerEnd::new(unsafe { zx::Handle::from_raw(handle) }.into()));
220    }
221
222    /// Reads a new record.
223    ///
224    ///
225    /// # Safety
226    ///
227    /// `fake` must be from `fake_log_sink_new()` and `dest` and `capacity` must be valid.
228    #[unsafe(no_mangle)]
229    pub unsafe extern "C" fn fake_log_sink_read_record(
230        fake: *mut FakeLogSink,
231        dest: *mut u8,
232        capacity: usize,
233    ) -> usize {
234        let fake = unsafe { &mut *fake };
235        let buf = block_on(fake.ring_buffer.read_message()).unwrap().1;
236        assert!(buf.len() <= capacity, "{} {capacity}", buf.len());
237        unsafe {
238            std::slice::from_raw_parts_mut(dest, buf.len()).copy_from_slice(&buf);
239        }
240        buf.len()
241    }
242
243    /// Sets the minimum severity and notifies listeners.
244    ///
245    /// # Panics
246    ///
247    /// This will panic if `severity` is invalid.
248    ///
249    /// # Safety
250    ///
251    /// `fake` must be from `fake_log_sink_new()`.
252    #[unsafe(no_mangle)]
253    pub unsafe extern "C" fn fake_log_sink_set_min_severity(fake: *mut FakeLogSink, severity: u8) {
254        unsafe { &*fake }.set_min_severity(Severity::from_primitive(severity).unwrap());
255    }
256
257    /// Waits for a record to be ready and returns its size, or zero if timed out.
258    ///
259    /// # Safety
260    ///
261    /// `fake` must be from `fake_log_sink_new()`.
262    #[unsafe(no_mangle)]
263    pub unsafe extern "C" fn fake_log_sink_wait_for_record(
264        fake: *mut FakeLogSink,
265        deadline_nanos: i64,
266    ) -> usize {
267        unsafe fn erase_lifetime(sink: &mut FakeLogSink) -> &'static mut FakeLogSink {
268            unsafe { std::mem::transmute(sink) }
269        }
270
271        let fake = unsafe { &mut *fake };
272
273        let tail = fake.ring_buffer.tail();
274        let scope = fake.ehandle.global_scope().clone();
275
276        {
277            let fake = unsafe { erase_lifetime(fake) };
278            block_on(scope.compute(async move {
279                fake.ring_buffer
280                    .wait(tail)
281                    .on_timeout(zx::MonotonicInstant::from_nanos(deadline_nanos), || 0)
282                    .await
283            }));
284        }
285
286        let head = fake.ring_buffer.head();
287        if head == tail {
288            0
289        } else {
290            unsafe { fake.ring_buffer.first_message_in(tail..head) }.unwrap().1.len()
291        }
292    }
293}
294
295#[cfg(test)]
296mod tests {
297    use super::FakeLogSink;
298    use diagnostics_log_encoding::encode::{
299        Encoder, EncoderOpts, LogEvent, MutableBuffer, WriteEventParams,
300    };
301    use fidl::endpoints::create_proxy;
302    use fidl_fuchsia_logger::{LogSinkEvent, LogSinkOnInitRequest, MAX_DATAGRAM_LEN_BYTES};
303    use futures::StreamExt;
304    use std::io::Cursor;
305
306    #[fuchsia::test(logging = false)]
307    async fn log() {
308        let (proxy, server) = create_proxy();
309        let mut fake_sink = FakeLogSink::new();
310        fake_sink.serve(server);
311
312        // NOTE: This can be changed to use the diagnostics client library when support for the
313        // IOBuffer has been added.
314        let Some(Ok(LogSinkEvent::OnInit {
315            payload: LogSinkOnInitRequest { buffer: Some(iob), .. },
316        })) = proxy.take_event_stream().next().await
317        else {
318            panic!("Expected OnInit")
319        };
320
321        let mut buf = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
322        let mut encoder = Encoder::new(Cursor::new(&mut buf[..]), EncoderOpts::default());
323        let tags: &[&str] = &[];
324        const MSG: &str = "The quick brown fox jumps over the lazy dog";
325        encoder
326            .write_event(WriteEventParams {
327                event: LogEvent::new(&log::Record::builder().args(format_args!("{MSG}")).build()),
328                tags,
329                metatags: std::iter::empty(),
330                pid: zx::Koid::from_raw(1),
331                tid: zx::Koid::from_raw(2),
332                dropped: 0,
333            })
334            .unwrap();
335        let end = encoder.inner().cursor();
336        iob.write(Default::default(), 0, &encoder.inner().get_ref()[..end]).unwrap();
337
338        assert_eq!(fake_sink.read_message().await, MSG);
339    }
340}