diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2021-08-02 19:57:14 +0200 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2021-08-02 19:57:14 +0200 |
commit | f826c22e35c18bf0874f2479785e529995949a5c (patch) | |
tree | edf60ee33cdcd56d056e8ef054f7f5e8faa55d58 | |
parent | 2272c317becaa631be900a8dbb312998189b2221 (diff) |
Rewrite backend implementation with more tokio asyncness
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | src/model/backend/channels.rs | 8 | ||||
-rw-r--r-- | src/model/backend/error.rs | 3 | ||||
-rw-r--r-- | src/model/backend/fassade.rs | 26 | ||||
-rw-r--r-- | src/model/backend/worker.rs | 56 | ||||
-rw-r--r-- | src/model/repo.rs | 4 |
6 files changed, 54 insertions, 44 deletions
@@ -24,4 +24,5 @@ resiter = "0.3" result-inspect = "0.1" serde = { version = "1", features = ["derive"] } syntect = "4.5" +tokio = { version = "1", features = ["rt-multi-thread", "fs", "sync"] } typed-builder = "0.9" diff --git a/src/model/backend/channels.rs b/src/model/backend/channels.rs index ce8c19f..4cb6c60 100644 --- a/src/model/backend/channels.rs +++ b/src/model/backend/channels.rs @@ -1,9 +1,9 @@ use crate::model::backend::messages::BackendRequest; use crate::model::backend::messages::BackendResponse; -pub type BackendRequestSender = crossbeam::channel::Sender<(BackendRequest, BackendResponseSender)>; -pub type BackendRequestdReceiver = crossbeam::channel::Receiver<(BackendRequest, BackendResponseSender)>; +pub type BackendRequestSender = tokio::sync::mpsc::UnboundedSender<(BackendRequest, BackendResponseSender)>; +pub type BackendRequestdReceiver = tokio::sync::mpsc::UnboundedReceiver<(BackendRequest, BackendResponseSender)>; -pub type BackendResponseSender = crossbeam::channel::Sender<BackendResponse>; -pub type BackendResponseReceiver = crossbeam::channel::Receiver<BackendResponse>; +pub type BackendResponseSender = tokio::sync::oneshot::Sender<BackendResponse>; +pub type BackendResponseReceiver = tokio::sync::oneshot::Receiver<BackendResponse>; diff --git a/src/model/backend/error.rs b/src/model/backend/error.rs index bfca03e..2126807 100644 --- a/src/model/backend/error.rs +++ b/src/model/backend/error.rs @@ -17,6 +17,9 @@ pub enum BackendError { #[display(fmt = "Backend Error: IPC failed")] ChannelError, + #[display(fmt = "Backend Error: task spawning failed")] + TaskError, + #[display(fmt = "Backend Error: Finding branch {} failed", _0)] #[error(ignore)] FindBranchError(String), diff --git a/src/model/backend/fassade.rs b/src/model/backend/fassade.rs index 9c8d4cf..fae9772 100644 --- a/src/model/backend/fassade.rs +++ b/src/model/backend/fassade.rs @@ -20,51 +20,49 @@ impl BackendFassade { BackendFassade { inner } } - async fn send_request(&self, req: BackendRequest) -> Result<BackendResponseReceiver> { - let (sender, receiver) = crossbeam::channel::bounded(1); - self.inner - .send(req.with_sender(sender)) - .map_err(Error::from) - .map(|_| receiver) + async fn send_request(&self, req: BackendRequest) -> Result<BackendResponse> { + let (sender, receiver) = tokio::sync::oneshot::channel(); + let _ = self.inner.send(req.with_sender(sender))?; + receiver.await.map_err(Error::from) } pub async fn get_branch_list(&self) -> Result<Vec<String>> { - match self.send_request(BackendRequest::BranchList).await?.recv()? { + match self.send_request(BackendRequest::BranchList).await? { BackendResponse::BranchList(list) => Ok(list), _ => todo!() } } pub async fn get_tag_list(&self) -> Result<Vec<String>> { - match self.send_request(BackendRequest::TagList).await?.recv()? { + match self.send_request(BackendRequest::TagList).await? { BackendResponse::TagList(list) => Ok(list), _ => todo!() } } pub async fn find_branch(&self, name: &str) -> Result<Branch> { - match self.send_request(BackendRequest::FindBranchId(name.to_string())).await?.recv()? { + match self.send_request(BackendRequest::FindBranchId(name.to_string())).await? { BackendResponse::BranchId(oid) => Ok(crate::model::branch::Branch::new(self.clone(), name.to_string(), oid)), _ => todo!() } } pub async fn find_tree(&self, id: git2::Oid) -> Result<Tree> { - match self.send_request(BackendRequest::FindTree(id)).await?.recv()? { + match self.send_request(BackendRequest::FindTree(id)).await? { BackendResponse::TreeId(oid) => Ok(crate::model::tree::Tree::new(self.clone(), oid)), _ => todo!() } } pub async fn find_blob(&self, id: git2::Oid) -> Result<Blob> { - match self.send_request(BackendRequest::FindBlob(id)).await?.recv()? { + match self.send_request(BackendRequest::FindBlob(id)).await? { BackendResponse::BlobId(oid) => Ok(crate::model::blob::Blob::new(self.clone(), oid)), _ => todo!() } } pub async fn load_blob(&self, id: git2::Oid) -> Result<BlobWithContent> { - match self.send_request(BackendRequest::LoadBlob(id)).await?.recv()? { + match self.send_request(BackendRequest::LoadBlob(id)).await? { BackendResponse::BlobWithContent { id, content, is_binary, size } => Ok({ @@ -75,14 +73,14 @@ impl BackendFassade { } pub async fn find_tree_for_commit_id(&self, id: git2::Oid) -> Result<Tree> { - match self.send_request(BackendRequest::FindTreeForCommitId(id)).await?.recv()? { + match self.send_request(BackendRequest::FindTreeForCommitId(id)).await? { BackendResponse::TreeId(oid) => Ok(crate::model::tree::Tree::new(self.clone(), oid)), _ => todo!() } } pub async fn tree_elements(&self, id: git2::Oid) -> Result<Vec<crate::model::backend::messages::TreeElement>> { - match self.send_request(BackendRequest::GetTreeElements(id)).await?.recv()? { + match self.send_request(BackendRequest::GetTreeElements(id)).await? { BackendResponse::TreeElements(elements) => Ok(elements), _ => todo!() } diff --git a/src/model/backend/worker.rs b/src/model/backend/worker.rs index b2abd67..9103bda 100644 --- a/src/model/backend/worker.rs +++ b/src/model/backend/worker.rs @@ -1,4 +1,4 @@ -use std::sync::Mutex; +use tokio::sync::Mutex; use anyhow::anyhow; use anyhow::Result; @@ -26,59 +26,61 @@ impl BackendWorker { BackendWorker { repo_name, repo, receiver } } - pub fn run(self) { + pub async fn run(mut self) { loop { - if let Err(e) = self.handle() { + if let Err(e) = self.handle().await { log::error!("Error in backend: {}", e); } } } - fn handle(&self) -> std::result::Result<(), BackendError> { + async fn handle(&mut self) -> std::result::Result<(), BackendError> { use result_inspect::*; - let (request, sender) = self.receiver.recv().map_err(|_| BackendError::ChannelError)?; + let (request, sender) = self.receiver.recv().await.ok_or_else(|| BackendError::ChannelError)?; log::trace!("Handling request: {:?}", request); match request { - BackendRequest::BranchList => self.send_branch_list(), - BackendRequest::TagList => self.send_tag_list(), - BackendRequest::FindBranchId(name) => self.find_branch_id(name), - BackendRequest::FindTree(id) => self.find_tree_id(id), - BackendRequest::FindBlob(id) => self.find_blob_id(id), - BackendRequest::LoadBlob(id) => self.load_blob(id), - BackendRequest::FindTreeForCommitId(id) => self.find_tree_for_commit_id(id), - BackendRequest::GetTreeElements(id) => self.get_tree_elements(id), + BackendRequest::BranchList => self.send_branch_list().await, + BackendRequest::TagList => self.send_tag_list().await, + BackendRequest::FindBranchId(name) => self.find_branch_id(name).await, + BackendRequest::FindTree(id) => self.find_tree_id(id).await, + BackendRequest::FindBlob(id) => self.find_blob_id(id).await, + BackendRequest::LoadBlob(id) => self.load_blob(id).await, + BackendRequest::FindTreeForCommitId(id) => self.find_tree_for_commit_id(id).await, + BackendRequest::GetTreeElements(id) => self.get_tree_elements(id).await, } .inspect(|r| log::trace!("Response: {:?}", r)) .and_then(|r| sender.send(r).map_err(|_| BackendError::ChannelError)) } - fn exec_on_repo<R, F>(&self, f: F) -> std::result::Result<R, BackendError> - where R: Sized, + async fn exec_on_repo<R, F>(&self, f: F) -> std::result::Result<R, BackendError> + where R: Sized + Send, F: FnOnce(&git2::Repository) -> std::result::Result<R, BackendError>, { - let repo = self.repo.lock().map_err(|_| BackendError::LockError)?; + let repo = self.repo.lock().await; let res = f(&repo); drop(repo); res } - fn send_branch_list(&self) -> std::result::Result<BackendResponse, BackendError> { + async fn send_branch_list(&self) -> std::result::Result<BackendResponse, BackendError> { self.exec_on_repo(|repo| { get_branch_names(&self.repo_name, repo).map_err(|e| BackendError::Str(e.to_string())) }) + .await .map(BackendResponse::BranchList) } - fn send_tag_list(&self) -> std::result::Result<BackendResponse, BackendError> { + async fn send_tag_list(&self) -> std::result::Result<BackendResponse, BackendError> { self.exec_on_repo(|repo| { get_tag_names(&self.repo_name, repo).map_err(|e| BackendError::Str(e.to_string())) }) + .await .map(BackendResponse::TagList) } - fn find_branch_id(&self, name: String) -> std::result::Result<BackendResponse, BackendError> { + async fn find_branch_id(&self, name: String) -> std::result::Result<BackendResponse, BackendError> { self.exec_on_repo(move |repo| { let branch = repo.find_branch(&name, git2::BranchType::Local) .map_err(|_| BackendError::FindBranchError(name.clone()))?; @@ -88,28 +90,31 @@ impl BackendWorker { .map_err(|_| BackendError::BranchPeelToCommitError(name)) .map(|c| c.id()) }) + .await .map(BackendResponse::BranchId) } - fn find_tree_id(&self, id: git2::Oid) -> std::result::Result<BackendResponse, BackendError> { + async fn find_tree_id(&self, id: git2::Oid) -> std::result::Result<BackendResponse, BackendError> { self.exec_on_repo(|repo| { repo.find_tree(id) .map_err(|_| BackendError::FindTreeError(id)) .map(|tree| tree.id()) }) + .await .map(BackendResponse::TreeId) } - fn find_blob_id(&self, id: git2::Oid) -> std::result::Result<BackendResponse, BackendError> { + async fn find_blob_id(&self, id: git2::Oid) -> std::result::Result<BackendResponse, BackendError> { self.exec_on_repo(|repo| { repo.find_blob(id) .map_err(|_| BackendError::FindBlobError(id)) .map(|blob| blob.id()) }) + .await .map(BackendResponse::BlobId) } - fn load_blob(&self, id: git2::Oid) -> std::result::Result<BackendResponse, BackendError> { + async fn load_blob(&self, id: git2::Oid) -> std::result::Result<BackendResponse, BackendError> { self.exec_on_repo(|repo| { repo.find_blob(id) .map_err(|_| BackendError::FindBlobError(id)) @@ -120,21 +125,24 @@ impl BackendWorker { size: blob.size(), }) }) + .await } - fn find_tree_for_commit_id(&self, id: git2::Oid) -> std::result::Result<BackendResponse, BackendError> { + async fn find_tree_for_commit_id(&self, id: git2::Oid) -> std::result::Result<BackendResponse, BackendError> { self.exec_on_repo(|repo| { repo.find_commit(id) .map_err(|_| BackendError::FindCommitError(id)) .map(|commit| commit.tree_id()) }) + .await .map(BackendResponse::TreeId) } - fn get_tree_elements(&self, id: git2::Oid) -> std::result::Result<BackendResponse, BackendError> { + async fn get_tree_elements(&self, id: git2::Oid) -> std::result::Result<BackendResponse, BackendError> { self.exec_on_repo(|repo| { find_tree_elements(repo, id) }) + .await .map(BackendResponse::TreeElements) } } diff --git a/src/model/repo.rs b/src/model/repo.rs index 195c3d4..edbc696 100644 --- a/src/model/repo.rs +++ b/src/model/repo.rs @@ -1,8 +1,8 @@ use std::path::Path; -use std::sync::Mutex; use anyhow::Result; use anyhow::anyhow; +use tokio::sync::Mutex; use crate::model::backend::worker::BackendWorker; use crate::model::backend::fassade::BackendFassade; @@ -26,7 +26,7 @@ impl Repo { log::info!("Opening {} = {}", name, path.as_ref().display()); let repo = git2::Repository::open(&path)?; - let (sender, receiver) = crossbeam::channel::unbounded(); + let (sender, receiver) = tokio::sync::mpsc::unbounded_channel(); let worker = BackendWorker::new(name.clone(), Mutex::new(repo), receiver); let backend = BackendFassade::new(sender); |