diff options
author | Matthias Beyer <mail@beyermatthias.de> | 2020-11-14 17:56:11 +0100 |
---|---|---|
committer | Matthias Beyer <mail@beyermatthias.de> | 2020-11-14 18:18:27 +0100 |
commit | fcecf5fce755b2870cf0baae4a29104d146df948 (patch) | |
tree | 978997e6d0a5f1443befd28fdda35271e05f9c8f /src/endpoint/configured.rs | |
parent | 056a2c66e3d3fc8c4b6803acb9d151673b78a23f (diff) |
Rewrite to use tokio::sync::RwLock
This patch rewrites the codebase to not use std::sync::RwLock, but
tokio::sync::RwLock.
tokios RwLock is an async RwLock, which is what we want in an
async-await context. The more I use tokio, the more I understand what
you should do and what you shouldn't do.
Some parts of this patch are a rewrite, for example,
JobSet::into_runables()
was completely rewritten.
That was necessary because the function used inside is
`Runnable::build_from_job()`, which uses an RwLock internally, thus,
gets `async` in this patch.
Because of this, `JobSet::into_runables()` needed a complete rewrite as
well.
Because it is way more difficult than transforming the function to
return an iterator of futures, this patch simply rewrites it to return a
`Result<Vec<RunnableJob>>` instead.
Internally, tokio jobs are submitted via the
`futures::stream::FuturesUnordered<_>` now.
This is not the most performant implementation for the problem at hand,
but it is a reasonable simple one. Optimization could happen here, of
course.
Also, the implementation of resource preparation inside
`RunnableJob::build_from_job()` got a rewrite using the same technique.
Signed-off-by: Matthias Beyer <mail@beyermatthias.de>
Diffstat (limited to 'src/endpoint/configured.rs')
-rw-r--r-- | src/endpoint/configured.rs | 16 |
1 files changed, 9 insertions, 7 deletions
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs index 23b3069..a9d4982 100644 --- a/src/endpoint/configured.rs +++ b/src/endpoint/configured.rs @@ -2,7 +2,6 @@ use std::fmt::{Debug, Formatter}; use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; -use std::sync::RwLock; use anyhow::Context; use anyhow::Error; @@ -11,6 +10,7 @@ use anyhow::anyhow; use getset::{Getters, CopyGetters}; use shiplift::Docker; use shiplift::ExecContainerOptions; +use tokio::sync::RwLock; use tokio::sync::mpsc::UnboundedSender; use typed_builder::TypedBuilder; @@ -300,12 +300,14 @@ impl Endpoint { .copy_from(&PathBuf::from("/outputs/")) .map(|item| item.map_err(Error::from)); - let r = staging - .write() - .map_err(|_| anyhow!("Lock poisoned"))? - .write_files_from_tar_stream(tar_stream) - .await - .with_context(|| anyhow!("Copying the TAR stream to the staging store"))?; + let r = { + let mut writelock = staging.write().await; + + writelock + .write_files_from_tar_stream(tar_stream) + .await + .with_context(|| anyhow!("Copying the TAR stream to the staging store"))? + }; container.stop(Some(std::time::Duration::new(1, 0))).await?; |