summaryrefslogtreecommitdiffstats
path: root/src/log
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2020-10-22 17:26:00 +0200
committerMatthias Beyer <mail@beyermatthias.de>2020-10-27 14:06:50 +0100
commita9fb2644cac1a4fe8cd75df3ed8b5f9182cb8238 (patch)
tree3569f01de3ab56f7418148e4344f8372dc05ba19 /src/log
parent4ecf1e1e4fa2f8b9cb75260a0b033025e234984b (diff)
Implement log module for job log handling
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src/log')
-rw-r--r--src/log/mod.rs5
-rw-r--r--src/log/parser.rs136
-rw-r--r--src/log/util.rs28
3 files changed, 169 insertions, 0 deletions
diff --git a/src/log/mod.rs b/src/log/mod.rs
new file mode 100644
index 0000000..d73ab0a
--- /dev/null
+++ b/src/log/mod.rs
@@ -0,0 +1,5 @@
+mod parser;
+pub use parser::*;
+
+mod util;
+
diff --git a/src/log/parser.rs b/src/log/parser.rs
new file mode 100644
index 0000000..01ecea5
--- /dev/null
+++ b/src/log/parser.rs
@@ -0,0 +1,136 @@
+use std::result::Result as RResult;
+use std::str::FromStr;
+use std::char::{decode_utf16, REPLACEMENT_CHARACTER};
+
+use anyhow::Error;
+use anyhow::Result;
+use anyhow::anyhow;
+use futures::AsyncBufReadExt;
+use futures::Stream;
+use futures::StreamExt;
+use futures::TryStreamExt;
+use pom::*;
+use pom::parser::Parser as PomParser;
+use shiplift::tty::TtyChunk;
+
+use crate::log::util::*;
+
+type IoResult<T> = RResult<T, futures::io::Error>;
+
+fn buffer_stream_to_line_stream<S>(stream: S) -> impl Stream<Item = IoResult<String>>
+ where S: Stream<Item = shiplift::Result<TtyChunk>> + std::marker::Unpin
+{
+ stream.map(|r| r.map(TtyChunkBuf::from))
+ .map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e))
+ .into_async_read()
+ .lines()
+}
+
+#[derive(Debug, PartialEq, Eq, Hash)]
+pub enum LogItem {
+ /// A line from the log, unmodified
+ Line(Vec<u8>),
+
+ /// A progress report
+ Progress(usize),
+
+ /// The name of the current phase the process is in
+ CurrentPhase(String),
+
+ /// The end-state of the process
+ /// Either Ok or Error
+ State(Result<String, String>),
+}
+
+pub fn parser<'a>() -> PomParser<'a, u8, LogItem> {
+ use pom::parser::*;
+ use pom::char_class::hex_digit;
+
+ let number = one_of(b"0123456789")
+ .repeat(1..)
+ .collect()
+ .convert(|b| String::from_utf8(b.to_vec()))
+ .convert(|s| usize::from_str(&s));
+ let space = one_of(b" \t\r\n")
+ .repeat(0..)
+ .discard();
+
+ fn ignored<'a>() -> PomParser<'a, u8, Vec<u8>> {
+ none_of(b"\n").repeat(0..)
+ }
+
+ fn string<'a>() -> PomParser<'a, u8, String> {
+ let special_char = sym(b'\\') | sym(b'/') | sym(b'"')
+ | sym(b'b').map(|_|b'\x08') | sym(b'f').map(|_|b'\x0C')
+ | sym(b'n').map(|_|b'\n') | sym(b'r').map(|_|b'\r') | sym(b't').map(|_|b'\t');
+ let escape_sequence = sym(b'\\') * special_char;
+ let string = sym(b'"') * (none_of(b"\\\"") | escape_sequence).repeat(0..) - sym(b'"');
+
+ string.convert(String::from_utf8)
+ }
+
+ (
+ seq(b"#BUTIDO:") * (
+ (seq(b"PROGRESS:") * number.map(|n| LogItem::Progress(n)))
+ |
+ (seq(b"PHASE:") * string().map(|s| LogItem::CurrentPhase(s)))
+ |
+ (
+ (seq(b"STATE:ERR") * string().map(|s| LogItem::State(Err(s))))
+ |
+ (seq(b"STATE:OK") * string().map(|s| LogItem::State(Ok(s))))
+ )
+ )
+ )
+ | ignored().map(|s| LogItem::Line(Vec::from(s)))
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_non_log() {
+ let s = "foo bar";
+ let p = parser();
+ let r = p.parse(s.as_bytes());
+
+ assert!(r.is_ok(), "Not ok: {:?}", r);
+ let r = r.unwrap();
+ assert_eq!(r, LogItem::Line("foo bar".bytes().collect()));
+ }
+
+ #[test]
+ fn test_progress_1() {
+ let s = "#BUTIDO:PROGRESS:1";
+ let p = parser();
+ let r = p.parse(s.as_bytes());
+
+ assert!(r.is_ok(), "Not ok: {:?}", r);
+ let r = r.unwrap();
+ assert_eq!(r, LogItem::Progress(1));
+ }
+
+ #[test]
+ fn test_progress_100() {
+ let s = "#BUTIDO:PROGRESS:100";
+ let p = parser();
+ let r = p.parse(s.as_bytes());
+
+ assert!(r.is_ok(), "Not ok: {:?}", r);
+ let r = r.unwrap();
+ assert_eq!(r, LogItem::Progress(100));
+ }
+
+ #[test]
+ fn test_progress_negative() {
+ let s = "#BUTIDO:PROGRESS:-1";
+ let p = parser();
+ let r = p.parse(s.as_bytes());
+
+ assert!(r.is_ok(), "Not ok: {:?}", r);
+ let r = r.unwrap();
+ assert_eq!(r, LogItem::Line("#BUTIDO:PROGRESS:-1".bytes().collect()));
+ }
+}
+
diff --git a/src/log/util.rs b/src/log/util.rs
new file mode 100644
index 0000000..675a4c4
--- /dev/null
+++ b/src/log/util.rs
@@ -0,0 +1,28 @@
+use shiplift::tty::TtyChunk;
+
+pub enum TtyChunkBuf {
+ StdIn (Vec<u8>),
+ StdOut(Vec<u8>),
+ StdErr(Vec<u8>),
+}
+
+impl From<TtyChunk> for TtyChunkBuf {
+ fn from(c: TtyChunk) -> Self {
+ match c {
+ TtyChunk::StdIn(buffer) => TtyChunkBuf::StdIn (buffer),
+ TtyChunk::StdOut(buffer) => TtyChunkBuf::StdOut(buffer),
+ TtyChunk::StdErr(buffer) => TtyChunkBuf::StdErr(buffer),
+ }
+ }
+}
+
+impl AsRef<[u8]> for TtyChunkBuf {
+ fn as_ref(&self) -> &[u8] {
+ match self {
+ TtyChunkBuf::StdIn(buffer) => buffer.as_ref(),
+ TtyChunkBuf::StdOut(buffer) => buffer.as_ref(),
+ TtyChunkBuf::StdErr(buffer) => buffer.as_ref(),
+ }
+ }
+}
+