termion/
async.rs

1use std::io::{self, Read};
2use std::sync::mpsc;
3use std::thread;
4
5use sys::tty::get_tty;
6
7/// Construct an asynchronous handle to the TTY standard input, with a delimiter byte.
8///
9/// This has the same advantages as async_stdin(), but also allows specifying a delimiter byte. The
10/// reader will stop reading after consuming the delimiter byte.
11pub fn async_stdin_until(delimiter: u8) -> AsyncReader {
12    let (send, recv) = mpsc::channel();
13
14    thread::spawn(move || for i in get_tty().unwrap().bytes() {
15
16        match i {
17            Ok(byte) => {
18                let end_of_stream = &byte == &delimiter;
19                let send_error = send.send(Ok(byte)).is_err();
20
21                if end_of_stream || send_error { return; }
22            },
23            Err(_) => { return; }
24        }
25    });
26
27    AsyncReader { recv: recv }
28}
29
30/// Construct an asynchronous handle to the TTY standard input.
31///
32/// This allows you to read from standard input _without blocking_ the current thread.
33/// Specifically, it works by firing up another thread to handle the event stream, which will then
34/// be buffered in a mpsc queue, which will eventually be read by the current thread.
35///
36/// This will not read the piped standard input, but rather read from the TTY device, since reading
37/// asyncronized from piped input would rarely make sense. In other words, if you pipe standard
38/// output from another process, it won't be reflected in the stream returned by this function, as
39/// this represents the TTY device, and not the piped standard input.
40pub fn async_stdin() -> AsyncReader {
41    let (send, recv) = mpsc::channel();
42
43    thread::spawn(move || for i in get_tty().unwrap().bytes() {
44                      if send.send(i).is_err() {
45                          return;
46                      }
47                  });
48
49    AsyncReader { recv: recv }
50}
51
/// An asynchronous reader.
///
/// This behaves like any other byte stream, except that reading from it never blocks: a read
/// copies out only the bytes that are already queued on the internal channel, so the caller's
/// buffer may be filled partially or not at all.
#[derive(Debug)]
pub struct AsyncReader {
    /// The underlying mpsc receiver; every queued item is either one byte or an I/O error.
    recv: mpsc::Receiver<io::Result<u8>>,
}
60
61// FIXME: Allow constructing an async reader from an arbitrary stream.
62
63impl Read for AsyncReader {
64    /// Read from the byte stream.
65    ///
66    /// This will never block, but try to drain the event queue until empty. If the total number of
67    /// bytes written is lower than the buffer's length, the event queue is empty or that the event
68    /// stream halted.
69    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
70        let mut total = 0;
71
72        loop {
73            if total >= buf.len() {
74                break;
75            }
76
77            match self.recv.try_recv() {
78                Ok(Ok(b)) => {
79                    buf[total] = b;
80                    total += 1;
81                }
82                Ok(Err(e)) => return Err(e),
83                Err(_) => break,
84            }
85        }
86
87        Ok(total)
88    }
89}
90
#[cfg(test)]
mod test {
    use super::*;
    use std::io::Read;

    /// Smoke test: building the reader and pulling at most one item from it must not panic.
    /// The item itself (a byte, an error, or nothing) is deliberately ignored.
    #[test]
    fn test_async_stdin() {
        let mut pending = async_stdin().bytes();
        pending.next();
    }
}