summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: Matthias Beyer <mail@beyermatthias.de> 2021-04-17 19:20:53 +0200
committer: Matthias Beyer <mail@beyermatthias.de> 2021-04-17 19:20:53 +0200
commit: 8dfd21ed430e9407880a5fc640b3351ccf34db5c (patch)
tree: 993ffecf69680768496ed405af1f4e3665d5eba9
parent: 056a0728c775e37460ed00791ad503e03a88f3d6 (diff)
Refactor into server-client architecture using protobuf
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r-- Cargo.toml 16
-rw-r--r-- build.rs 4
-rw-r--r-- proto/fss.proto 36
-rw-r--r-- shell.nix 29
-rw-r--r-- src/cli.rs 37
-rw-r--r-- src/client.rs 85
-rw-r--r-- src/config.rs 8
-rw-r--r-- src/main.rs 107
-rw-r--r-- src/server.rs 179
9 files changed, 389 insertions, 112 deletions
diff --git a/Cargo.toml b/Cargo.toml
index e55ec40..c4369ed 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -5,7 +5,13 @@ authors = ["Matthias Beyer <mail@beyermatthias.de>"]
edition = "2018"
license = "GPL-2.0"
-# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+[[bin]]
+name = "fss-server"
+path = "src/server.rs"
+
+[[bin]]
+name = "fss"
+path = "src/client.rs"
[dependencies]
anyhow = "1.0.40"
@@ -15,7 +21,15 @@ env_logger = "0.8.3"
getset = "0.1.1"
itertools = "0.10.0"
log = "0.4.14"
+prost = "0.7"
serde = "1.0.125"
tantivy = "0.14"
+tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
+tonic = "0.4"
xdg = "2.2.0"
+version = "3.0.0"
+resiter = "0.4"
+
+[build-dependencies]
+tonic-build = "0.4"
diff --git a/build.rs b/build.rs
new file mode 100644
index 0000000..36587c2
--- /dev/null
+++ b/build.rs
@@ -0,0 +1,4 @@
+/// Build script: generates the gRPC code for `proto/fss.proto` via `tonic_build`.
+///
+/// Any error reported by the code generation is returned, which fails the build.
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let proto_file = "proto/fss.proto";
+    tonic_build::compile_protos(proto_file).map_err(Into::into)
+}
diff --git a/proto/fss.proto b/proto/fss.proto
new file mode 100644
index 0000000..12fda7a
--- /dev/null
+++ b/proto/fss.proto
@@ -0,0 +1,36 @@
+syntax = "proto3";
+package fss;
+
+// RPC interface between the fss client and the fss server.
+service FSS {
+ // Ask the server for its version string.
+ rpc GetVersion (VersionRequest) returns (VersionReply);
+
+ // Ask the server to add the file at the given path to its index.
+ rpc IndexFile (IndexFileRequest) returns (IndexFileReply);
+ // Run a search query against the server's index.
+ rpc SearchQuery (SearchQueryRequest) returns (SearchResponse);
+}
+
+
+// Request of GetVersion; carries no data.
+message VersionRequest {
+}
+
+message VersionReply {
+ string version = 1; // Reply contains only the version string
+}
+
+
+// Request of IndexFile.
+message IndexFileRequest {
+ // Path of the file to index. The server opens this path itself, so it must be
+ // readable from the server process.
+ string path = 1;
+}
+
+// Reply of IndexFile.
+message IndexFileReply {
+ // True if indexing failed. No error details are transmitted; the server logs them.
+ bool error = 1;
+}
+
+
+// Request of SearchQuery.
+message SearchQueryRequest {
+ // Query string, handed to the server's query parser as-is.
+ string query = 1;
+}
+
+// Reply of SearchQuery.
+message SearchResponse {
+ // False if the search failed on the server; `pathes` is empty in that case.
+ bool success = 1;
+ // Paths of the files that matched the query.
+ repeated string pathes = 2;
+}
diff --git a/shell.nix b/shell.nix
new file mode 100644
index 0000000..3379a4c
--- /dev/null
+++ b/shell.nix
@@ -0,0 +1,29 @@
+# Development shell for fss: a stable Rust toolchain plus the native tools listed
+# in `buildInputs`, including `protobuf` for compiling `proto/fss.proto`.
+{ ... }:
+
+let
+  # Overlay from mozilla/nixpkgs-mozilla; presumably the source of the `rustChannels`
+  # attribute set used below.
+  # NOTE: this fetches the `master` branch, so the toolchain is not pinned.
+  # The URL is a quoted string because bare URL literals are deprecated in Nix.
+  moz_overlay = import (
+    builtins.fetchTarball "https://github.com/mozilla/nixpkgs-mozilla/archive/master.tar.gz"
+  );
+
+  pkgs = import <nixpkgs> { overlays = [ moz_overlay ]; };
+in
+pkgs.mkShell {
+  buildInputs = with pkgs; [
+    rustChannels.stable.rust-std
+    rustChannels.stable.rust
+    rustChannels.stable.rustc
+    rustChannels.stable.cargo
+
+    cmake
+    curl
+    gcc
+    openssl
+    pkgconfig
+
+    protobuf
+  ];
+
+  LIBCLANG_PATH = "${pkgs.llvmPackages.libclang}/lib";
+  # Path of the `protoc` binary from the `protobuf` package, exported to the shell.
+  PROTOC = "${pkgs.protobuf}/bin/protoc";
+}
+
diff --git a/src/cli.rs b/src/cli.rs
index 9098cfe..61399f7 100644
--- a/src/cli.rs
+++ b/src/cli.rs
@@ -3,21 +3,34 @@ use clap::Arg;
use clap::crate_authors;
use clap::crate_version;
-pub fn app<'a>() -> App<'a, 'a> {
+/// Build the command line interface of the `fss-server` binary.
+///
+/// The server only takes the address and the port to listen on; both are optional.
+/// The application is named after the binary it belongs to, so that the help and
+/// version output do not show the name of the client (`fss`).
+pub fn server_app<'a>() -> App<'a, 'a> {
+    App::new("fss-server")
+        .author(crate_authors!())
+        .version(crate_version!())
+        .about("Filesystemsearch")
+        .arg(self::common::arg_server_addr())
+        .arg(self::common::arg_server_port())
+}
+
+pub fn client_app<'a>() -> App<'a, 'a> {
App::new("fss")
.author(crate_authors!())
.version(crate_version!())
.about("Filesystemsearch")
+ .arg(self::common::arg_server_addr())
+ .arg(self::common::arg_server_port())
+
.subcommand(App::new("index")
.version(crate_version!())
.about("Index a file")
.arg(Arg::with_name("file")
.required(true)
- .multiple(true)
+ .multiple(false)
.value_name("FILE")
- .help("Index these files")
+ .help("Index this file")
)
)
@@ -33,4 +46,22 @@ pub fn app<'a>() -> App<'a, 'a> {
)
}
+/// Argument definitions shared by the server and the client command line interfaces.
+mod common {
+    use clap::Arg;
+
+    /// Build an optional argument that takes a single value.
+    ///
+    /// Neither a short nor a long flag is set here, so clap parses the argument as a
+    /// positional one.
+    fn optional_single_value<'a, 'b>(name: &'a str, value_name: &'b str, help: &'b str) -> Arg<'a, 'b> {
+        Arg::with_name(name)
+            .required(false)
+            .multiple(false)
+            .value_name(value_name)
+            .help(help)
+    }
+
+    /// The `server_addr` argument: the address of the server.
+    pub fn arg_server_addr<'a, 'b>() -> Arg<'a, 'b> {
+        optional_single_value("server_addr", "ADDR", "The address of the server")
+    }
+
+    /// The `server_port` argument: the port of the server.
+    pub fn arg_server_port<'a, 'b>() -> Arg<'a, 'b> {
+        optional_single_value("server_port", "PORT", "The port of the server")
+    }
+}
diff --git a/src/client.rs b/src/client.rs
new file mode 100644
index 0000000..7249098
--- /dev/null
+++ b/src/client.rs
@@ -0,0 +1,85 @@
+use std::io::Write;
+use std::str::FromStr;
+
+use anyhow::Context;
+use anyhow::Error;
+use anyhow::Result;
+use anyhow::anyhow;
+use itertools::Itertools;
+
+mod config;
+mod cli;
+mod schema;
+
+use fss::fss_client::FssClient;
+use fss::VersionRequest;
+use fss::IndexFileRequest;
+use fss::SearchQueryRequest;
+
+/// Client and message types for the `fss` protobuf package, included via
+/// `tonic::include_proto!` (the code is generated by `build.rs` from `proto/fss.proto`).
+mod fss {
+ tonic::include_proto!("fss");
+}
+
+/// Entry point of the `fss` command line client.
+///
+/// Loads `config.toml` from the XDG configuration directory (prefix `fss`), connects to
+/// the server and issues a `GetVersion` request before running the requested subcommand.
+/// The server address and port given on the command line take precedence over the
+/// values from the configuration file.
+///
+/// Subcommands:
+///
+/// * `index`: asks the server to index the given file.
+/// * `search`: joins all search terms with a single space, sends them as one query and
+///   prints one matching path per line to stdout.
+///
+/// # Errors
+///
+/// Returns an error if the configuration cannot be found or loaded, the port is not a
+/// number, the server cannot be reached, the server reports a failure, or no known
+/// subcommand was given.
+#[tokio::main]
+async fn main() -> Result<()> {
+    let cli = crate::cli::client_app();
+    let matches = cli.get_matches();
+    let _ = env_logger::try_init()?;
+    let mut config = ::config::Config::default();
+    {
+        let xdg = xdg::BaseDirectories::with_prefix("fss")?;
+        let xdg_config = xdg.find_config_file("config.toml")
+            .ok_or_else(|| anyhow!("No configuration file found with XDG: {}", xdg.get_config_home().display()))?;
+
+        log::debug!("Configuration file found with XDG: {}", xdg_config.display());
+        config.merge(::config::File::from(xdg_config).required(false))
+            .context("Failed to load config.toml from XDG configuration directory")?;
+    }
+    let config = config.try_into::<crate::config::Config>()?;
+    let server_addr = matches.value_of("server_addr").unwrap_or_else(|| config.server_addr());
+    let server_port = matches.value_of("server_port").map(usize::from_str).unwrap_or_else(|| Ok(*config.server_port()))?;
+    let addr = format!("http://{}:{}", server_addr, server_port);
+
+    let mut client = FssClient::connect(addr).await?;
+
+    // The reply is not inspected; the request fails early if the server does not answer.
+    let request = tonic::Request::new(VersionRequest { });
+    let _response = client.get_version(request).await?;
+
+    match matches.subcommand() {
+        ("index", Some(mtch)) => {
+            let path = mtch.value_of("file").map(String::from).unwrap(); // safe by clap
+            let request = tonic::Request::new(IndexFileRequest { path });
+            let response = client.index_file(request).await?;
+
+            if response.get_ref().error {
+                Err(anyhow!("Error indexing file"))
+            } else {
+                Ok(())
+            }
+        },
+
+        ("search", Some(mtch)) => {
+            let query = mtch.values_of("term")
+                .unwrap() // safe by clap
+                .join(" ");
+
+            let request = tonic::Request::new(SearchQueryRequest { query });
+            let response = client.search_query(request).await?;
+            let mut output = std::io::stdout();
+
+            if response.get_ref().success {
+                // Stop at the first write error instead of collecting intermediate results.
+                response.get_ref()
+                    .pathes
+                    .iter()
+                    .try_for_each(|path| writeln!(output, "{}", path).map_err(Error::from))
+            } else {
+                Err(anyhow!("Failed to search"))
+            }
+        },
+
+        // Reached when no subcommand (or one not handled above) was given. This is a
+        // usage error, so report it instead of panicking.
+        (other, _) => Err(anyhow!("Unknown or missing subcommand: '{}'", other)),
+    }
+}
diff --git a/src/config.rs b/src/config.rs
index 27dddb8..6bb83a1 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -3,6 +3,12 @@ use std::path::PathBuf;
+/// Configuration used by both the server and the client binary, deserialized with serde.
#[derive(Debug, serde::Deserialize, getset::Getters)]
pub struct Config {
+ /// Directory of the search index; the server opens it as a tantivy `MmapDirectory`.
#[getset(get = "pub")]
- database_path: PathBuf
+ database_path: PathBuf,
+
+ /// Server address that is used when none is given on the command line.
+ #[getset(get = "pub")]
+ server_addr: String,
+
+ /// Server port that is used when none is given on the command line.
+ #[getset(get = "pub")]
+ server_port: usize,
}
diff --git a/src/main.rs b/src/main.rs
deleted file mode 100644
index 8fd5fb2..0000000
--- a/src/main.rs
+++ /dev/null
@@ -1,107 +0,0 @@
-use std::io::Write;
-use std::path::PathBuf;
-
-use anyhow::Context;
-use anyhow::Error;
-use anyhow::Result;
-use anyhow::anyhow;
-use itertools::Itertools;
-
-mod config;
-mod cli;
-mod schema;
-
-fn main() -> Result<()> {
- let cli = crate::cli::app();
- let _ = env_logger::try_init()?;
- let mut config = ::config::Config::default();
- {
- let xdg = xdg::BaseDirectories::with_prefix("fss")?;
- let xdg_config = xdg.find_config_file("config.toml")
- .ok_or_else(|| anyhow!("No configuration file found with XDG: {}", xdg.get_config_home().display()))?;
-
- log::debug!("Configuration file found with XDG: {}", xdg_config.display());
- config.merge(::config::File::from(xdg_config).required(false))
- .context("Failed to load config.toml from XDG configuration directory")?;
- }
- let config = config.try_into::<crate::config::Config>()?;
-
- let index_path = tantivy::directory::MmapDirectory::open(config.database_path())?;
- let schema = crate::schema::schema();
-
- let index = tantivy::Index::open_or_create(index_path, schema.clone())?;
-
- let field_path = schema.get_field("path")
- .ok_or_else(|| anyhow!("BUG"))?;
- let field_ft = schema.get_field("ft")
- .ok_or_else(|| anyhow!("BUG"))?;
- let field_body = schema.get_field("body")
- .ok_or_else(|| anyhow!("BUG"))?;
-
- match cli.get_matches().subcommand() {
- ("index", Some(mtch)) => {
- let mut index_writer = index.writer(50_000_000)?;
- mtch.values_of("file")
- .unwrap() // safe by clap
- .map(|filepath| {
- let path_str = String::from(filepath);
- let path = PathBuf::from(&path_str);
-
- let filetype = path.extension()
- .map(ToOwned::to_owned)
- .and_then(|osstr| osstr.to_str().map(|s| s.to_string()))
- .ok_or_else(|| anyhow!("Path {} is not UTF8", filepath))?;
-
- let mut doc = tantivy::Document::default();
- doc.add_text(field_path, &path_str);
- doc.add_text(field_ft, &filetype);
-
- doc.add_text(field_body, std::fs::read_to_string(path)?);
-
- index_writer.add_document(doc);
- Ok(())
- })
- .collect::<Result<Vec<_>>>()?;
-
- index_writer.commit()?;
- Ok(())
- },
-
- ("search", Some(mtch)) => {
- let query_str = mtch.values_of("term")
- .unwrap() // safe by clap
- .join(" ");
-
- let reader = index
- .reader_builder()
- .reload_policy(tantivy::ReloadPolicy::OnCommit)
- .try_into()?;
-
- let searcher = reader.searcher();
- let query_parser = tantivy::query::QueryParser::for_index(&index, vec![field_path.clone(), field_ft, field_body]);
- let query = query_parser.parse_query(&query_str)?;
-
- let top_docs = searcher.search(&query, &tantivy::collector::TopDocs::with_limit(10))?;
- let mut output = std::io::stdout();
-
- top_docs.into_iter()
- .map(|(_score, adr)| {
- let retrieved_doc = searcher.doc(adr)?;
- retrieved_doc.get_all(field_path)
- .map(|value| {
- value.text().ok_or_else(|| anyhow!("Not a text value.."))
- })
- .map_ok(|txt| {
- writeln!(output, "{}", txt).map_err(Error::from)
- })
- .collect::<Result<Vec<_>>>()
- })
- .collect::<Result<Vec<Vec<_>>>>()
- .map(|_| ())
- },
-
- (_other, _) => {
- unimplemented!()
- },
- }
-}
diff --git a/src/server.rs b/src/server.rs
new file mode 100644
index 0000000..bb7a550
--- /dev/null
+++ b/src/server.rs
@@ -0,0 +1,179 @@
+use std::str::FromStr;
+use std::path::PathBuf;
+
+use anyhow::anyhow;
+use anyhow::Error;
+use anyhow::Context;
+use anyhow::Result;
+use tonic::Request;
+use tonic::Response;
+use tonic::Status;
+use resiter::AndThen;
+use resiter::Map;
+
+use fss::VersionReply;
+use fss::VersionRequest;
+use fss::IndexFileRequest;
+use fss::IndexFileReply;
+use fss::SearchQueryRequest;
+use fss::SearchResponse;
+
+mod config;
+mod cli;
+mod schema;
+
+/// Server trait and message types for the `fss` protobuf package, included via
+/// `tonic::include_proto!` (the code is generated by `build.rs` from `proto/fss.proto`).
+mod fss {
+ tonic::include_proto!("fss"); // The string specified here must match the proto package name
+}
+
+/// State of the gRPC service: the tantivy index plus the schema fields that are used
+/// for writing documents and for searching.
+pub struct Server {
+ /// Index that documents are written to and searched in.
+ index: tantivy::Index,
+
+ /// Field that holds the path of an indexed file.
+ field_path: tantivy::schema::Field,
+ /// Field that holds the file extension of an indexed file.
+ field_ft: tantivy::schema::Field,
+ /// Field that holds the text content of an indexed file.
+ field_body: tantivy::schema::Field,
+}
+
+impl Server {
+    /// Read the file at `filepath` and add it to the index as a single document.
+    ///
+    /// The document contains the path exactly as given, the file extension and the
+    /// complete file content. A new index writer is created for every call and the
+    /// document is committed before returning.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the path has no file extension, the extension is not valid
+    /// UTF-8, the file cannot be read as UTF-8 text, or the index writer cannot be
+    /// created or committed.
+    fn write_file_to_index(&self, filepath: &str) -> Result<()> {
+        let path = PathBuf::from(filepath);
+
+        // A missing extension and a non-UTF-8 extension are different problems, so they
+        // are reported with different messages.
+        let filetype = path.extension()
+            .ok_or_else(|| anyhow!("Path {} has no file extension", filepath))?
+            .to_str()
+            .ok_or_else(|| anyhow!("Path {} is not UTF8", filepath))?;
+
+        // Read the file before creating the writer, so that an unreadable file is
+        // rejected without setting up the writer first.
+        let body = std::fs::read_to_string(&path)?;
+
+        let mut doc = tantivy::Document::default();
+        doc.add_text(self.field_path, filepath);
+        doc.add_text(self.field_ft, filetype);
+        doc.add_text(self.field_body, body);
+
+        // 50_000_000 is the memory budget handed to the writer, in bytes.
+        let mut index_writer = self.index.writer(50_000_000)?;
+        index_writer.add_document(doc);
+        index_writer.commit()?;
+        Ok(())
+    }
+
+    /// Search the index for `query_str` and return the paths of the matching documents.
+    ///
+    /// The query is parsed with the path, file type and body fields as default fields.
+    /// At most the 10 top-scoring documents are considered; every value of the path
+    /// field of such a document is returned.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the reader cannot be created, the query cannot be parsed,
+    /// the search or the retrieval of a document fails, or a path value is not text.
+    fn search(&self, query_str: &str) -> Result<Vec<PathBuf>> {
+        let reader = self.index
+            .reader_builder()
+            .reload_policy(tantivy::ReloadPolicy::OnCommit)
+            .try_into()?;
+
+        let searcher = reader.searcher();
+        let query_parser = tantivy::query::QueryParser::for_index(&self.index, vec![self.field_path, self.field_ft, self.field_body]);
+        let query = query_parser.parse_query(query_str)?;
+
+        let top_docs = searcher.search(&query, &tantivy::collector::TopDocs::with_limit(10))?;
+
+        let results = top_docs
+            .into_iter()
+            .map(|(_score, adr)| searcher.doc(adr).map_err(Error::from))
+            .and_then_ok(|retr| {
+                retr.get_all(self.field_path)
+                    .map(|value| {
+                        value.text().ok_or_else(|| anyhow!("Not a text value.."))
+                    })
+                    .map_ok(PathBuf::from)
+                    .collect::<Result<Vec<_>>>()
+            })
+            .collect::<Result<Vec<Vec<PathBuf>>>>()?
+            .into_iter()
+            // `Vec` is already iterable, so the per-document lists can be flattened directly.
+            .flatten()
+            .collect();
+        Ok(results)
+    }
+}
+
+
+/// gRPC front end of the [`Server`]. Failures of the index operations are logged and
+/// reported through the flags of the reply messages, never as a gRPC error status.
+#[tonic::async_trait]
+impl fss::fss_server::Fss for Server {
+    /// Reply with the version string produced by `version::version!()`.
+    async fn get_version(&self, _request: Request<VersionRequest>) -> Result<Response<VersionReply>, Status> {
+        let version_string = version::version!().to_string();
+        Ok(Response::new(fss::VersionReply { version: version_string }))
+    }
+
+    /// Index the file named in the request; `error` in the reply is `true` on failure.
+    async fn index_file(&self, request: Request<IndexFileRequest>) -> Result<Response<IndexFileReply>, Status> {
+        let path = &request.get_ref().path;
+        let outcome = self.write_file_to_index(path);
+
+        if let Err(e) = &outcome {
+            log::error!("Error writing to index: {} -> {:?}", path, e);
+        }
+
+        Ok(Response::new(fss::IndexFileReply { error: outcome.is_err() }))
+    }
+
+    /// Run the query from the request. On failure `success` is `false` and no paths are
+    /// returned.
+    async fn search_query(&self, request: Request<SearchQueryRequest>) -> Result<Response<SearchResponse>, Status> {
+        let query = &request.get_ref().query;
+
+        let (success, pathes) = self.search(query)
+            .map(|found| (true, found))
+            .unwrap_or_else(|e| {
+                log::error!("Error searching with '{}' -> {:?}", query, e);
+                (false, Vec::new())
+            });
+
+        Ok(Response::new(fss::SearchResponse {
+            success,
+            pathes: pathes.into_iter().map(|pb| pb.display().to_string()).collect(),
+        }))
+    }
+}
+
+/// Entry point of the `fss-server` binary.
+///
+/// Loads `config.toml` from the XDG configuration directory (prefix `fss`), opens or
+/// creates the index at the configured database path and serves the gRPC service on
+/// `<server_addr>:<server_port>`. Address and port from the command line take
+/// precedence over the values from the configuration file.
+#[tokio::main]
+async fn main() -> Result<()> {
+    let matches = crate::cli::server_app().get_matches();
+    env_logger::try_init()?;
+
+    let mut raw_config = ::config::Config::default();
+    {
+        let base_dirs = xdg::BaseDirectories::with_prefix("fss")?;
+        let config_file = base_dirs.find_config_file("config.toml")
+            .ok_or_else(|| anyhow!("No configuration file found with XDG: {}", base_dirs.get_config_home().display()))?;
+
+        log::debug!("Configuration file found with XDG: {}", config_file.display());
+        raw_config.merge(::config::File::from(config_file).required(false))
+            .context("Failed to load config.toml from XDG configuration directory")?;
+    }
+    let config = raw_config.try_into::<crate::config::Config>()?;
+
+    let server_addr = matches.value_of("server_addr").unwrap_or_else(|| config.server_addr());
+    let server_port = match matches.value_of("server_port") {
+        Some(raw) => usize::from_str(raw)?,
+        None => *config.server_port(),
+    };
+    let addr = format!("{}:{}", server_addr, server_port).parse()?;
+
+    let index_path = tantivy::directory::MmapDirectory::open(config.database_path())?;
+    let schema = crate::schema::schema();
+    let index = tantivy::Index::open_or_create(index_path, schema.clone())?;
+
+    // All three fields are expected to exist in the schema; a missing one is a
+    // programming error, hence the terse message.
+    let lookup = |name: &str| schema.get_field(name).ok_or_else(|| anyhow!("BUG"));
+    let field_path = lookup("path")?;
+    let field_ft = lookup("ft")?;
+    let field_body = lookup("body")?;
+
+    let server = Server { index, field_path, field_ft, field_body };
+
+    tonic::transport::Server::builder()
+        .add_service(fss::fss_server::FssServer::new(server))
+        .serve(addr)
+        .await?;
+
+    Ok(())
+}
+