summaryrefslogtreecommitdiffstats
path: root/src/endpoint/configured.rs
diff options
context:
space:
mode:
authorMatthias Beyer <mail@beyermatthias.de>2020-11-14 17:56:11 +0100
committerMatthias Beyer <mail@beyermatthias.de>2020-11-14 18:18:27 +0100
commitfcecf5fce755b2870cf0baae4a29104d146df948 (patch)
tree978997e6d0a5f1443befd28fdda35271e05f9c8f /src/endpoint/configured.rs
parent056a2c66e3d3fc8c4b6803acb9d151673b78a23f (diff)
Rewrite to use tokio::sync::RwLock
This patch rewrites the codebase to not use std::sync::RwLock, but tokio::sync::RwLock. tokios RwLock is an async RwLock, which is what we want in an async-await context. The more I use tokio, the more I understand what you should do and what you shouldn't do. Some parts of this patch are a rewrite, for example, JobSet::into_runables() was completely rewritten. That was necessary because the function used inside is `Runnable::build_from_job()`, which uses an RwLock internally, thus, gets `async` in this patch. Because of this, `JobSet::into_runables()` needed a complete rewrite as well. Because it is way more difficult than transforming the function to return an iterator of futures, this patch simply rewrites it to return a `Result<Vec<RunnableJob>>` instead. Internally, tokio jobs are submitted via the `futures::stream::FuturesUnordered<_>` now. This is not the most performant implementation for the problem at hand, but it is a reasonable simple one. Optimization could happen here, of course. Also, the implementation of resource preparation inside `RunnableJob::build_from_job()` got a rewrite using the same technique. Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src/endpoint/configured.rs')
-rw-r--r--src/endpoint/configured.rs16
1 files changed, 9 insertions, 7 deletions
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index 23b3069..a9d4982 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -2,7 +2,6 @@ use std::fmt::{Debug, Formatter};
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
-use std::sync::RwLock;
use anyhow::Context;
use anyhow::Error;
@@ -11,6 +10,7 @@ use anyhow::anyhow;
use getset::{Getters, CopyGetters};
use shiplift::Docker;
use shiplift::ExecContainerOptions;
+use tokio::sync::RwLock;
use tokio::sync::mpsc::UnboundedSender;
use typed_builder::TypedBuilder;
@@ -300,12 +300,14 @@ impl Endpoint {
.copy_from(&PathBuf::from("/outputs/"))
.map(|item| item.map_err(Error::from));
- let r = staging
- .write()
- .map_err(|_| anyhow!("Lock poisoned"))?
- .write_files_from_tar_stream(tar_stream)
- .await
- .with_context(|| anyhow!("Copying the TAR stream to the staging store"))?;
+ let r = {
+ let mut writelock = staging.write().await;
+
+ writelock
+ .write_files_from_tar_stream(tar_stream)
+ .await
+ .with_context(|| anyhow!("Copying the TAR stream to the staging store"))?
+ };
container.stop(Some(std::time::Duration::new(1, 0))).await?;