summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-08-01 16:54:11 +0200
committerMatthias Beyer <mail@beyermatthias.de>2021-08-01 17:12:14 +0200
commit5e4c632f4f9b6e8bbaf075d4fc5347bf5a7dd2b4 (patch)
treeab0a67988413a6c7088ebaaab698b10812762745
parent66e2f509c4c0df8511123887e9eaa5faa746fef5 (diff)
Abstract backend away to worker thread
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r--Cargo.toml3
-rw-r--r--src/model/backend/error.rs32
-rw-r--r--src/model/backend/fassade.rs38
-rw-r--r--src/model/backend/messages.rs16
-rw-r--r--src/model/backend/mod.rs4
-rw-r--r--src/model/backend/worker.rs105
-rw-r--r--src/model/mod.rs2
-rw-r--r--src/model/repo.rs73
8 files changed, 223 insertions, 50 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 6192cd4..d96e0b2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,6 +10,7 @@ actix-web = "3"
anyhow = "1"
cached = "0.23"
clap = "3.0.0-beta.2"
+crossbeam = "0.8"
derive_more = "0.99"
either = "1.6"
env_logger = "0.8"
@@ -20,5 +21,5 @@ handlebars = "4.1"
log = "0.4"
resiter = "0.3"
serde = { version = "1", features = ["derive"] }
-typed-builder = "0.9"
syntect = "4.5"
+typed-builder = "0.9"
diff --git a/src/model/backend/error.rs b/src/model/backend/error.rs
new file mode 100644
index 0000000..85f102b
--- /dev/null
+++ b/src/model/backend/error.rs
@@ -0,0 +1,32 @@
+use derive_more::Display;
+use derive_more::Error;
+use actix_web::http::StatusCode;
+use actix_web::HttpResponse;
+use actix_web::dev::HttpResponseBuilder;
+use actix_web::http::header;
+
+#[derive(Debug, Display, Error)]
+pub enum BackendError {
+ #[display(fmt = "{}", _0)]
+ #[error(ignore)]
+ Str(String),
+
+ #[display(fmt = "Backend Error: Lock poisoned")]
+ LockError,
+
+ #[display(fmt = "Backend Error: IPC failed")]
+ ChannelError,
+}
+
+impl actix_web::error::ResponseError for BackendError {
+ fn error_response(&self) -> HttpResponse {
+ HttpResponseBuilder::new(self.status_code())
+ .set_header(header::CONTENT_TYPE, "text/html; charset=utf-8")
+ .body(self.to_string())
+ }
+
+ fn status_code(&self) -> StatusCode {
+ StatusCode::from_u16(500).unwrap()
+ }
+}
+
diff --git a/src/model/backend/fassade.rs b/src/model/backend/fassade.rs
new file mode 100644
index 0000000..23b3bf9
--- /dev/null
+++ b/src/model/backend/fassade.rs
@@ -0,0 +1,38 @@
+use anyhow::Result;
+use anyhow::Error;
+
+use crate::model::backend::messages::BackendRequest;
+use crate::model::backend::messages::BackendResponse;
+
+pub struct BackendFassade {
+ inner: crossbeam::channel::Sender<(BackendRequest, crossbeam::channel::Sender<BackendResponse>)>,
+}
+
+impl BackendFassade {
+ pub fn new(inner: crossbeam::channel::Sender<(BackendRequest, crossbeam::channel::Sender<BackendResponse>)>) -> Self {
+ BackendFassade { inner }
+ }
+
+ async fn send_request(&self, req: BackendRequest) -> Result<crossbeam::channel::Receiver<BackendResponse>> {
+ let (sender, receiver) = crossbeam::channel::bounded(1);
+ self.inner
+ .send(req.with_sender(sender))
+ .map_err(Error::from)
+ .map(|_| receiver)
+ }
+
+ pub async fn get_branch_list(&self) -> Result<Vec<String>> {
+ match self.send_request(BackendRequest::BranchList).await?.recv()? {
+ BackendResponse::BranchList(list) => Ok(list),
+ _ => todo!()
+ }
+ }
+
+ pub async fn get_tag_list(&self) -> Result<Vec<String>> {
+ match self.send_request(BackendRequest::TagList).await?.recv()? {
+ BackendResponse::TagList(list) => Ok(list),
+ _ => todo!()
+ }
+ }
+}
+
diff --git a/src/model/backend/messages.rs b/src/model/backend/messages.rs
new file mode 100644
index 0000000..3e185f4
--- /dev/null
+++ b/src/model/backend/messages.rs
@@ -0,0 +1,16 @@
+pub enum BackendRequest {
+ BranchList,
+ TagList,
+}
+
+impl BackendRequest {
+ pub fn with_sender(self, sender: crossbeam::channel::Sender<BackendResponse>) -> (Self, crossbeam::channel::Sender<BackendResponse>) {
+ (self, sender)
+ }
+}
+
+pub enum BackendResponse {
+ BranchList(Vec<String>),
+ TagList(Vec<String>),
+}
+
diff --git a/src/model/backend/mod.rs b/src/model/backend/mod.rs
new file mode 100644
index 0000000..77f8e56
--- /dev/null
+++ b/src/model/backend/mod.rs
@@ -0,0 +1,4 @@
+pub mod error;
+pub mod fassade;
+pub mod messages;
+pub mod worker;
diff --git a/src/model/backend/worker.rs b/src/model/backend/worker.rs
new file mode 100644
index 0000000..ad8d0de
--- /dev/null
+++ b/src/model/backend/worker.rs
@@ -0,0 +1,105 @@
+use std::sync::Mutex;
+
+use anyhow::anyhow;
+use anyhow::Result;
+use cached::Return;
+use cached::TimedSizedCache;
+use cached::proc_macro::cached;
+
+use crate::model::backend::error::BackendError;
+use crate::model::backend::messages::BackendRequest;
+use crate::model::backend::messages::BackendResponse;
+
+pub struct BackendWorker {
+ repo_name: String,
+ repo: Mutex<git2::Repository>,
+ receiver: crossbeam::channel::Receiver<(BackendRequest, crossbeam::channel::Sender<BackendResponse>)>,
+}
+
+impl BackendWorker {
+ pub fn new(
+ repo_name: String,
+ repo: Mutex<git2::Repository>,
+ receiver: crossbeam::channel::Receiver<(BackendRequest, crossbeam::channel::Sender<BackendResponse>)>)
+ -> Self {
+ BackendWorker { repo_name, repo, receiver }
+ }
+
+ pub fn run(self) {
+ loop {
+ if let Err(e) = self.handle() {
+ log::error!("Error in backend: {}", e);
+ }
+ }
+ }
+
+ fn handle(&self) -> std::result::Result<(), BackendError> {
+ match self.receiver.recv().map_err(|_| BackendError::ChannelError)? {
+ (BackendRequest::BranchList, chnl) => self.send_branch_list(chnl),
+ (BackendRequest::TagList, chnl) => self.send_tag_list(chnl),
+ }
+ }
+
+ fn send_branch_list(&self, sender: crossbeam::channel::Sender<BackendResponse>) -> std::result::Result<(), BackendError> {
+ let repo_lock = self.repo
+ .lock()
+ .map_err(|_| BackendError::LockError)?;
+
+ let branch_names = get_branch_names(&self.repo_name, &*repo_lock)
+ .map_err(|e| BackendError::Str(e.to_string()))?;
+
+ sender.send(BackendResponse::BranchList(branch_names.value))
+ .map_err(|_| BackendError::ChannelError)
+ }
+
+ fn send_tag_list(&self, sender: crossbeam::channel::Sender<BackendResponse>) -> std::result::Result<(), BackendError> {
+ let repo_lock = self.repo
+ .lock()
+ .map_err(|_| BackendError::LockError)?;
+
+ let tag_names = get_tag_names(&self.repo_name, &*repo_lock)
+ .map_err(|e| BackendError::Str(e.to_string()))?;
+
+ sender.send(BackendResponse::TagList(tag_names.value))
+ .map_err(|_| BackendError::ChannelError)
+ }
+}
+
+#[cached(
+ type = "TimedSizedCache<String, Return<Vec<String>>>",
+ create = "{ TimedSizedCache::with_size_and_lifespan(1, 60) }",
+ convert = r#"{ _repo_name.to_owned() }"#,
+ with_cached_flag = true,
+ result = true,
+)]
+fn get_branch_names(_repo_name: &str, repo: &git2::Repository) -> Result<cached::Return<Vec<String>>> {
+ repo.branches(None)?
+ .map(|branch| {
+ let branch = branch?.0;
+ branch.name()?
+ .ok_or_else(|| anyhow!("Branch name is not valid UTF8: {:?}", branch.name()))
+ .map(String::from)
+ })
+ .collect::<Result<Vec<String>>>()
+ .map(cached::Return::new)
+}
+
+#[cached(
+ type = "TimedSizedCache<String, Return<Vec<String>>>",
+ create = "{ TimedSizedCache::with_size_and_lifespan(1, 60) }",
+ convert = r#"{ _repo_name.to_owned() }"#,
+ with_cached_flag = true,
+ result = true,
+)]
+fn get_tag_names(_repo_name: &str, repo: &git2::Repository) -> Result<cached::Return<Vec<String>>> {
+ repo.tag_names(None)?
+ .into_iter()
+ .map(|tag| {
+ tag.ok_or_else(|| anyhow!("Tag name is not valid UTF8: {:?}", tag))
+ .map(String::from)
+ })
+ .collect::<Result<Vec<String>>>()
+ .map(cached::Return::new)
+}
+
+
diff --git a/src/model/mod.rs b/src/model/mod.rs
index 6a15c59..d21e2b1 100644
--- a/src/model/mod.rs
+++ b/src/model/mod.rs
@@ -1,2 +1,4 @@
+mod backend;
+
pub mod repo;
pub mod state;
diff --git a/src/model/repo.rs b/src/model/repo.rs
index 2a33280..a656bac 100644
--- a/src/model/repo.rs
+++ b/src/model/repo.rs
@@ -1,10 +1,11 @@
use std::path::Path;
+use std::sync::Mutex;
use anyhow::Result;
use anyhow::anyhow;
-use cached::Return;
-use cached::TimedSizedCache;
-use cached::proc_macro::cached;
+
+use crate::model::backend::worker::BackendWorker;
+use crate::model::backend::fassade::BackendFassade;
#[derive(getset::Getters)]
pub struct Repo {
@@ -13,6 +14,8 @@ pub struct Repo {
#[getset(get = "pub")]
repo: git2::Repository,
+
+ backend: BackendFassade,
}
impl Repo {
@@ -24,70 +27,42 @@ impl Repo {
.ok_or_else(|| anyhow!("Not valid UTF-8, cannot process: {}", path.as_ref().display()))?;
log::info!("Opening {} = {}", name, path.as_ref().display());
- let repo = git2::Repository::open(path)?;
+ let repo = git2::Repository::open(&path)?;
+ let repo2 = git2::Repository::open(&path)?; // TODO: Remove duplicated open()
+
+ let (sender, receiver) = crossbeam::channel::unbounded();
+ let worker = BackendWorker::new(name.clone(), Mutex::new(repo2), receiver);
+ let backend = BackendFassade::new(sender);
- Ok(Repo { name, repo })
+ std::thread::spawn(move || {
+ worker.run()
+ });
+
+ Ok(Repo { name, repo, backend })
}
pub async fn stat(&self) -> Result<cached::Return<RepositoryStat>> {
let (branches, tags) = futures::try_join!(self.branch_names(), self.tag_names())?;
Ok(cached::Return::new(RepositoryStat {
- branches: branches.value,
- tags: tags.value,
+ branches: branches,
+ tags: tags,
}))
}
- pub async fn branch_names(&self) -> Result<cached::Return<Vec<String>>> {
- get_branch_names(&self.name, &self.repo)
+ pub async fn branch_names(&self) -> Result<Vec<String>> {
+ self.backend.get_branch_list().await
}
- pub async fn tag_names(&self) -> Result<cached::Return<Vec<String>>> {
- get_tag_names(&self.name, &self.repo)
+ pub async fn tag_names(&self) -> Result<Vec<String>> {
+ self.backend.get_tag_list().await
}
}
+
#[derive(Clone)]
pub struct RepositoryStat {
pub branches: Vec<String>,
pub tags: Vec<String>,
}
-#[cached(
- type = "TimedSizedCache<String, Return<Vec<String>>>",
- create = "{ TimedSizedCache::with_size_and_lifespan(1, 60) }",
- convert = r#"{ _repo_name.to_owned() }"#,
- with_cached_flag = true,
- result = true,
-)]
-fn get_branch_names(_repo_name: &str, repo: &git2::Repository) -> Result<cached::Return<Vec<String>>> {
- repo.branches(None)?
- .map(|branch| {
- let branch = branch?.0;
- branch.name()?
- .ok_or_else(|| anyhow!("Branch name is not valid UTF8: {:?}", branch.name()))
- .map(String::from)
- })
- .collect::<Result<Vec<String>>>()
- .map(cached::Return::new)
-}
-
-#[cached(
- type = "TimedSizedCache<String, Return<Vec<String>>>",
- create = "{ TimedSizedCache::with_size_and_lifespan(1, 60) }",
- convert = r#"{ _repo_name.to_owned() }"#,
- with_cached_flag = true,
- result = true,
-)]
-fn get_tag_names(_repo_name: &str, repo: &git2::Repository) -> Result<cached::Return<Vec<String>>> {
- repo.tag_names(None)?
- .into_iter()
- .map(|tag| {
- tag.ok_or_else(|| anyhow!("Tag name is not valid UTF8: {:?}", tag))
- .map(String::from)
- })
- .collect::<Result<Vec<String>>>()
- .map(cached::Return::new)
-}
-
-