diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2021-08-01 16:54:11 +0200 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-08-01 17:12:14 +0200 |
commit | 5e4c632f4f9b6e8bbaf075d4fc5347bf5a7dd2b4 (patch) | |
tree | ab0a67988413a6c7088ebaaab698b10812762745 | |
parent | 66e2f509c4c0df8511123887e9eaa5faa746fef5 (diff) |
Abstract backend away to worker thread
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r-- | Cargo.toml | 3 | ||||
-rw-r--r-- | src/model/backend/error.rs | 32 | ||||
-rw-r--r-- | src/model/backend/fassade.rs | 38 | ||||
-rw-r--r-- | src/model/backend/messages.rs | 16 | ||||
-rw-r--r-- | src/model/backend/mod.rs | 4 | ||||
-rw-r--r-- | src/model/backend/worker.rs | 105 | ||||
-rw-r--r-- | src/model/mod.rs | 2 | ||||
-rw-r--r-- | src/model/repo.rs | 73 |
8 files changed, 223 insertions, 50 deletions
@@ -10,6 +10,7 @@ actix-web = "3" anyhow = "1" cached = "0.23" clap = "3.0.0-beta.2" +crossbeam = "0.8" derive_more = "0.99" either = "1.6" env_logger = "0.8" @@ -20,5 +21,5 @@ handlebars = "4.1" log = "0.4" resiter = "0.3" serde = { version = "1", features = ["derive"] } -typed-builder = "0.9" syntect = "4.5" +typed-builder = "0.9" diff --git a/src/model/backend/error.rs b/src/model/backend/error.rs new file mode 100644 index 0000000..85f102b --- /dev/null +++ b/src/model/backend/error.rs @@ -0,0 +1,32 @@ +use derive_more::Display; +use derive_more::Error; +use actix_web::http::StatusCode; +use actix_web::HttpResponse; +use actix_web::dev::HttpResponseBuilder; +use actix_web::http::header; + +#[derive(Debug, Display, Error)] +pub enum BackendError { + #[display(fmt = "{}", _0)] + #[error(ignore)] + Str(String), + + #[display(fmt = "Backend Error: Lock poisoned")] + LockError, + + #[display(fmt = "Backend Error: IPC failed")] + ChannelError, +} + +impl actix_web::error::ResponseError for BackendError { + fn error_response(&self) -> HttpResponse { + HttpResponseBuilder::new(self.status_code()) + .set_header(header::CONTENT_TYPE, "text/html; charset=utf-8") + .body(self.to_string()) + } + + fn status_code(&self) -> StatusCode { + StatusCode::from_u16(500).unwrap() + } +} + diff --git a/src/model/backend/fassade.rs b/src/model/backend/fassade.rs new file mode 100644 index 0000000..23b3bf9 --- /dev/null +++ b/src/model/backend/fassade.rs @@ -0,0 +1,38 @@ +use anyhow::Result; +use anyhow::Error; + +use crate::model::backend::messages::BackendRequest; +use crate::model::backend::messages::BackendResponse; + +pub struct BackendFassade { + inner: crossbeam::channel::Sender<(BackendRequest, crossbeam::channel::Sender<BackendResponse>)>, +} + +impl BackendFassade { + pub fn new(inner: crossbeam::channel::Sender<(BackendRequest, crossbeam::channel::Sender<BackendResponse>)>) -> Self { + BackendFassade { inner } + } + + async fn send_request(&self, req: BackendRequest) -> Result<crossbeam::channel::Receiver<BackendResponse>> { + let (sender, receiver) = crossbeam::channel::bounded(1); + self.inner + .send(req.with_sender(sender)) + .map_err(Error::from) + .map(|_| receiver) + } + + pub async fn get_branch_list(&self) -> Result<Vec<String>> { + match self.send_request(BackendRequest::BranchList).await?.recv()? { + BackendResponse::BranchList(list) => Ok(list), + _ => todo!() + } + } + + pub async fn get_tag_list(&self) -> Result<Vec<String>> { + match self.send_request(BackendRequest::TagList).await?.recv()? { + BackendResponse::TagList(list) => Ok(list), + _ => todo!() + } + } +} + diff --git a/src/model/backend/messages.rs b/src/model/backend/messages.rs new file mode 100644 index 0000000..3e185f4 --- /dev/null +++ b/src/model/backend/messages.rs @@ -0,0 +1,16 @@ +pub enum BackendRequest { + BranchList, + TagList, +} + +impl BackendRequest { + pub fn with_sender(self, sender: crossbeam::channel::Sender<BackendResponse>) -> (Self, crossbeam::channel::Sender<BackendResponse>) { + (self, sender) + } +} + +pub enum BackendResponse { + BranchList(Vec<String>), + TagList(Vec<String>), +} + diff --git a/src/model/backend/mod.rs b/src/model/backend/mod.rs new file mode 100644 index 0000000..77f8e56 --- /dev/null +++ b/src/model/backend/mod.rs @@ -0,0 +1,4 @@ +pub mod error; +pub mod fassade; +pub mod messages; +pub mod worker; diff --git a/src/model/backend/worker.rs b/src/model/backend/worker.rs new file mode 100644 index 0000000..ad8d0de --- /dev/null +++ b/src/model/backend/worker.rs @@ -0,0 +1,105 @@ +use std::sync::Mutex; + +use anyhow::anyhow; +use anyhow::Result; +use cached::Return; +use cached::TimedSizedCache; +use cached::proc_macro::cached; + +use crate::model::backend::error::BackendError; +use crate::model::backend::messages::BackendRequest; +use crate::model::backend::messages::BackendResponse; + +pub struct BackendWorker { + repo_name: String, + repo: Mutex<git2::Repository>, + receiver: crossbeam::channel::Receiver<(BackendRequest, crossbeam::channel::Sender<BackendResponse>)>, +} + +impl BackendWorker { + pub fn new( + repo_name: String, + repo: Mutex<git2::Repository>, + receiver: crossbeam::channel::Receiver<(BackendRequest, crossbeam::channel::Sender<BackendResponse>)>) + -> Self { + BackendWorker { repo_name, repo, receiver } + } + + pub fn run(self) { + loop { + if let Err(e) = self.handle() { + log::error!("Error in backend: {}", e); + } + } + } + + fn handle(&self) -> std::result::Result<(), BackendError> { + match self.receiver.recv().map_err(|_| BackendError::ChannelError)? { + (BackendRequest::BranchList, chnl) => self.send_branch_list(chnl), + (BackendRequest::TagList, chnl) => self.send_tag_list(chnl), + } + } + + fn send_branch_list(&self, sender: crossbeam::channel::Sender<BackendResponse>) -> std::result::Result<(), BackendError> { + let repo_lock = self.repo + .lock() + .map_err(|_| BackendError::LockError)?; + + let branch_names = get_branch_names(&self.repo_name, &*repo_lock) + .map_err(|e| BackendError::Str(e.to_string()))?; + + sender.send(BackendResponse::BranchList(branch_names.value)) + .map_err(|_| BackendError::ChannelError) + } + + fn send_tag_list(&self, sender: crossbeam::channel::Sender<BackendResponse>) -> std::result::Result<(), BackendError> { + let repo_lock = self.repo + .lock() + .map_err(|_| BackendError::LockError)?; + + let tag_names = get_tag_names(&self.repo_name, &*repo_lock) + .map_err(|e| BackendError::Str(e.to_string()))?; + + sender.send(BackendResponse::TagList(tag_names.value)) + .map_err(|_| BackendError::ChannelError) + } +} + +#[cached( + type = "TimedSizedCache<String, Return<Vec<String>>>", + create = "{ TimedSizedCache::with_size_and_lifespan(1, 60) }", + convert = r#"{ _repo_name.to_owned() }"#, + with_cached_flag = true, + result = true, +)] +fn get_branch_names(_repo_name: &str, repo: &git2::Repository) -> Result<cached::Return<Vec<String>>> { + repo.branches(None)? + .map(|branch| { + let branch = branch?.0; + branch.name()? + .ok_or_else(|| anyhow!("Branch name is not valid UTF8: {:?}", branch.name())) + .map(String::from) + }) + .collect::<Result<Vec<String>>>() + .map(cached::Return::new) +} + +#[cached( + type = "TimedSizedCache<String, Return<Vec<String>>>", + create = "{ TimedSizedCache::with_size_and_lifespan(1, 60) }", + convert = r#"{ _repo_name.to_owned() }"#, + with_cached_flag = true, + result = true, +)] +fn get_tag_names(_repo_name: &str, repo: &git2::Repository) -> Result<cached::Return<Vec<String>>> { + repo.tag_names(None)? + .into_iter() + .map(|tag| { + tag.ok_or_else(|| anyhow!("Tag name is not valid UTF8: {:?}", tag)) + .map(String::from) + }) + .collect::<Result<Vec<String>>>() + .map(cached::Return::new) +} + + diff --git a/src/model/mod.rs b/src/model/mod.rs index 6a15c59..d21e2b1 100644 --- a/src/model/mod.rs +++ b/src/model/mod.rs @@ -1,2 +1,4 @@ +mod backend; + pub mod repo; pub mod state; diff --git a/src/model/repo.rs b/src/model/repo.rs index 2a33280..a656bac 100644 --- a/src/model/repo.rs +++ b/src/model/repo.rs @@ -1,10 +1,11 @@ use std::path::Path; +use std::sync::Mutex; use anyhow::Result; use anyhow::anyhow; -use cached::Return; -use cached::TimedSizedCache; -use cached::proc_macro::cached; + +use crate::model::backend::worker::BackendWorker; +use crate::model::backend::fassade::BackendFassade; #[derive(getset::Getters)] pub struct Repo { @@ -13,6 +14,8 @@ pub struct Repo { #[getset(get = "pub")] repo: git2::Repository, + + backend: BackendFassade, } impl Repo { @@ -24,70 +27,42 @@ impl Repo { .ok_or_else(|| anyhow!("Not valid UTF-8, cannot process: {}", path.as_ref().display()))?; log::info!("Opening {} = {}", name, path.as_ref().display()); - let repo = git2::Repository::open(path)?; + let repo = git2::Repository::open(&path)?; + let repo2 = git2::Repository::open(&path)?; // TODO: Remove duplicated open() + + let (sender, receiver) = crossbeam::channel::unbounded(); + let worker = BackendWorker::new(name.clone(), Mutex::new(repo2), receiver); + let backend = BackendFassade::new(sender); - Ok(Repo { name, repo }) + std::thread::spawn(move || { + worker.run() + }); + + Ok(Repo { name, repo, backend }) } pub async fn stat(&self) -> Result<cached::Return<RepositoryStat>> { let (branches, tags) = futures::try_join!(self.branch_names(), self.tag_names())?; Ok(cached::Return::new(RepositoryStat { - branches: branches.value, - tags: tags.value, + branches: branches, + tags: tags, })) } - pub async fn branch_names(&self) -> Result<cached::Return<Vec<String>>> { - get_branch_names(&self.name, &self.repo) + pub async fn branch_names(&self) -> Result<Vec<String>> { + self.backend.get_branch_list().await } - pub async fn tag_names(&self) -> Result<cached::Return<Vec<String>>> { - get_tag_names(&self.name, &self.repo) + pub async fn tag_names(&self) -> Result<Vec<String>> { + self.backend.get_tag_list().await } } + #[derive(Clone)] pub struct RepositoryStat { pub branches: Vec<String>, pub tags: Vec<String>, } -#[cached( - type = "TimedSizedCache<String, Return<Vec<String>>>", - create = "{ TimedSizedCache::with_size_and_lifespan(1, 60) }", - convert = r#"{ _repo_name.to_owned() }"#, - with_cached_flag = true, - result = true, -)] -fn get_branch_names(_repo_name: &str, repo: &git2::Repository) -> Result<cached::Return<Vec<String>>> { - repo.branches(None)? - .map(|branch| { - let branch = branch?.0; - branch.name()? - .ok_or_else(|| anyhow!("Branch name is not valid UTF8: {:?}", branch.name())) - .map(String::from) - }) - .collect::<Result<Vec<String>>>() - .map(cached::Return::new) -} - -#[cached( - type = "TimedSizedCache<String, Return<Vec<String>>>", - create = "{ TimedSizedCache::with_size_and_lifespan(1, 60) }", - convert = r#"{ _repo_name.to_owned() }"#, - with_cached_flag = true, - result = true, -)] -fn get_tag_names(_repo_name: &str, repo: &git2::Repository) -> Result<cached::Return<Vec<String>>> { - repo.tag_names(None)? - .into_iter() - .map(|tag| { - tag.ok_or_else(|| anyhow!("Tag name is not valid UTF8: {:?}", tag)) - .map(String::from) - }) - .collect::<Result<Vec<String>>>() - .map(cached::Return::new) -} - - |