summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2021-08-02 19:57:14 +0200
committerMatthias Beyer <mail@beyermatthias.de>2021-08-02 19:57:14 +0200
commitf826c22e35c18bf0874f2479785e529995949a5c (patch)
treeedf60ee33cdcd56d056e8ef054f7f5e8faa55d58
parent2272c317becaa631be900a8dbb312998189b2221 (diff)
Rewrite backend implementation with more tokio asyncness
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r--Cargo.toml1
-rw-r--r--src/model/backend/channels.rs8
-rw-r--r--src/model/backend/error.rs3
-rw-r--r--src/model/backend/fassade.rs26
-rw-r--r--src/model/backend/worker.rs56
-rw-r--r--src/model/repo.rs4
6 files changed, 54 insertions, 44 deletions
diff --git a/Cargo.toml b/Cargo.toml
index ccd6512..7b77526 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -24,4 +24,5 @@ resiter = "0.3"
result-inspect = "0.1"
serde = { version = "1", features = ["derive"] }
syntect = "4.5"
+tokio = { version = "1", features = ["rt-multi-thread", "fs", "sync"] }
typed-builder = "0.9"
diff --git a/src/model/backend/channels.rs b/src/model/backend/channels.rs
index ce8c19f..4cb6c60 100644
--- a/src/model/backend/channels.rs
+++ b/src/model/backend/channels.rs
@@ -1,9 +1,9 @@
use crate::model::backend::messages::BackendRequest;
use crate::model::backend::messages::BackendResponse;
-pub type BackendRequestSender = crossbeam::channel::Sender<(BackendRequest, BackendResponseSender)>;
-pub type BackendRequestdReceiver = crossbeam::channel::Receiver<(BackendRequest, BackendResponseSender)>;
+pub type BackendRequestSender = tokio::sync::mpsc::UnboundedSender<(BackendRequest, BackendResponseSender)>;
+pub type BackendRequestdReceiver = tokio::sync::mpsc::UnboundedReceiver<(BackendRequest, BackendResponseSender)>;
-pub type BackendResponseSender = crossbeam::channel::Sender<BackendResponse>;
-pub type BackendResponseReceiver = crossbeam::channel::Receiver<BackendResponse>;
+pub type BackendResponseSender = tokio::sync::oneshot::Sender<BackendResponse>;
+pub type BackendResponseReceiver = tokio::sync::oneshot::Receiver<BackendResponse>;
diff --git a/src/model/backend/error.rs b/src/model/backend/error.rs
index bfca03e..2126807 100644
--- a/src/model/backend/error.rs
+++ b/src/model/backend/error.rs
@@ -17,6 +17,9 @@ pub enum BackendError {
#[display(fmt = "Backend Error: IPC failed")]
ChannelError,
+ #[display(fmt = "Backend Error: task spawning failed")]
+ TaskError,
+
#[display(fmt = "Backend Error: Finding branch {} failed", _0)]
#[error(ignore)]
FindBranchError(String),
diff --git a/src/model/backend/fassade.rs b/src/model/backend/fassade.rs
index 9c8d4cf..fae9772 100644
--- a/src/model/backend/fassade.rs
+++ b/src/model/backend/fassade.rs
@@ -20,51 +20,49 @@ impl BackendFassade {
BackendFassade { inner }
}
- async fn send_request(&self, req: BackendRequest) -> Result<BackendResponseReceiver> {
- let (sender, receiver) = crossbeam::channel::bounded(1);
- self.inner
- .send(req.with_sender(sender))
- .map_err(Error::from)
- .map(|_| receiver)
+ async fn send_request(&self, req: BackendRequest) -> Result<BackendResponse> {
+ let (sender, receiver) = tokio::sync::oneshot::channel();
+ let _ = self.inner.send(req.with_sender(sender))?;
+ receiver.await.map_err(Error::from)
}
pub async fn get_branch_list(&self) -> Result<Vec<String>> {
- match self.send_request(BackendRequest::BranchList).await?.recv()? {
+ match self.send_request(BackendRequest::BranchList).await? {
BackendResponse::BranchList(list) => Ok(list),
_ => todo!()
}
}
pub async fn get_tag_list(&self) -> Result<Vec<String>> {
- match self.send_request(BackendRequest::TagList).await?.recv()? {
+ match self.send_request(BackendRequest::TagList).await? {
BackendResponse::TagList(list) => Ok(list),
_ => todo!()
}
}
pub async fn find_branch(&self, name: &str) -> Result<Branch> {
- match self.send_request(BackendRequest::FindBranchId(name.to_string())).await?.recv()? {
+ match self.send_request(BackendRequest::FindBranchId(name.to_string())).await? {
BackendResponse::BranchId(oid) => Ok(crate::model::branch::Branch::new(self.clone(), name.to_string(), oid)),
_ => todo!()
}
}
pub async fn find_tree(&self, id: git2::Oid) -> Result<Tree> {
- match self.send_request(BackendRequest::FindTree(id)).await?.recv()? {
+ match self.send_request(BackendRequest::FindTree(id)).await? {
BackendResponse::TreeId(oid) => Ok(crate::model::tree::Tree::new(self.clone(), oid)),
_ => todo!()
}
}
pub async fn find_blob(&self, id: git2::Oid) -> Result<Blob> {
- match self.send_request(BackendRequest::FindBlob(id)).await?.recv()? {
+ match self.send_request(BackendRequest::FindBlob(id)).await? {
BackendResponse::BlobId(oid) => Ok(crate::model::blob::Blob::new(self.clone(), oid)),
_ => todo!()
}
}
pub async fn load_blob(&self, id: git2::Oid) -> Result<BlobWithContent> {
- match self.send_request(BackendRequest::LoadBlob(id)).await?.recv()? {
+ match self.send_request(BackendRequest::LoadBlob(id)).await? {
BackendResponse::BlobWithContent {
id, content, is_binary, size
} => Ok({
@@ -75,14 +73,14 @@ impl BackendFassade {
}
pub async fn find_tree_for_commit_id(&self, id: git2::Oid) -> Result<Tree> {
- match self.send_request(BackendRequest::FindTreeForCommitId(id)).await?.recv()? {
+ match self.send_request(BackendRequest::FindTreeForCommitId(id)).await? {
BackendResponse::TreeId(oid) => Ok(crate::model::tree::Tree::new(self.clone(), oid)),
_ => todo!()
}
}
pub async fn tree_elements(&self, id: git2::Oid) -> Result<Vec<crate::model::backend::messages::TreeElement>> {
- match self.send_request(BackendRequest::GetTreeElements(id)).await?.recv()? {
+ match self.send_request(BackendRequest::GetTreeElements(id)).await? {
BackendResponse::TreeElements(elements) => Ok(elements),
_ => todo!()
}
diff --git a/src/model/backend/worker.rs b/src/model/backend/worker.rs
index b2abd67..9103bda 100644
--- a/src/model/backend/worker.rs
+++ b/src/model/backend/worker.rs
@@ -1,4 +1,4 @@
-use std::sync::Mutex;
+use tokio::sync::Mutex;
use anyhow::anyhow;
use anyhow::Result;
@@ -26,59 +26,61 @@ impl BackendWorker {
BackendWorker { repo_name, repo, receiver }
}
- pub fn run(self) {
+ pub async fn run(mut self) {
loop {
- if let Err(e) = self.handle() {
+ if let Err(e) = self.handle().await {
log::error!("Error in backend: {}", e);
}
}
}
- fn handle(&self) -> std::result::Result<(), BackendError> {
+ async fn handle(&mut self) -> std::result::Result<(), BackendError> {
use result_inspect::*;
- let (request, sender) = self.receiver.recv().map_err(|_| BackendError::ChannelError)?;
+ let (request, sender) = self.receiver.recv().await.ok_or_else(|| BackendError::ChannelError)?;
log::trace!("Handling request: {:?}", request);
match request {
- BackendRequest::BranchList => self.send_branch_list(),
- BackendRequest::TagList => self.send_tag_list(),
- BackendRequest::FindBranchId(name) => self.find_branch_id(name),
- BackendRequest::FindTree(id) => self.find_tree_id(id),
- BackendRequest::FindBlob(id) => self.find_blob_id(id),
- BackendRequest::LoadBlob(id) => self.load_blob(id),
- BackendRequest::FindTreeForCommitId(id) => self.find_tree_for_commit_id(id),
- BackendRequest::GetTreeElements(id) => self.get_tree_elements(id),
+ BackendRequest::BranchList => self.send_branch_list().await,
+ BackendRequest::TagList => self.send_tag_list().await,
+ BackendRequest::FindBranchId(name) => self.find_branch_id(name).await,
+ BackendRequest::FindTree(id) => self.find_tree_id(id).await,
+ BackendRequest::FindBlob(id) => self.find_blob_id(id).await,
+ BackendRequest::LoadBlob(id) => self.load_blob(id).await,
+ BackendRequest::FindTreeForCommitId(id) => self.find_tree_for_commit_id(id).await,
+ BackendRequest::GetTreeElements(id) => self.get_tree_elements(id).await,
}
.inspect(|r| log::trace!("Response: {:?}", r))
.and_then(|r| sender.send(r).map_err(|_| BackendError::ChannelError))
}
- fn exec_on_repo<R, F>(&self, f: F) -> std::result::Result<R, BackendError>
- where R: Sized,
+ async fn exec_on_repo<R, F>(&self, f: F) -> std::result::Result<R, BackendError>
+ where R: Sized + Send,
F: FnOnce(&git2::Repository) -> std::result::Result<R, BackendError>,
{
- let repo = self.repo.lock().map_err(|_| BackendError::LockError)?;
+ let repo = self.repo.lock().await;
let res = f(&repo);
drop(repo);
res
}
- fn send_branch_list(&self) -> std::result::Result<BackendResponse, BackendError> {
+ async fn send_branch_list(&self) -> std::result::Result<BackendResponse, BackendError> {
self.exec_on_repo(|repo| {
get_branch_names(&self.repo_name, repo).map_err(|e| BackendError::Str(e.to_string()))
})
+ .await
.map(BackendResponse::BranchList)
}
- fn send_tag_list(&self) -> std::result::Result<BackendResponse, BackendError> {
+ async fn send_tag_list(&self) -> std::result::Result<BackendResponse, BackendError> {
self.exec_on_repo(|repo| {
get_tag_names(&self.repo_name, repo).map_err(|e| BackendError::Str(e.to_string()))
})
+ .await
.map(BackendResponse::TagList)
}
- fn find_branch_id(&self, name: String) -> std::result::Result<BackendResponse, BackendError> {
+ async fn find_branch_id(&self, name: String) -> std::result::Result<BackendResponse, BackendError> {
self.exec_on_repo(move |repo| {
let branch = repo.find_branch(&name, git2::BranchType::Local)
.map_err(|_| BackendError::FindBranchError(name.clone()))?;
@@ -88,28 +90,31 @@ impl BackendWorker {
.map_err(|_| BackendError::BranchPeelToCommitError(name))
.map(|c| c.id())
})
+ .await
.map(BackendResponse::BranchId)
}
- fn find_tree_id(&self, id: git2::Oid) -> std::result::Result<BackendResponse, BackendError> {
+ async fn find_tree_id(&self, id: git2::Oid) -> std::result::Result<BackendResponse, BackendError> {
self.exec_on_repo(|repo| {
repo.find_tree(id)
.map_err(|_| BackendError::FindTreeError(id))
.map(|tree| tree.id())
})
+ .await
.map(BackendResponse::TreeId)
}
- fn find_blob_id(&self, id: git2::Oid) -> std::result::Result<BackendResponse, BackendError> {
+ async fn find_blob_id(&self, id: git2::Oid) -> std::result::Result<BackendResponse, BackendError> {
self.exec_on_repo(|repo| {
repo.find_blob(id)
.map_err(|_| BackendError::FindBlobError(id))
.map(|blob| blob.id())
})
+ .await
.map(BackendResponse::BlobId)
}
- fn load_blob(&self, id: git2::Oid) -> std::result::Result<BackendResponse, BackendError> {
+ async fn load_blob(&self, id: git2::Oid) -> std::result::Result<BackendResponse, BackendError> {
self.exec_on_repo(|repo| {
repo.find_blob(id)
.map_err(|_| BackendError::FindBlobError(id))
@@ -120,21 +125,24 @@ impl BackendWorker {
size: blob.size(),
})
})
+ .await
}
- fn find_tree_for_commit_id(&self, id: git2::Oid) -> std::result::Result<BackendResponse, BackendError> {
+ async fn find_tree_for_commit_id(&self, id: git2::Oid) -> std::result::Result<BackendResponse, BackendError> {
self.exec_on_repo(|repo| {
repo.find_commit(id)
.map_err(|_| BackendError::FindCommitError(id))
.map(|commit| commit.tree_id())
})
+ .await
.map(BackendResponse::TreeId)
}
- fn get_tree_elements(&self, id: git2::Oid) -> std::result::Result<BackendResponse, BackendError> {
+ async fn get_tree_elements(&self, id: git2::Oid) -> std::result::Result<BackendResponse, BackendError> {
self.exec_on_repo(|repo| {
find_tree_elements(repo, id)
})
+ .await
.map(BackendResponse::TreeElements)
}
}
diff --git a/src/model/repo.rs b/src/model/repo.rs
index 195c3d4..edbc696 100644
--- a/src/model/repo.rs
+++ b/src/model/repo.rs
@@ -1,8 +1,8 @@
use std::path::Path;
-use std::sync::Mutex;
use anyhow::Result;
use anyhow::anyhow;
+use tokio::sync::Mutex;
use crate::model::backend::worker::BackendWorker;
use crate::model::backend::fassade::BackendFassade;
@@ -26,7 +26,7 @@ impl Repo {
log::info!("Opening {} = {}", name, path.as_ref().display());
let repo = git2::Repository::open(&path)?;
- let (sender, receiver) = crossbeam::channel::unbounded();
+ let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
let worker = BackendWorker::new(name.clone(), Mutex::new(repo), receiver);
let backend = BackendFassade::new(sender);