1use crate::logs::repository::LogsRepository;
6use diagnostics_data::{Data, Logs};
7use fidl_fuchsia_diagnostics::{ComponentSelector, StreamMode};
8use fuchsia_async::OnSignals;
9use futures::channel::mpsc::UnboundedReceiver;
10use futures::channel::{mpsc, oneshot};
11use futures::executor::block_on;
12use futures::{FutureExt, StreamExt, select};
13use log::warn;
14use selectors::FastError;
15use std::collections::HashSet;
16use std::io::{self, Write};
17use std::pin::pin;
18use std::sync::Arc;
19use std::{mem, thread};
20use zx::Signals;
21
/// Upper bound, in bytes, on a single write handed to the serial sink.
///
/// `SerialWriter` allocates its buffers with this capacity and reserves one byte of it for the
/// trailing newline appended when a buffer is flushed.
pub const MAX_SERIAL_WRITE_SIZE: usize = 256;
23
24pub async fn launch_serial(
28 allow_serial_log_tags: impl IntoIterator<Item = impl AsRef<str>>,
29 deny_serial_log_tags: impl IntoIterator<Item = impl ToString>,
30 logs_repo: Arc<LogsRepository>,
31 sink: impl Write + Send + 'static,
32 mut freeze_receiver: mpsc::UnboundedReceiver<oneshot::Sender<zx::EventPair>>,
33 mut flush_receiver: UnboundedReceiver<oneshot::Sender<()>>,
34) {
35 let mut writer =
36 SerialWriter::new(sink, deny_serial_log_tags.into_iter().map(|s| s.to_string()).collect());
37
38 let mut barrier = writer.get_barrier();
39 let mut log_stream = pin!(logs_repo.logs_cursor(
40 StreamMode::SnapshotThenSubscribe,
41 selectors_from_tags(allow_serial_log_tags),
42 ));
43 loop {
44 select! {
45 log = log_stream.next() => {
46 if let Some(log) = log {
47 writer.log(&log).await;
49 } else {
50 break;
52 }
53 }
54 freeze_request = freeze_receiver.next() => {
55 if let Some(request) = freeze_request {
56 barrier.wait().await;
58 let (client, server) = zx::EventPair::create();
59 let _ = request.send(client);
60 let _ = OnSignals::new(&server, Signals::EVENTPAIR_PEER_CLOSED).await;
61 }
62 }
63 flush_request = flush_receiver.next() => {
64 if let Some(flush_request) = flush_request {
65 logs_repo.flush().await;
69
70 while let Some(Some(log)) = log_stream.next().now_or_never() {
72 writer.log(&log).await;
73 }
74
75 writer.get_barrier().wait().await;
77
78 let _ = flush_request.send(());
81 }
82 }
83 }
84 }
85 writer.get_barrier().wait().await;
87}
88
89fn selectors_from_tags(tags: impl IntoIterator<Item = impl AsRef<str>>) -> Vec<ComponentSelector> {
90 tags.into_iter()
91 .filter_map(|selector| {
92 let selector = selector.as_ref();
93 match selectors::parse_component_selector::<FastError>(selector) {
94 Ok(s) => Some(s),
95 Err(err) => {
96 warn!(selector:%, err:?; "Failed to parse component selector");
97 None
98 }
99 }
100 })
101 .collect()
102}
103
/// Stateless [`Write`] implementation whose `write` forwards bytes to `zx_debug_write`.
#[derive(Default)]
pub struct SerialSink;
107
108impl Write for SerialSink {
109 fn write(&mut self, buffer: &[u8]) -> io::Result<usize> {
110 if cfg!(debug_assertions) {
111 debug_assert!(buffer.len() <= MAX_SERIAL_WRITE_SIZE);
112 } else {
113 use std::sync::atomic::{AtomicBool, Ordering};
114 static ALREADY_LOGGED: AtomicBool = AtomicBool::new(false);
115 if buffer.len() > MAX_SERIAL_WRITE_SIZE && !ALREADY_LOGGED.swap(true, Ordering::Relaxed)
116 {
117 let size = buffer.len();
118 log::error!(
119 size;
120 "Skipping write to serial due to internal error. Exceeded max buffer size."
121 );
122 return Ok(buffer.len());
123 }
124 }
125 unsafe {
127 zx::sys::zx_debug_write(buffer.as_ptr(), buffer.len());
128 }
129 Ok(buffer.len())
130 }
131
132 fn flush(&mut self) -> io::Result<()> {
133 Ok(())
134 }
135}
136
/// Commands sent to the worker thread spawned by `SerialWriter::new`.
enum WorkerCommand {
    /// Write the bytes to the sink; the worker then hands the buffer back for reuse.
    Write(Vec<u8>),
    /// Signal the contained sender once the worker reaches this command.
    Flush(oneshot::Sender<()>),
}
144
/// Formats logs into fixed-size buffers and hands full buffers to a worker thread that performs
/// the actual sink writes.
struct SerialWriter {
    /// Logs carrying one of these tags, or coming from a component with one of these names, are
    /// not written.
    denied_tags: HashSet<String>,
    /// Sends commands to the worker thread.
    sender: mpsc::UnboundedSender<WorkerCommand>,
    /// Buffers available for use: the initial pool plus buffers the worker has finished writing.
    buffers: mpsc::UnboundedReceiver<Vec<u8>>,
    /// Buffer currently being filled. A capacity of zero means it was handed to the worker and a
    /// replacement must be taken from `buffers` before more bytes can be appended.
    current_buffer: Vec<u8>,
}
155
156impl SerialWriter {
157 fn new<S: Write + Send + 'static>(mut sink: S, denied_tags: HashSet<String>) -> Self {
159 let (sender, mut receiver) = mpsc::unbounded();
160 let (buffer_sender, buffers) = mpsc::unbounded();
161
162 for _ in 0..31 {
164 buffer_sender.unbounded_send(Vec::with_capacity(MAX_SERIAL_WRITE_SIZE)).unwrap();
165 }
166
167 thread::spawn(move || {
168 block_on(async {
169 while let Some(command) = receiver.next().await {
170 match command {
171 WorkerCommand::Write(bytes) => {
172 let _ = sink.write(&bytes);
174 let _ = buffer_sender.unbounded_send(bytes);
176 }
177 WorkerCommand::Flush(notifier) => {
178 let _ = notifier.send(());
179 }
180 }
181 }
182 });
183 });
184
185 Self {
186 denied_tags,
187 sender,
188 buffers,
189 current_buffer: Vec::with_capacity(MAX_SERIAL_WRITE_SIZE),
190 }
191 }
192
193 async fn write(&mut self, mut bytes: &[u8]) {
195 while !bytes.is_empty() {
196 if self.current_buffer.capacity() == 0 {
197 self.current_buffer = self.buffers.next().await.unwrap();
198 self.current_buffer.clear();
199 }
200 let (part, rem) = bytes.split_at(std::cmp::min(self.space(), bytes.len()));
201 self.current_buffer.extend(part);
202 if !rem.is_empty() {
203 self.flush();
204 }
205 bytes = rem;
206 }
207 }
208
209 async fn io_writer(&mut self, required: usize) -> IoWriter<'_> {
215 assert!(required < MAX_SERIAL_WRITE_SIZE);
216 if self.current_buffer.capacity() == 0 || self.space() < required {
217 self.flush();
218 self.current_buffer = self.buffers.next().await.unwrap();
219 self.current_buffer.clear();
220 }
221 IoWriter(self)
222 }
223
224 fn flush(&mut self) {
226 if !self.current_buffer.is_empty() {
227 self.current_buffer.push(b'\n');
228 self.sender
229 .unbounded_send(WorkerCommand::Write(mem::take(&mut self.current_buffer)))
230 .unwrap();
231 }
232 }
233
234 fn get_barrier(&self) -> Barrier {
236 Barrier(self.sender.clone())
237 }
238
239 fn space(&self) -> usize {
241 MAX_SERIAL_WRITE_SIZE - 1 - self.current_buffer.len()
243 }
244
245 async fn log(&mut self, log: &Data<Logs>) {
247 if !self.denied_tags.is_empty() {
249 if let Some(tags) = log.tags()
250 && tags.iter().any(|tag| self.denied_tags.contains(tag))
251 {
252 return;
253 }
254
255 let component_name = log.component_name();
258
259 if self.denied_tags.contains(component_name.as_ref()) {
260 return;
261 }
262
263 for name in component_name.split(":") {
266 if self.denied_tags.contains(name) {
267 return;
268 }
269 }
270 }
271
272 write!(
273 self.io_writer(64).await, "[{:05}.{:03}] {:05}:{:05}> [",
275 zx::MonotonicDuration::from_nanos(log.metadata.timestamp.into_nanos()).into_seconds(),
276 zx::MonotonicDuration::from_nanos(log.metadata.timestamp.into_nanos()).into_millis()
277 % 1000,
278 log.pid().unwrap_or(0),
279 log.tid().unwrap_or(0)
280 )
281 .unwrap();
282
283 if let Some(tags) = log.tags().filter(|tags| !tags.is_empty()) {
284 for (i, tag) in tags.iter().enumerate() {
285 self.write(tag.as_bytes()).await;
286 if i < tags.len() - 1 {
287 self.write(b", ").await;
288 }
289 }
290 } else {
291 self.write(log.component_name().as_bytes()).await;
292 }
293
294 self.write(b"]").await;
298
299 write!(self.io_writer(16).await, " {}: ", log.severity()).unwrap();
300 if let Some(m) = log.msg() {
301 self.write(m.as_bytes()).await;
302 }
303
304 for key_str in log.payload_keys_strings() {
305 self.write(b" ").await;
306 self.write(key_str.as_bytes()).await;
307 }
308
309 self.flush();
314 }
315}
316
/// `io::Write` adapter that appends to the `current_buffer` of the borrowed `SerialWriter`.
struct IoWriter<'a>(&'a mut SerialWriter);
318
319impl Write for IoWriter<'_> {
320 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
321 self.0.current_buffer.extend(buf);
322 Ok(buf.len())
323 }
324
325 fn flush(&mut self) -> io::Result<()> {
326 Ok(())
327 }
328}
329
/// Handle on the worker's command channel used to wait until the worker has processed every
/// command queued before the wait started.
struct Barrier(mpsc::UnboundedSender<WorkerCommand>);
331
332impl Barrier {
333 async fn wait(&mut self) {
335 let (tx, rx) = oneshot::channel();
336
337 self.0.unbounded_send(WorkerCommand::Flush(tx)).unwrap();
338
339 let _ = rx.await;
341 }
342}
343
#[cfg(test)]
mod tests {
    use fuchsia_async::{self as fasync};
    use futures::SinkExt;
    use futures::channel::mpsc::{self, unbounded};

    use super::*;
    use crate::identity::ComponentIdentity;
    use crate::logs::testing::make_message;
    use diagnostics_data::{BuilderArgs, LogsDataBuilder, LogsField, LogsProperty, Severity};
    use fidl::endpoints::create_proxy;
    use fidl_fuchsia_logger::LogSinkMarker;
    use fuchsia_async::TimeoutExt;
    use fuchsia_sync::Mutex;
    use futures::FutureExt;
    use moniker::ExtendedMoniker;
    use std::time::Duration;
    use zx::BootInstant;

    /// Sink that reassembles written bytes into newline-terminated lines and sends each
    /// complete line over a channel.
    struct TestSink {
        /// Bytes of a line that has not been terminated by a newline yet.
        buffer: Vec<u8>,
        /// Channel on which complete lines are delivered.
        snd: mpsc::UnboundedSender<String>,
    }

    impl TestSink {
        /// Returns the sink together with the receiver of the lines written to it.
        fn new() -> (Self, mpsc::UnboundedReceiver<String>) {
            let (snd, rcv) = mpsc::unbounded();
            (Self { buffer: Vec::new(), snd }, rcv)
        }
    }

    impl Write for TestSink {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            // Each chunk ends with a newline, except possibly the last one.
            for chunk in buf.split_inclusive(|c| *c == b'\n') {
                if !self.buffer.is_empty() {
                    // Continue a partial line left over from an earlier write.
                    self.buffer.extend(chunk);
                    if *self.buffer.last().unwrap() == b'\n' {
                        self.snd
                            .unbounded_send(
                                String::from_utf8(std::mem::take(&mut self.buffer))
                                    .expect("wrote valid utf8"),
                            )
                            .expect("sent item");
                    }
                } else if *chunk.last().unwrap() == b'\n' {
                    // A complete line: send it without going through the buffer.
                    self.snd
                        .unbounded_send(str::from_utf8(chunk).expect("wrote valid utf8").into())
                        .unwrap();
                } else {
                    // Start of a partial line: keep it until the rest arrives.
                    self.buffer.extend(chunk);
                }
            }
            Ok(buf.len())
        }

        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    /// Sink that appends everything written to a shared in-memory buffer.
    #[derive(Clone, Default)]
    struct FakeSink(Arc<Mutex<Vec<u8>>>);

    impl FakeSink {
        /// Runs `f` with a reference to the bytes written so far.
        fn with_buffer<R>(&self, f: impl FnOnce(&Vec<u8>) -> R) -> R {
            f(&self.0.lock())
        }
    }

    impl Write for FakeSink {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            self.0.lock().write(buf)
        }

        // The code under test is not expected to call `flush` on its sink.
        fn flush(&mut self) -> io::Result<()> {
            unreachable!();
        }
    }

    /// A log carrying a denied tag must produce no output at all.
    #[fuchsia::test]
    async fn write_to_serial_handles_denied_tags() {
        let log = LogsDataBuilder::new(BuilderArgs {
            timestamp: BootInstant::from_nanos(1),
            component_url: Some("url".into()),
            moniker: "core/foo".try_into().unwrap(),
            severity: Severity::Info,
        })
        .add_tag("denied-tag")
        .build();
        let sink = FakeSink::default();
        let mut writer =
            SerialWriter::new(sink.clone(), HashSet::from_iter(["denied-tag".to_string()]));
        writer.log(&log).await;
        // Wait for the worker before inspecting the sink.
        writer.get_barrier().wait().await;
        assert!(sink.with_buffer(|b| b.is_empty()));
    }

    /// A log longer than one buffer is continued on a second line.
    #[fuchsia::test]
    async fn write_to_serial_splits_lines() {
        let message = concat!(
            "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aliquam accumsan eu neque ",
            "quis molestie. Nam rhoncus sapien non eleifend tristique. Duis quis turpis volutpat ",
            "neque bibendum molestie. Etiam ac sapien justo. Nullam aliquet ipsum nec tincidunt."
        );
        let log = LogsDataBuilder::new(BuilderArgs {
            timestamp: BootInstant::from_nanos(123456789),
            component_url: Some("url".into()),
            moniker: "core/foo".try_into().unwrap(),
            severity: Severity::Info,
        })
        .add_tag("bar")
        .set_message(message)
        .add_key(LogsProperty::String(LogsField::Other("key".to_string()), "value".to_string()))
        .add_key(LogsProperty::Int(LogsField::Other("other_key".to_string()), 3))
        .set_pid(1234)
        .set_tid(5678)
        .build();
        let sink = FakeSink::default();
        let mut writer = SerialWriter::new(sink.clone(), HashSet::new());
        writer.log(&log).await;
        writer.get_barrier().wait().await;
        sink.with_buffer(|b| {
            // The 37-byte prefix plus 218 message bytes fill the 255 usable bytes of a buffer.
            assert_eq!(
                str::from_utf8(b).unwrap(),
                format!(
                    "[00000.123] 01234:05678> [bar] INFO: {}\n{} key=value other_key=3\n",
                    &message[..218],
                    &message[218..]
                )
            )
        });
    }

    /// Without tags, the component name is printed between the brackets.
    #[fuchsia::test]
    async fn when_no_tags_are_present_the_component_name_is_used() {
        let log = LogsDataBuilder::new(BuilderArgs {
            timestamp: BootInstant::from_nanos(123456789),
            component_url: Some("url".into()),
            moniker: "core/foo".try_into().unwrap(),
            severity: Severity::Info,
        })
        .set_message("my msg")
        .set_pid(1234)
        .set_tid(5678)
        .build();
        let sink = FakeSink::default();
        let mut writer = SerialWriter::new(sink.clone(), HashSet::new());
        writer.log(&log).await;
        writer.flush();
        writer.get_barrier().wait().await;
        sink.with_buffer(|b| {
            assert_eq!(str::from_utf8(b).unwrap(), "[00000.123] 01234:05678> [foo] INFO: my msg\n");
        });
    }

    /// While a freeze token is held no log is written; dropping it resumes output.
    #[fuchsia::test]
    async fn pauses_logs_correctly() {
        let repo = LogsRepository::for_test(fasync::Scope::new());

        let bootstrap_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/foo").unwrap(),
            "fuchsia-pkg://bootstrap-foo",
        )));
        let bootstrap_bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/bar").unwrap(),
            "fuchsia-pkg://bootstrap-bar",
        )));
        let bootstrap_denied_component_container =
            repo.get_log_container(Arc::new(ComponentIdentity::new(
                ExtendedMoniker::parse_str("./bootstrap/denied_component").unwrap(),
                "fuchsia-pkg://bootstrap-denied_component",
            )));

        let core_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./core/foo").unwrap(),
            "fuchsia-pkg://core-foo",
        )));
        let core_baz_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./core/baz").unwrap(),
            "fuchsia-pkg://core-baz",
        )));

        bootstrap_foo_container.ingest_message(make_message(
            "a",
            None,
            zx::BootInstant::from_nanos(1),
        ));

        // The sender is kept alive (but unused) so the flush channel stays open.
        let (_flush_sender, flush_receiver) = unbounded();

        core_baz_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(2)));
        let (sink, mut rcv) = TestSink::new();
        let cloned_repo = Arc::clone(&repo);
        let (mut sender, receiver) = unbounded();
        let _serial_task = fasync::Task::spawn(async move {
            let allowed = &["bootstrap/**", "/core/foo"];
            let denied = &["denied_tag", "denied_component"];
            launch_serial(allowed, denied, cloned_repo, sink, receiver, flush_receiver).await;
        });

        bootstrap_bar_container.ingest_message(make_message(
            "b",
            Some("denied_tag"),
            zx::BootInstant::from_nanos(3),
        ));
        bootstrap_denied_component_container.ingest_message(make_message(
            "d",
            None,
            zx::BootInstant::from_nanos(3),
        ));

        let received = rcv.next().await.unwrap();
        assert_eq!(received, "[00000.000] 00001:00002> [foo] DEBUG: a\n");

        // Ask for a freeze and wait for the token that represents it.
        let (tx, rx) = oneshot::channel();
        sender.send(tx).await.unwrap();

        let freeze_token = rx.await.unwrap();

        core_foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(4)));

        // Nothing may arrive while frozen: the read has to time out.
        assert!(rcv.next().map(|_| false).on_timeout(Duration::from_millis(500), || true).await);

        // Dropping the token closes the event pair peer and lets the serial task continue.
        drop(freeze_token);

        assert_eq!(rcv.next().await.unwrap(), "[00000.000] 00001:00002> [foo] DEBUG: c\n");
    }

    /// Logs ingested before and after the serial task starts are written, minus those filtered
    /// out by the allow list or the deny list.
    #[fuchsia::test]
    async fn writes_ingested_logs() {
        let repo = LogsRepository::for_test(fasync::Scope::new());

        let bootstrap_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/foo").unwrap(),
            "fuchsia-pkg://bootstrap-foo",
        )));
        let bootstrap_bar_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/bar").unwrap(),
            "fuchsia-pkg://bootstrap-bar",
        )));
        let denied_collection = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/denied-collection:foo-bar").unwrap(),
            "fuchsia-pkg://bootstrap-denied-collection:foo-bar",
        )));
        let bootstrap_denied_component_container =
            repo.get_log_container(Arc::new(ComponentIdentity::new(
                ExtendedMoniker::parse_str("./bootstrap/denied_component").unwrap(),
                "fuchsia-pkg://bootstrap-denied_component",
            )));
        let denied_collection_instance = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/collection:denied-foo-bar").unwrap(),
            "fuchsia-pkg://bootstrap-collection:denied-foo-bar",
        )));

        // NOTE(review): same moniker and URL as `denied_collection_instance` above — confirm
        // whether a distinct identity was intended.
        let denied_collection_and_instance =
            repo.get_log_container(Arc::new(ComponentIdentity::new(
                ExtendedMoniker::parse_str("./bootstrap/collection:denied-foo-bar").unwrap(),
                "fuchsia-pkg://bootstrap-collection:denied-foo-bar",
            )));

        let collection_and_instance = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./bootstrap/collection:foo-bar").unwrap(),
            "fuchsia-pkg://bootstrap-collection:foo-bar",
        )));

        let core_foo_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./core/foo").unwrap(),
            "fuchsia-pkg://core-foo",
        )));
        let core_baz_container = repo.get_log_container(Arc::new(ComponentIdentity::new(
            ExtendedMoniker::parse_str("./core/baz").unwrap(),
            "fuchsia-pkg://core-baz",
        )));

        bootstrap_foo_container.ingest_message(make_message(
            "a",
            None,
            zx::BootInstant::from_nanos(1),
        ));
        core_baz_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(2)));
        denied_collection.ingest_message(make_message("d", None, zx::BootInstant::from_nanos(2)));
        denied_collection_instance.ingest_message(make_message(
            "e",
            None,
            zx::BootInstant::from_nanos(2),
        ));
        denied_collection_and_instance.ingest_message(make_message(
            "f",
            None,
            zx::BootInstant::from_nanos(2),
        ));
        collection_and_instance.ingest_message(make_message(
            "g",
            None,
            zx::BootInstant::from_nanos(2),
        ));

        let (sink, rcv) = TestSink::new();

        // Both senders are kept alive (but unused) so the request channels stay open.
        let (_freeze_sender, freeze_receiver) = unbounded();
        let (_flush_sender, flush_receiver) = unbounded();
        let _serial_task = fasync::Task::spawn(launch_serial(
            &["bootstrap/**", "/core/foo"],
            &[
                "denied_tag",
                "denied_component",
                "denied-collection",
                "denied-foo-bar",
                "collection:denied-foo-bar",
            ],
            Arc::clone(&repo),
            sink,
            freeze_receiver,
            flush_receiver,
        ));

        bootstrap_bar_container.ingest_message(make_message(
            "b",
            Some("denied_tag"),
            zx::BootInstant::from_nanos(3),
        ));
        bootstrap_denied_component_container.ingest_message(make_message(
            "d",
            None,
            zx::BootInstant::from_nanos(3),
        ));
        core_foo_container.ingest_message(make_message("c", None, zx::BootInstant::from_nanos(4)));
        let received: Vec<_> = rcv.take(3).collect().await;

        // Only "a" (bootstrap/foo), "g" (collection:foo-bar) and the "c" from core/foo are
        // expected; every other message is absent from the output.
        assert_eq!(
            received,
            vec![
                "[00000.000] 00001:00002> [foo] DEBUG: a\n",
                "[00000.000] 00001:00002> [collection:foo-bar] DEBUG: g\n",
                "[00000.000] 00001:00002> [foo] DEBUG: c\n"
            ]
        );
    }

    /// Once a flush request is acknowledged, the log written to the socket beforehand must
    /// already have reached the sink.
    #[fuchsia::test]
    async fn flush_drains_all_logs() {
        // NOTE(review): the scenario is repeated 500 times, presumably to expose races —
        // confirm.
        for _ in 0..500 {
            let scope = fasync::Scope::new();
            let repo = LogsRepository::for_test(scope.new_child());
            let identity = Arc::new(ComponentIdentity::new(
                ExtendedMoniker::parse_str("./bootstrap/foo").unwrap(),
                "fuchsia-pkg://bootstrap-foo",
            ));

            let (log_sink, server_end) = create_proxy::<LogSinkMarker>();
            let log_container = repo.get_log_container(identity);
            log_container.handle_log_sink(server_end.into_stream(), scope.as_handle().clone());
            let (client_socket, server_socket) = zx::Socket::create_datagram();
            log_sink.connect_structured(server_socket).unwrap();

            let (sink, mut rcv) = TestSink::new();
            let cloned_repo = Arc::clone(&repo);
            let (_freeze_sender, freeze_receiver) = unbounded();
            let (flush_sender, flush_receiver) = unbounded();
            scope.spawn(async move {
                let allowed = &["bootstrap/**"];
                let denied: &[&str] = &[];
                launch_serial(allowed, denied, cloned_repo, sink, freeze_receiver, flush_receiver)
                    .await;
            });

            let mut buffer = [0u8; 1024];
            let cursor = std::io::Cursor::new(&mut buffer[..]);
            let mut encoder = diagnostics_log_encoding::encode::Encoder::new(
                cursor,
                diagnostics_log_encoding::encode::EncoderOpts::default(),
            );
            // NOTE(review): presumably ensures the server has processed the connection before
            // the record is written to the socket — confirm.
            log_sink.wait_for_interest_change().await.unwrap().unwrap();

            encoder
                .write_record(diagnostics_log_encoding::Record {
                    timestamp: zx::BootInstant::from_nanos(1),
                    severity: Severity::Info as u8,
                    arguments: vec![
                        diagnostics_log_encoding::Argument::new("tag", "foo"),
                        diagnostics_log_encoding::Argument::new("message", "a"),
                    ],
                })
                .unwrap();
            // Send only the bytes the encoder actually produced.
            client_socket
                .write(&encoder.inner().get_ref()[..encoder.inner().position() as usize])
                .unwrap();
            let (sender, receiver) = oneshot::channel();
            flush_sender.unbounded_send(sender).unwrap();
            assert_eq!(receiver.await, Ok(()));
            // `now_or_never` makes the test fail if the line is not available immediately.
            let received = rcv.next().now_or_never().unwrap().unwrap();
            assert_eq!(received, "[00000.000] 00000:00000> [foo] INFO: a\n");
        }
    }
}