diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2020-10-22 17:26:00 +0200 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2020-10-27 14:06:50 +0100 |
commit | a9fb2644cac1a4fe8cd75df3ed8b5f9182cb8238 (patch) | |
tree | 3569f01de3ab56f7418148e4344f8372dc05ba19 /src/log | |
parent | 4ecf1e1e4fa2f8b9cb75260a0b033025e234984b (diff) |
Implement log module for job log handling
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src/log')
-rw-r--r-- | src/log/mod.rs | 5 | ||||
-rw-r--r-- | src/log/parser.rs | 136 | ||||
-rw-r--r-- | src/log/util.rs | 28 |
3 files changed, 169 insertions, 0 deletions
diff --git a/src/log/mod.rs b/src/log/mod.rs new file mode 100644 index 0000000..d73ab0a --- /dev/null +++ b/src/log/mod.rs @@ -0,0 +1,5 @@ +mod parser; +pub use parser::*; + +mod util; + diff --git a/src/log/parser.rs b/src/log/parser.rs new file mode 100644 index 0000000..01ecea5 --- /dev/null +++ b/src/log/parser.rs @@ -0,0 +1,136 @@ +use std::result::Result as RResult; +use std::str::FromStr; +use std::char::{decode_utf16, REPLACEMENT_CHARACTER}; + +use anyhow::Error; +use anyhow::Result; +use anyhow::anyhow; +use futures::AsyncBufReadExt; +use futures::Stream; +use futures::StreamExt; +use futures::TryStreamExt; +use pom::*; +use pom::parser::Parser as PomParser; +use shiplift::tty::TtyChunk; + +use crate::log::util::*; + +type IoResult<T> = RResult<T, futures::io::Error>; + +fn buffer_stream_to_line_stream<S>(stream: S) -> impl Stream<Item = IoResult<String>> + where S: Stream<Item = shiplift::Result<TtyChunk>> + std::marker::Unpin +{ + stream.map(|r| r.map(TtyChunkBuf::from)) + .map_err(|e| futures::io::Error::new(futures::io::ErrorKind::Other, e)) + .into_async_read() + .lines() +} + +#[derive(Debug, PartialEq, Eq, Hash)] +pub enum LogItem { + /// A line from the log, unmodified + Line(Vec<u8>), + + /// A progress report + Progress(usize), + + /// The name of the current phase the process is in + CurrentPhase(String), + + /// The end-state of the process + /// Either Ok or Error + State(Result<String, String>), +} + +pub fn parser<'a>() -> PomParser<'a, u8, LogItem> { + use pom::parser::*; + use pom::char_class::hex_digit; + + let number = one_of(b"0123456789") + .repeat(1..) + .collect() + .convert(|b| String::from_utf8(b.to_vec())) + .convert(|s| usize::from_str(&s)); + let space = one_of(b" \t\r\n") + .repeat(0..) + .discard(); + + fn ignored<'a>() -> PomParser<'a, u8, Vec<u8>> { + none_of(b"\n").repeat(0..) + } + + fn string<'a>() -> PomParser<'a, u8, String> { + let special_char = sym(b'\\') | sym(b'/') | sym(b'"') + | sym(b'b').map(|_|b'\x08') | sym(b'f').map(|_|b'\x0C') + | sym(b'n').map(|_|b'\n') | sym(b'r').map(|_|b'\r') | sym(b't').map(|_|b'\t'); + let escape_sequence = sym(b'\\') * special_char; + let string = sym(b'"') * (none_of(b"\\\"") | escape_sequence).repeat(0..) - sym(b'"'); + + string.convert(String::from_utf8) + } + + ( + seq(b"#BUTIDO:") * ( + (seq(b"PROGRESS:") * number.map(|n| LogItem::Progress(n))) + | + (seq(b"PHASE:") * string().map(|s| LogItem::CurrentPhase(s))) + | + ( + (seq(b"STATE:ERR") * string().map(|s| LogItem::State(Err(s)))) + | + (seq(b"STATE:OK") * string().map(|s| LogItem::State(Ok(s)))) + ) + ) + ) + | ignored().map(|s| LogItem::Line(Vec::from(s))) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_non_log() { + let s = "foo bar"; + let p = parser(); + let r = p.parse(s.as_bytes()); + + assert!(r.is_ok(), "Not ok: {:?}", r); + let r = r.unwrap(); + assert_eq!(r, LogItem::Line("foo bar".bytes().collect())); + } + + #[test] + fn test_progress_1() { + let s = "#BUTIDO:PROGRESS:1"; + let p = parser(); + let r = p.parse(s.as_bytes()); + + assert!(r.is_ok(), "Not ok: {:?}", r); + let r = r.unwrap(); + assert_eq!(r, LogItem::Progress(1)); + } + + #[test] + fn test_progress_100() { + let s = "#BUTIDO:PROGRESS:100"; + let p = parser(); + let r = p.parse(s.as_bytes()); + + assert!(r.is_ok(), "Not ok: {:?}", r); + let r = r.unwrap(); + assert_eq!(r, LogItem::Progress(100)); + } + + #[test] + fn test_progress_negative() { + let s = "#BUTIDO:PROGRESS:-1"; + let p = parser(); + let r = p.parse(s.as_bytes()); + + assert!(r.is_ok(), "Not ok: {:?}", r); + let r = r.unwrap(); + assert_eq!(r, LogItem::Line("#BUTIDO:PROGRESS:-1".bytes().collect())); + } +} + diff --git a/src/log/util.rs b/src/log/util.rs new file mode 100644 index 0000000..675a4c4 --- /dev/null +++ b/src/log/util.rs @@ -0,0 +1,28 @@ +use shiplift::tty::TtyChunk; + +pub enum TtyChunkBuf { + StdIn (Vec<u8>), + StdOut(Vec<u8>), + StdErr(Vec<u8>), +} + +impl From<TtyChunk> for TtyChunkBuf { + fn from(c: TtyChunk) -> Self { + match c { + TtyChunk::StdIn(buffer) => TtyChunkBuf::StdIn (buffer), + TtyChunk::StdOut(buffer) => TtyChunkBuf::StdOut(buffer), + TtyChunk::StdErr(buffer) => TtyChunkBuf::StdErr(buffer), + } + } +} + +impl AsRef<[u8]> for TtyChunkBuf { + fn as_ref(&self) -> &[u8] { + match self { + TtyChunkBuf::StdIn(buffer) => buffer.as_ref(), + TtyChunkBuf::StdOut(buffer) => buffer.as_ref(), + TtyChunkBuf::StdErr(buffer) => buffer.as_ref(), + } + } +} + |