diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2021-08-02 20:12:26 +0200 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-08-02 20:12:26 +0200 |
commit | b8763bfb38bbd9eef607d6a4f96d0a35b25923ae (patch) | |
tree | b15fd505bedb08e934ecb6bf10c6a66021369329 | |
parent | fe899ba6660ce3e0f1222ff2b16f1adc448cfd45 (diff) |
Let worker open the repo, let main() spawn the workers
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r-- | Cargo.toml | 2 | ||||
-rw-r--r-- | src/main.rs | 37 | ||||
-rw-r--r-- | src/model/backend/worker.rs | 12 | ||||
-rw-r--r-- | src/model/repo.rs | 17 |
4 files changed, 36 insertions, 32 deletions
@@ -24,5 +24,5 @@ resiter = "0.3" result-inspect = "0.1" serde = { version = "1", features = ["derive"] } syntect = "4.5" -tokio = { version = "1", features = ["rt-multi-thread", "fs", "sync"] } +tokio = { version = "1", features = ["rt-multi-thread", "fs", "sync", "macros"] } typed-builder = "0.9" diff --git a/src/main.rs b/src/main.rs index 295b44b..8f501da 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,6 +9,7 @@ use actix_web::HttpServer; use actix_web::web; use actix_web::middleware::Logger; use resiter::Map; +use futures::stream::StreamExt; mod cli; mod controller; @@ -32,15 +33,20 @@ async fn app() -> Result<()> { let default_branch_name = cli.value_of("default-branch-name").unwrap(); // safe by clap let addr = format!("{}:{}", bind, port); - let repos = cli - .values_of("repo") - .unwrap() // safe by clap, we have at least one - .into_iter() - .map(crate::model::repo::Repo::open_repo) - .map_ok(|repostate| { - (repostate.name().clone(), Arc::new(Mutex::new(repostate))) - }) - .collect::<Result<_>>()?; + let (repos, workers) = { + let mut repos = std::collections::HashMap::new(); + let mut workers = Vec::new(); + + let repo_cli = cli.values_of("repo").unwrap(); // safe by clap, we have at least one + + for repo in repo_cli.into_iter() { + let (repostate, worker) = crate::model::repo::Repo::open_repo(repo)?; + repos.insert(repostate.name().clone(), Arc::new(Mutex::new(repostate))); + workers.push(worker); + } + + (repos, workers) + }; let handlebars = crate::view::setup_handlebars()?; @@ -57,7 +63,7 @@ async fn app() -> Result<()> { let app_state = Arc::new(app_state); - HttpServer::new(move || { + let server = HttpServer::new(move || { App::new() .wrap(Logger::default()) .wrap(Logger::new("%a %{User-Agent}i")) @@ -80,7 +86,12 @@ async fn app() -> Result<()> { ) }) .bind(addr)? - .run() - .await - .map_err(Error::from) + .run(); + + let workers = workers.into_iter() + .map(|worker| async { worker.run().await; }) + .collect::<futures::stream::FuturesUnordered<_>>() + .collect::<Vec<()>>(); + + tokio::join!(server, workers).0.map_err(Error::from) } diff --git a/src/model/backend/worker.rs b/src/model/backend/worker.rs index 8530ed6..33c32d8 100644 --- a/src/model/backend/worker.rs +++ b/src/model/backend/worker.rs @@ -1,9 +1,10 @@ -use tokio::sync::Mutex; +use std::path::Path; use anyhow::anyhow; use anyhow::Result; use cached::TimedSizedCache; use cached::proc_macro::cached; +use tokio::sync::Mutex; use crate::model::backend::error::BackendError; use crate::model::backend::messages::BackendRequest; @@ -18,12 +19,9 @@ pub struct BackendWorker { } impl BackendWorker { - pub fn new( - repo_name: String, - repo: Mutex<git2::Repository>, - receiver: BackendRequestReceiver, - ) -> Self { - BackendWorker { repo_name, repo, receiver } + pub fn new<P: AsRef<Path>>(repo_name: String, repo_path: P, receiver: BackendRequestReceiver) -> Result<Self> { + let repo = git2::Repository::open(repo_path).map(tokio::sync::Mutex::new)?; + Ok(BackendWorker { repo_name, repo, receiver }) } pub async fn run(mut self) { diff --git a/src/model/repo.rs b/src/model/repo.rs index edbc696..499c122 100644 --- a/src/model/repo.rs +++ b/src/model/repo.rs @@ -16,25 +16,20 @@ pub struct Repo { } impl Repo { - pub fn open_repo<P: AsRef<Path>>(path: P) -> Result<Self> { - let name = path.as_ref() + pub fn open_repo<P: AsRef<Path>>(repo_path: P) -> Result<(Self, BackendWorker)> { + let name = repo_path.as_ref() .file_name() .and_then(std::ffi::OsStr::to_str) .map(String::from) - .ok_or_else(|| anyhow!("Not valid UTF-8, cannot process: {}", path.as_ref().display()))?; + .ok_or_else(|| anyhow!("Not valid UTF-8, cannot process: {}", repo_path.as_ref().display()))?; - log::info!("Opening {} = {}", name, path.as_ref().display()); - let repo = git2::Repository::open(&path)?; + log::info!("Opening {} = {}", name, repo_path.as_ref().display()); let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); - let worker = BackendWorker::new(name.clone(), Mutex::new(repo), receiver); + let worker = BackendWorker::new(name.clone(), repo_path, receiver)?; let backend = BackendFassade::new(sender); - std::thread::spawn(move || { - worker.run() - }); - - Ok(Repo { name, backend }) + Ok((Repo { name, backend }, worker)) } pub async fn stat(&self) -> Result<cached::Return<RepositoryStat>> { |