1use diagnostics_message::MonikerWithUrl;
6use fidl::endpoints::{RequestStream, ServerEnd};
7use fidl_fuchsia_diagnostics_types::{Interest, Severity};
8use fidl_fuchsia_logger::{
9 LogSinkMarker, LogSinkOnInitRequest, LogSinkRequest, LogSinkWaitForInterestChangeResponder,
10};
11use fuchsia_async::{self as fasync, EHandle};
12use futures::StreamExt;
13use futures::future::{AbortHandle, Abortable};
14use ring_buffer::RingBuffer;
15use std::sync::{Arc, Condvar, Mutex};
16
/// A fake `LogSink` server that captures everything clients write into a ring buffer.
///
/// Connections are attached with `serve`, captured messages are retrieved with `read_message`,
/// and the minimum severity advertised to clients is changed with `set_min_severity`.
pub struct FakeLogSink {
    /// Reader for the ring buffer; `serve` hands a writer for the same buffer to each client.
    ring_buffer: ring_buffer::Reader,
    /// Current minimum severity plus the bookkeeping for `WaitForInterestChange` calls, shared
    /// with the tasks spawned by `serve`.
    min_severity: Arc<Mutex<MinSeverity>>,
    /// Handle to the executor on which connection tasks are spawned.
    ehandle: EHandle,
    /// Aborts the background future registered at construction time when the sink is dropped.
    _abort_handle: DropAbortHandle,
}
26
/// Wrapper around an [`AbortHandle`] that calls `abort()` when it is dropped.
struct DropAbortHandle(AbortHandle);
28
29impl Drop for DropAbortHandle {
30 fn drop(&mut self) {
31 self.0.abort();
32 }
33}
34
/// Minimum-severity state shared between a `FakeLogSink` and its connection tasks.
struct MinSeverity {
    /// The minimum severity currently advertised to clients.
    severity: Severity,
    /// Whether a severity change is waiting to be delivered, or who is waiting for one.
    updates: MinSeverityUpdates,
}
39
/// Delivery state for `WaitForInterestChange` requests.
#[derive(Default)]
enum MinSeverityUpdates {
    /// No undelivered change and nobody waiting.
    #[default]
    None,
    /// The severity changed while nobody was waiting; the next `WaitForInterestChange` request
    /// is answered immediately.
    Pending,
    /// Responders of outstanding `WaitForInterestChange` requests, answered on the next change.
    Listeners(Vec<LogSinkWaitForInterestChangeResponder>),
}
47
48impl Default for FakeLogSink {
49 fn default() -> Self {
50 FakeLogSink::new()
51 }
52}
53
54impl FakeLogSink {
55 pub fn new() -> Self {
57 Self::new_impl(false)
58 }
59
60 fn new_impl(sync: bool) -> Self {
61 let (abort_handle, abort_registration) = AbortHandle::new_pair();
62 let (ehandle, ring_buffer) = if sync {
63 let ehandle_and_ring_buffer = Arc::new((Mutex::new(None), Condvar::new()));
64 let eh_and_rb = ehandle_and_ring_buffer.clone();
65 std::thread::spawn(|| {
66 let _ = fasync::LocalExecutor::new().run_singlethreaded(Abortable::new(
67 async move {
68 let reader = RingBuffer::create(ring_buffer::MAX_MESSAGE_SIZE);
69 *eh_and_rb.0.lock().unwrap() = Some((EHandle::local(), reader));
70 eh_and_rb.1.notify_all();
71 let () = std::future::pending().await;
72 },
73 abort_registration,
74 ));
75 });
76 let (ehandle, reader) = ehandle_and_ring_buffer
77 .1
78 .wait_while(ehandle_and_ring_buffer.0.lock().unwrap(), |rb| rb.is_none())
79 .unwrap()
80 .take()
81 .unwrap();
82 (ehandle, reader)
83 } else {
84 let reader = RingBuffer::create(ring_buffer::MAX_MESSAGE_SIZE);
85 fasync::Task::spawn(async {
86 let _ = Abortable::new(std::future::pending::<()>(), abort_registration).await;
87 })
88 .detach();
89 (EHandle::local(), reader)
90 };
91
92 Self {
93 ring_buffer,
94 min_severity: Arc::new(Mutex::new(MinSeverity {
95 severity: Severity::Info,
96 updates: MinSeverityUpdates::default(),
97 })),
98 ehandle,
99 _abort_handle: DropAbortHandle(abort_handle),
100 }
101 }
102
103 pub async fn read_message(&mut self) -> String {
105 let (_tag, bytes) = self.ring_buffer.read_message().await.unwrap();
106 diagnostics_message::from_structured(
107 MonikerWithUrl { url: "".into(), moniker: "fake-log-sink".try_into().unwrap() },
108 &bytes,
109 )
110 .unwrap()
111 .msg()
112 .unwrap()
113 .into()
114 }
115
116 pub fn serve(&self, server_end: ServerEnd<LogSinkMarker>) {
118 let (iob, _) = self.ring_buffer.new_iob_writer(1).unwrap();
119 let min_severity = self.min_severity.clone();
120 self.ehandle.spawn_detached(async move {
121 let mut requests = server_end.into_stream();
122 {
123 let mut min_severity = min_severity.lock().unwrap();
124 requests
125 .control_handle()
126 .send_on_init(LogSinkOnInitRequest {
127 buffer: Some(iob),
128 interest: Some(Interest {
129 min_severity: Some(min_severity.severity),
130 ..Default::default()
131 }),
132 ..Default::default()
133 })
134 .unwrap();
135 min_severity.updates = MinSeverityUpdates::None;
136 }
137
138 while let Some(Ok(request)) = requests.next().await {
139 match request {
140 LogSinkRequest::WaitForInterestChange { responder } => {
141 let mut min_severity = min_severity.lock().unwrap();
142 match &mut min_severity.updates {
143 MinSeverityUpdates::None => {
144 min_severity.updates =
145 MinSeverityUpdates::Listeners(vec![responder]);
146 }
147 MinSeverityUpdates::Pending => {
148 let _ = responder.send(Ok(&Interest {
149 min_severity: Some(min_severity.severity),
150 ..Default::default()
151 }));
152 min_severity.updates = MinSeverityUpdates::None;
153 }
154 MinSeverityUpdates::Listeners(l) => l.push(responder),
155 }
156 }
157 _ => unreachable!(),
158 }
159 }
160 });
161 }
162
163 pub fn set_min_severity(&self, severity: Severity) {
165 let mut min_severity = self.min_severity.lock().unwrap();
166 if severity == min_severity.severity {
167 return;
168 }
169 min_severity.severity = severity;
170 match &mut min_severity.updates {
171 MinSeverityUpdates::Listeners(l) => {
172 for responder in l.drain(..) {
173 let _ = responder
174 .send(Ok(&Interest { min_severity: Some(severity), ..Default::default() }));
175 }
176 min_severity.updates = MinSeverityUpdates::None;
177 }
178 _ => min_severity.updates = MinSeverityUpdates::Pending,
179 }
180 }
181}
182
/// C-callable wrappers around [`FakeLogSink`], compiled only when the `ffi` feature is enabled.
#[cfg(feature = "ffi")]
pub mod ffi {
    // NOTE(review): imported anonymously, so nothing from the crate is named here — presumably
    // this only ensures the C log-decoder bindings are linked in; confirm.
    use log_decoder_c_bindings as _;

    use super::FakeLogSink;
    use fidl::endpoints::ServerEnd;
    use fidl_fuchsia_diagnostics_types::Severity;
    use fuchsia_async::TimeoutExt;
    use futures::executor::block_on;

    /// Allocates a new `FakeLogSink` that runs on its own executor thread and returns an owning
    /// raw pointer to it.
    ///
    /// The pointer must eventually be released with `fake_log_sink_delete`.
    #[unsafe(no_mangle)]
    pub extern "C" fn fake_log_sink_new() -> *mut FakeLogSink {
        Box::into_raw(Box::new(super::FakeLogSink::new_impl(true)))
    }

    /// Destroys a sink created by `fake_log_sink_new`.
    ///
    /// # Safety
    ///
    /// `fake` must have been returned by `fake_log_sink_new`, must not have been deleted
    /// already, and must not be used after this call.
    #[unsafe(no_mangle)]
    pub unsafe extern "C" fn fake_log_sink_delete(fake: *mut FakeLogSink) {
        // SAFETY: per this function's contract, `fake` came from `Box::into_raw` in
        // `fake_log_sink_new` and ownership is handed back here exactly once.
        drop(unsafe { Box::from_raw(fake) });
    }

    /// Starts serving the `LogSink` protocol on the channel identified by `handle`.
    ///
    /// The raw handle is wrapped in an owned `zx::Handle`, so ownership of it passes to the sink.
    ///
    /// # Safety
    ///
    /// `fake` must point to a live sink created by `fake_log_sink_new`, and `handle` must be a
    /// valid handle that the caller gives up (presumably a channel handle — the code does not
    /// check its type here).
    #[unsafe(no_mangle)]
    pub unsafe extern "C" fn fake_log_sink_serve(fake: *mut FakeLogSink, handle: u32) {
        // SAFETY: the caller guarantees that `fake` is valid and that `handle` is a valid handle
        // whose ownership is transferred to us.
        unsafe { &*fake }.serve(ServerEnd::new(unsafe { zx::Handle::from_raw(handle) }.into()));
    }

    /// Blocks until a message is available, copies it into `dest` and returns its length in
    /// bytes.
    ///
    /// # Panics
    ///
    /// Panics if reading from the ring buffer fails or if the message is longer than `capacity`.
    ///
    /// # Safety
    ///
    /// `fake` must point to a live sink that is not accessed concurrently, and `dest` must be
    /// valid for writes of `capacity` bytes.
    #[unsafe(no_mangle)]
    pub unsafe extern "C" fn fake_log_sink_read_record(
        fake: *mut FakeLogSink,
        dest: *mut u8,
        capacity: usize,
    ) -> usize {
        // SAFETY: the caller guarantees `fake` is valid and exclusively ours for this call.
        let fake = unsafe { &mut *fake };
        // Element `.1` is the message bytes; element `.0` is unused here.
        let buf = block_on(fake.ring_buffer.read_message()).unwrap().1;
        assert!(buf.len() <= capacity, "{} {capacity}", buf.len());
        // SAFETY: `dest` is valid for `capacity` bytes and `buf.len() <= capacity` was just
        // checked.
        unsafe {
            std::slice::from_raw_parts_mut(dest, buf.len()).copy_from_slice(&buf);
        }
        buf.len()
    }

    /// Sets the minimum severity advertised by the sink; see `FakeLogSink::set_min_severity`.
    ///
    /// # Panics
    ///
    /// Panics if `severity` is not a valid `Severity` primitive value.
    ///
    /// # Safety
    ///
    /// `fake` must point to a live sink created by `fake_log_sink_new`.
    #[unsafe(no_mangle)]
    pub unsafe extern "C" fn fake_log_sink_set_min_severity(fake: *mut FakeLogSink, severity: u8) {
        // SAFETY: the caller guarantees `fake` is valid.
        unsafe { &*fake }.set_min_severity(Severity::from_primitive(severity).unwrap());
    }

    /// Waits until the ring buffer's head moves past its current tail or the deadline passes.
    ///
    /// `deadline_nanos` is interpreted as a `zx::MonotonicInstant`. Returns `0` if the head
    /// still equals the tail afterwards; otherwise returns the length of the first message in
    /// the newly available range. The message is not consumed by this call as far as the code
    /// here shows (only `tail`/`head`/`first_message_in` are used) — TODO confirm.
    ///
    /// # Safety
    ///
    /// `fake` must point to a live sink that is not accessed concurrently for the duration of
    /// the call.
    #[unsafe(no_mangle)]
    pub unsafe extern "C" fn fake_log_sink_wait_for_record(
        fake: *mut FakeLogSink,
        deadline_nanos: i64,
    ) -> usize {
        // Extends the borrow to `'static` so it can be moved into the future passed to
        // `scope.compute` below.
        // NOTE(review): presumably `compute` requires a `'static` future; confirm.
        unsafe fn erase_lifetime(sink: &mut FakeLogSink) -> &'static mut FakeLogSink {
            // SAFETY: the caller must ensure the returned reference is not used after the
            // original borrow ends.
            unsafe { std::mem::transmute(sink) }
        }

        // SAFETY: the caller guarantees `fake` is valid and exclusively ours for this call.
        let fake = unsafe { &mut *fake };

        let tail = fake.ring_buffer.tail();
        let scope = fake.ehandle.global_scope().clone();

        {
            // SAFETY: the erased reference is only used inside the future below, and `block_on`
            // does not return until that computation has produced its result, so the reference
            // does not outlive this block.
            // NOTE(review): this relies on `scope.compute` not keeping the future alive after
            // its result is delivered — confirm.
            let fake = unsafe { erase_lifetime(fake) };
            // The wait is run via the sink's executor scope rather than directly in `block_on`;
            // presumably because the timeout needs the fuchsia_async executor — confirm.
            // The value produced by the wait (or `0` on timeout) is ignored; the head/tail
            // comparison below decides the outcome.
            block_on(scope.compute(async move {
                fake.ring_buffer
                    .wait(tail)
                    .on_timeout(zx::MonotonicInstant::from_nanos(deadline_nanos), || 0)
                    .await
            }));
        }

        let head = fake.ring_buffer.head();
        if head == tail {
            0
        } else {
            // SAFETY: NOTE(review): `first_message_in` is unsafe; this presumably relies on
            // `tail..head` being a range the ring buffer itself just reported — confirm its
            // documented preconditions.
            unsafe { fake.ring_buffer.first_message_in(tail..head) }.unwrap().1.len()
        }
    }
}
294
#[cfg(test)]
mod tests {
    use super::FakeLogSink;
    use diagnostics_log_encoding::encode::{
        Encoder, EncoderOpts, LogEvent, MutableBuffer, WriteEventParams,
    };
    use fidl::endpoints::create_proxy;
    use fidl_fuchsia_logger::{LogSinkEvent, LogSinkOnInitRequest, MAX_DATAGRAM_LEN_BYTES};
    use futures::StreamExt;
    use std::io::Cursor;

    /// Writes one encoded record through the buffer received in `OnInit` and checks that
    /// `read_message` returns its text.
    #[fuchsia::test(logging = false)]
    async fn log() {
        const MSG: &str = "The quick brown fox jumps over the lazy dog";

        let (proxy, server) = create_proxy();
        let mut sink = FakeLogSink::new();
        sink.serve(server);

        // The first event on a new connection must be `OnInit` carrying the shared buffer.
        let first_event = proxy.take_event_stream().next().await;
        let Some(Ok(LogSinkEvent::OnInit {
            payload: LogSinkOnInitRequest { buffer: Some(iob), .. },
        })) = first_event
        else {
            panic!("Expected OnInit")
        };

        // Encode a single record into a scratch buffer.
        let mut scratch = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
        let mut encoder = Encoder::new(Cursor::new(&mut scratch[..]), EncoderOpts::default());
        let no_tags: &[&str] = &[];
        encoder
            .write_event(WriteEventParams {
                event: LogEvent::new(&log::Record::builder().args(format_args!("{MSG}")).build()),
                tags: no_tags,
                metatags: std::iter::empty(),
                pid: zx::Koid::from_raw(1),
                tid: zx::Koid::from_raw(2),
                dropped: 0,
            })
            .unwrap();

        // Only the bytes the encoder actually produced are written to the sink.
        let written = encoder.inner().cursor();
        let record = &encoder.inner().get_ref()[..written];
        iob.write(Default::default(), 0, record).unwrap();

        assert_eq!(sink.read_message().await, MSG);
    }
}