1use diagnostics_message::MonikerWithUrl;
6use fidl::endpoints::{RequestStream, ServerEnd};
7use fidl_fuchsia_diagnostics_types::{Interest, Severity};
8use fidl_fuchsia_logger::{
9 LogSinkMarker, LogSinkOnInitRequest, LogSinkRequest, LogSinkWaitForInterestChangeResponder,
10};
11use fuchsia_async::{self as fasync, EHandle};
12use fuchsia_sync::{Condvar, Mutex};
13use futures::StreamExt;
14use futures::future::{AbortHandle, Abortable};
15use ring_buffer::RingBuffer;
16use std::sync::Arc;
17
/// A fake `fuchsia.logger.LogSink` server for tests.
///
/// Clients connected through `serve` are handed a writer onto an internal ring buffer; records
/// they write can then be read back from this object.
pub struct FakeLogSink {
    /// Reader side of the ring buffer created in `new_impl`; written records are read from here.
    ring_buffer: ring_buffer::Reader,
    /// Current minimum severity plus interest-change bookkeeping, shared with the tasks spawned
    /// by `serve`.
    min_severity: Arc<Mutex<MinSeverity>>,
    /// Handle of the executor on which the per-connection tasks are spawned.
    ehandle: EHandle,
    /// Aborts the background future set up in `new_impl` once this value is dropped.
    _abort_handle: DropAbortHandle,
}
27
/// Owns an [`AbortHandle`] and triggers it when dropped (see the `Drop` impl).
struct DropAbortHandle(AbortHandle);
29
30impl Drop for DropAbortHandle {
31 fn drop(&mut self) {
32 self.0.abort();
33 }
34}
35
/// Minimum-severity state shared between a `FakeLogSink` and its serving tasks.
struct MinSeverity {
    /// The minimum severity currently advertised to clients.
    severity: Severity,
    /// Tracks whether clients still have to be told about a severity change.
    updates: MinSeverityUpdates,
}
40
/// Bookkeeping for `WaitForInterestChange` hanging gets.
#[derive(Default)]
enum MinSeverityUpdates {
    /// No change is waiting to be delivered and no responder is parked.
    #[default]
    None,
    /// The severity changed while no responder was parked; the next `WaitForInterestChange`
    /// call is answered immediately.
    Pending,
    /// Responders parked until the severity changes.
    Listeners(Vec<LogSinkWaitForInterestChangeResponder>),
}
48
49impl Default for FakeLogSink {
50 fn default() -> Self {
51 FakeLogSink::new()
52 }
53}
54
55impl FakeLogSink {
56 pub fn new() -> Self {
58 Self::new_impl(false)
59 }
60
61 fn new_impl(sync: bool) -> Self {
62 let (abort_handle, abort_registration) = AbortHandle::new_pair();
63 let (ehandle, ring_buffer) = if sync {
64 let ehandle_and_ring_buffer = Arc::new((Mutex::new(None), Condvar::new()));
65 let eh_and_rb = ehandle_and_ring_buffer.clone();
66 std::thread::spawn(|| {
67 let _ = fasync::LocalExecutor::default().run_singlethreaded(Abortable::new(
68 async move {
69 let reader = RingBuffer::create(ring_buffer::MAX_MESSAGE_SIZE);
70 *eh_and_rb.0.lock() = Some((EHandle::local(), reader));
71 eh_and_rb.1.notify_all();
72 let () = std::future::pending().await;
73 },
74 abort_registration,
75 ));
76 });
77 let mut ehandle_buffer_guard = ehandle_and_ring_buffer.0.lock();
78 ehandle_and_ring_buffer.1.wait_while(&mut ehandle_buffer_guard, |rb| rb.is_none());
79 ehandle_buffer_guard.take().unwrap()
80 } else {
81 let reader = RingBuffer::create(ring_buffer::MAX_MESSAGE_SIZE);
82 fasync::Task::spawn(async {
83 let _ = Abortable::new(std::future::pending::<()>(), abort_registration).await;
84 })
85 .detach();
86 (EHandle::local(), reader)
87 };
88
89 Self {
90 ring_buffer,
91 min_severity: Arc::new(Mutex::new(MinSeverity {
92 severity: Severity::Info,
93 updates: MinSeverityUpdates::default(),
94 })),
95 ehandle,
96 _abort_handle: DropAbortHandle(abort_handle),
97 }
98 }
99
100 pub async fn read_message(&mut self) -> String {
102 let (_tag, bytes) = self.ring_buffer.read_message().await.unwrap();
103 diagnostics_message::from_structured(
104 MonikerWithUrl { url: "".into(), moniker: "fake-log-sink".try_into().unwrap() },
105 &bytes,
106 )
107 .unwrap()
108 .msg()
109 .unwrap()
110 .into()
111 }
112
113 pub fn serve(&self, server_end: ServerEnd<LogSinkMarker>) {
115 let (iob, _) = self.ring_buffer.new_iob_writer(1).unwrap();
116 let min_severity = self.min_severity.clone();
117 self.ehandle.spawn_detached(async move {
118 let mut requests = server_end.into_stream();
119 {
120 let mut min_severity = min_severity.lock();
121 requests
122 .control_handle()
123 .send_on_init(LogSinkOnInitRequest {
124 buffer: Some(iob),
125 interest: Some(Interest {
126 min_severity: Some(min_severity.severity),
127 ..Default::default()
128 }),
129 ..Default::default()
130 })
131 .unwrap();
132 min_severity.updates = MinSeverityUpdates::None;
133 }
134
135 while let Some(Ok(request)) = requests.next().await {
136 match request {
137 LogSinkRequest::WaitForInterestChange { responder } => {
138 let mut min_severity = min_severity.lock();
139 match &mut min_severity.updates {
140 MinSeverityUpdates::None => {
141 min_severity.updates =
142 MinSeverityUpdates::Listeners(vec![responder]);
143 }
144 MinSeverityUpdates::Pending => {
145 let _ = responder.send(Ok(&Interest {
146 min_severity: Some(min_severity.severity),
147 ..Default::default()
148 }));
149 min_severity.updates = MinSeverityUpdates::None;
150 }
151 MinSeverityUpdates::Listeners(l) => l.push(responder),
152 }
153 }
154 _ => unreachable!(),
155 }
156 }
157 });
158 }
159
160 pub fn set_min_severity(&self, severity: Severity) {
162 let mut min_severity = self.min_severity.lock();
163 if severity == min_severity.severity {
164 return;
165 }
166 min_severity.severity = severity;
167 match &mut min_severity.updates {
168 MinSeverityUpdates::Listeners(l) => {
169 for responder in l.drain(..) {
170 let _ = responder
171 .send(Ok(&Interest { min_severity: Some(severity), ..Default::default() }));
172 }
173 min_severity.updates = MinSeverityUpdates::None;
174 }
175 _ => min_severity.updates = MinSeverityUpdates::Pending,
176 }
177 }
178}
179
/// C ABI entry points wrapping [`FakeLogSink`].
///
/// Every function taking a `*mut FakeLogSink` expects a pointer obtained from
/// `fake_log_sink_new` that has not yet been passed to `fake_log_sink_delete`.
#[cfg(feature = "ffi")]
pub mod ffi {
    // Imported only for its side effects.
    // NOTE(review): presumably this forces the log decoder C bindings to be linked in — confirm.
    use log_decoder_c_bindings as _;

    use super::FakeLogSink;
    use fidl::endpoints::ServerEnd;
    use fidl_fuchsia_diagnostics_types::Severity;
    use fuchsia_async::TimeoutExt;
    use futures::executor::block_on;

    /// Creates a `FakeLogSink` that runs on its own executor thread (`new_impl(true)`) and
    /// returns it as a heap pointer. Release it with `fake_log_sink_delete`.
    #[unsafe(no_mangle)]
    pub extern "C" fn fake_log_sink_new() -> *mut FakeLogSink {
        Box::into_raw(Box::new(super::FakeLogSink::new_impl(true)))
    }

    /// Destroys a sink created by `fake_log_sink_new`.
    ///
    /// # Safety
    ///
    /// `fake` must come from `fake_log_sink_new`, must not have been deleted already, and must
    /// not be used after this call.
    #[unsafe(no_mangle)]
    pub unsafe extern "C" fn fake_log_sink_delete(fake: *mut FakeLogSink) {
        // SAFETY: per the contract above, `fake` is the pointer produced by `Box::into_raw`.
        drop(unsafe { Box::from_raw(fake) });
    }

    /// Serves `LogSink` on the channel identified by the raw handle value `handle`.
    ///
    /// # Safety
    ///
    /// `fake` must point to a live `FakeLogSink`. `handle` is wrapped with
    /// `zx::NullableHandle::from_raw`, so the caller must not use it afterwards.
    /// NOTE(review): presumably `handle` has to be the server end of a `fuchsia.logger.LogSink`
    /// channel — confirm against callers.
    #[unsafe(no_mangle)]
    pub unsafe extern "C" fn fake_log_sink_serve(fake: *mut FakeLogSink, handle: u32) {
        // SAFETY: the caller guarantees `fake` is live and hands over `handle`.
        unsafe { &*fake }
            .serve(ServerEnd::new(unsafe { zx::NullableHandle::from_raw(handle) }.into()));
    }

    /// Blocks until a record is available, copies its raw bytes into `dest`, and returns the
    /// number of bytes copied.
    ///
    /// # Panics
    ///
    /// Panics if reading from the ring buffer fails or if the record is larger than `capacity`.
    ///
    /// # Safety
    ///
    /// `fake` must point to a live `FakeLogSink` not used concurrently from another thread, and
    /// `dest` must be valid for writes of `capacity` bytes. The destination does not need to be
    /// initialized.
    #[unsafe(no_mangle)]
    pub unsafe extern "C" fn fake_log_sink_read_record(
        fake: *mut FakeLogSink,
        dest: *mut u8,
        capacity: usize,
    ) -> usize {
        // SAFETY: the caller guarantees `fake` is live and exclusively ours for this call.
        let fake = unsafe { &mut *fake };
        let buf = block_on(fake.ring_buffer.read_message()).unwrap().1;
        let record: &[u8] = &buf;
        assert!(record.len() <= capacity, "{} {capacity}", record.len());
        // Copy through raw pointers instead of building a `&mut [u8]` over `dest`:
        // `slice::from_raw_parts_mut` requires non-null, already-initialized memory, which a C
        // caller's output buffer does not have to satisfy.
        if !record.is_empty() {
            // SAFETY: `dest` is writable for `capacity >= record.len()` bytes (checked above),
            // and `record` is a Rust-owned buffer that cannot overlap caller memory.
            unsafe {
                std::ptr::copy_nonoverlapping(record.as_ptr(), dest, record.len());
            }
        }
        record.len()
    }

    /// Sets the minimum severity advertised to clients.
    ///
    /// # Panics
    ///
    /// Panics if `severity` is not a valid `Severity` primitive.
    ///
    /// # Safety
    ///
    /// `fake` must point to a live `FakeLogSink`.
    #[unsafe(no_mangle)]
    pub unsafe extern "C" fn fake_log_sink_set_min_severity(fake: *mut FakeLogSink, severity: u8) {
        // SAFETY: the caller guarantees `fake` is live.
        unsafe { &*fake }.set_min_severity(Severity::from_primitive(severity).unwrap());
    }

    /// Waits until the ring buffer receives data or the monotonic deadline `deadline_nanos`
    /// passes, without consuming anything.
    ///
    /// Returns 0 if the buffer's head still equals the tail observed on entry; otherwise returns
    /// the length of the first message found between that tail and the new head.
    ///
    /// # Panics
    ///
    /// Panics if no message can be decoded from the newly written range.
    ///
    /// # Safety
    ///
    /// `fake` must point to a live `FakeLogSink` that is not used concurrently from another
    /// thread and that outlives this call.
    #[unsafe(no_mangle)]
    pub unsafe extern "C" fn fake_log_sink_wait_for_record(
        fake: *mut FakeLogSink,
        deadline_nanos: i64,
    ) -> usize {
        // The future handed to `scope.compute` must be `'static`, so the borrow is extended
        // artificially. This relies on `block_on` below not returning before that future has
        // finished with the reference.
        unsafe fn erase_lifetime(sink: &mut FakeLogSink) -> &'static mut FakeLogSink {
            unsafe { std::mem::transmute(sink) }
        }

        // SAFETY: the caller guarantees `fake` is live and exclusively ours for this call.
        let fake = unsafe { &mut *fake };

        // Remember where the buffer ended before waiting so new data can be detected afterwards.
        let tail = fake.ring_buffer.tail();
        let scope = fake.ehandle.global_scope().clone();

        {
            // SAFETY: see `erase_lifetime`; the `'static` reference does not leave this block.
            let fake = unsafe { erase_lifetime(fake) };
            // The wait runs on the sink's executor while this thread blocks on its completion.
            // The value produced (0 on timeout) is not used; only the head position below is.
            block_on(scope.compute(async move {
                fake.ring_buffer
                    .wait(tail)
                    .on_timeout(zx::MonotonicInstant::from_nanos(deadline_nanos), || 0)
                    .await
            }));
        }

        let head = fake.ring_buffer.head();
        if head == tail {
            0
        } else {
            // NOTE(review): `tail..head` is assumed to start on a message boundary, which is
            // what makes this unsafe call sound — confirm against `first_message_in`'s contract.
            unsafe { fake.ring_buffer.first_message_in(tail..head) }.unwrap().1.len()
        }
    }
}
292
#[cfg(test)]
mod tests {
    use super::FakeLogSink;
    use diagnostics_log_encoding::encode::{
        Encoder, EncoderOpts, LogEvent, MutableBuffer, WriteEventParams,
    };
    use fidl::endpoints::create_proxy;
    use fidl_fuchsia_logger::{LogSinkEvent, LogSinkOnInitRequest, MAX_DATAGRAM_LEN_BYTES};
    use futures::StreamExt;
    use std::io::Cursor;

    /// Message text written by the client and expected back from the fake sink.
    const MSG: &str = "The quick brown fox jumps over the lazy dog";

    /// Writes one encoded record through the buffer received in `OnInit` and checks that the
    /// fake sink reads the same message back.
    #[fuchsia::test(logging = false)]
    async fn log() {
        let (proxy, server) = create_proxy();
        let mut fake_sink = FakeLogSink::new();
        fake_sink.serve(server);

        // The first event on a new connection must be `OnInit` carrying the shared buffer.
        let first_event = proxy.take_event_stream().next().await;
        let Some(Ok(LogSinkEvent::OnInit {
            payload: LogSinkOnInitRequest { buffer: Some(iob), .. },
        })) = first_event
        else {
            panic!("Expected OnInit")
        };

        // Encode a single record into a stack buffer.
        let mut storage = [0u8; MAX_DATAGRAM_LEN_BYTES as _];
        let mut encoder = Encoder::new(Cursor::new(&mut storage[..]), EncoderOpts::default());
        let no_tags: &[&str] = &[];
        encoder
            .write_event(WriteEventParams {
                event: LogEvent::new(&log::Record::builder().args(format_args!("{MSG}")).build()),
                tags: no_tags,
                metatags: std::iter::empty(),
                pid: zx::Koid::from_raw(1),
                tid: zx::Koid::from_raw(2),
                dropped: 0,
            })
            .unwrap();

        // Hand only the bytes that were actually encoded to the sink.
        let written = encoder.inner().cursor();
        let payload = &encoder.inner().get_ref()[..written];
        iob.write(Default::default(), 0, payload).unwrap();

        assert_eq!(fake_sink.read_message().await, MSG);
    }
}