summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-08-02 20:12:26 +0200
committerMatthias Beyer <mail@beyermatthias.de>2021-08-02 20:12:26 +0200
commitb8763bfb38bbd9eef607d6a4f96d0a35b25923ae (patch)
treeb15fd505bedb08e934ecb6bf10c6a66021369329
parentfe899ba6660ce3e0f1222ff2b16f1adc448cfd45 (diff)
Let worker open the repo, let main() spawn the workers
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r--Cargo.toml2
-rw-r--r--src/main.rs37
-rw-r--r--src/model/backend/worker.rs12
-rw-r--r--src/model/repo.rs17
4 files changed, 36 insertions, 32 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 7b77526..e0b546a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -24,5 +24,5 @@ resiter = "0.3"
result-inspect = "0.1"
serde = { version = "1", features = ["derive"] }
syntect = "4.5"
-tokio = { version = "1", features = ["rt-multi-thread", "fs", "sync"] }
+tokio = { version = "1", features = ["rt-multi-thread", "fs", "sync", "macros"] }
typed-builder = "0.9"
diff --git a/src/main.rs b/src/main.rs
index 295b44b..8f501da 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -9,6 +9,7 @@ use actix_web::HttpServer;
use actix_web::web;
use actix_web::middleware::Logger;
use resiter::Map;
+use futures::stream::StreamExt;
mod cli;
mod controller;
@@ -32,15 +33,20 @@ async fn app() -> Result<()> {
let default_branch_name = cli.value_of("default-branch-name").unwrap(); // safe by clap
let addr = format!("{}:{}", bind, port);
- let repos = cli
- .values_of("repo")
- .unwrap() // safe by clap, we have at least one
- .into_iter()
- .map(crate::model::repo::Repo::open_repo)
- .map_ok(|repostate| {
- (repostate.name().clone(), Arc::new(Mutex::new(repostate)))
- })
- .collect::<Result<_>>()?;
+ let (repos, workers) = {
+ let mut repos = std::collections::HashMap::new();
+ let mut workers = Vec::new();
+
+ let repo_cli = cli.values_of("repo").unwrap(); // safe by clap, we have at least one
+
+ for repo in repo_cli.into_iter() {
+ let (repostate, worker) = crate::model::repo::Repo::open_repo(repo)?;
+ repos.insert(repostate.name().clone(), Arc::new(Mutex::new(repostate)));
+ workers.push(worker);
+ }
+
+ (repos, workers)
+ };
let handlebars = crate::view::setup_handlebars()?;
@@ -57,7 +63,7 @@ async fn app() -> Result<()> {
let app_state = Arc::new(app_state);
- HttpServer::new(move || {
+ let server = HttpServer::new(move || {
App::new()
.wrap(Logger::default())
.wrap(Logger::new("%a %{User-Agent}i"))
@@ -80,7 +86,12 @@ async fn app() -> Result<()> {
)
})
.bind(addr)?
- .run()
- .await
- .map_err(Error::from)
+ .run();
+
+ let workers = workers.into_iter()
+ .map(|worker| async { worker.run().await; })
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Vec<()>>();
+
+ tokio::join!(server, workers).0.map_err(Error::from)
}
diff --git a/src/model/backend/worker.rs b/src/model/backend/worker.rs
index 8530ed6..33c32d8 100644
--- a/src/model/backend/worker.rs
+++ b/src/model/backend/worker.rs
@@ -1,9 +1,10 @@
-use tokio::sync::Mutex;
+use std::path::Path;
use anyhow::anyhow;
use anyhow::Result;
use cached::TimedSizedCache;
use cached::proc_macro::cached;
+use tokio::sync::Mutex;
use crate::model::backend::error::BackendError;
use crate::model::backend::messages::BackendRequest;
@@ -18,12 +19,9 @@ pub struct BackendWorker {
}
impl BackendWorker {
- pub fn new(
- repo_name: String,
- repo: Mutex<git2::Repository>,
- receiver: BackendRequestReceiver,
- ) -> Self {
- BackendWorker { repo_name, repo, receiver }
+ pub fn new<P: AsRef<Path>>(repo_name: String, repo_path: P, receiver: BackendRequestReceiver) -> Result<Self> {
+ let repo = git2::Repository::open(repo_path).map(tokio::sync::Mutex::new)?;
+ Ok(BackendWorker { repo_name, repo, receiver })
}
pub async fn run(mut self) {
diff --git a/src/model/repo.rs b/src/model/repo.rs
index edbc696..499c122 100644
--- a/src/model/repo.rs
+++ b/src/model/repo.rs
@@ -16,25 +16,20 @@ pub struct Repo {
}
impl Repo {
- pub fn open_repo<P: AsRef<Path>>(path: P) -> Result<Self> {
- let name = path.as_ref()
+ pub fn open_repo<P: AsRef<Path>>(repo_path: P) -> Result<(Self, BackendWorker)> {
+ let name = repo_path.as_ref()
.file_name()
.and_then(std::ffi::OsStr::to_str)
.map(String::from)
- .ok_or_else(|| anyhow!("Not valid UTF-8, cannot process: {}", path.as_ref().display()))?;
+ .ok_or_else(|| anyhow!("Not valid UTF-8, cannot process: {}", repo_path.as_ref().display()))?;
- log::info!("Opening {} = {}", name, path.as_ref().display());
- let repo = git2::Repository::open(&path)?;
+ log::info!("Opening {} = {}", name, repo_path.as_ref().display());
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
- let worker = BackendWorker::new(name.clone(), Mutex::new(repo), receiver);
+ let worker = BackendWorker::new(name.clone(), repo_path, receiver)?;
let backend = BackendFassade::new(sender);
- std::thread::spawn(move || {
- worker.run()
- });
-
- Ok(Repo { name, backend })
+ Ok((Repo { name, backend }, worker))
}
pub async fn stat(&self) -> Result<cached::Return<RepositoryStat>> {