From 43e41b622071d6167d0180b49b61011a1cf14b23 Mon Sep 17 00:00:00 2001 From: Matthias Beyer Date: Tue, 8 Jun 2021 14:45:52 +0200 Subject: Insert-and-get should be executed as transaction This patch makes sure all database functions are executed as transactions, so that we do not accidentially end up in a data race if two butido instances are executing these functions at the same time. Signed-off-by: Matthias Beyer --- src/db/models/artifact.rs | 16 +++++++++------- src/db/models/endpoint.rs | 20 +++++++++++--------- src/db/models/envvar.rs | 18 ++++++++++-------- src/db/models/githash.rs | 18 ++++++++++-------- src/db/models/image.rs | 20 +++++++++++--------- src/db/models/job.rs | 18 ++++++++++-------- src/db/models/package.rs | 26 ++++++++++++++------------ src/db/models/release_store.rs | 26 +++++++++++++------------- src/db/models/releases.rs | 16 +++++++++------- src/db/models/submit.rs | 16 +++++++++------- 10 files changed, 106 insertions(+), 88 deletions(-) diff --git a/src/db/models/artifact.rs b/src/db/models/artifact.rs index e5adbaf..0bce8a7 100644 --- a/src/db/models/artifact.rs +++ b/src/db/models/artifact.rs @@ -80,13 +80,15 @@ impl Artifact { job_id: job.id, }; - diesel::insert_into(artifacts::table) - .values(&new_art) - .execute(database_connection)?; + database_connection.transaction::<_, Error, _>(|| { + diesel::insert_into(artifacts::table) + .values(&new_art) + .execute(database_connection)?; - dsl::artifacts - .filter(path.eq(path_str).and(job_id.eq(job.id))) - .first::(database_connection) - .map_err(Error::from) + dsl::artifacts + .filter(path.eq(path_str).and(job_id.eq(job.id))) + .first::(database_connection) + .map_err(Error::from) + }) } } diff --git a/src/db/models/endpoint.rs b/src/db/models/endpoint.rs index 3909b2b..176d4a2 100644 --- a/src/db/models/endpoint.rs +++ b/src/db/models/endpoint.rs @@ -33,15 +33,17 @@ impl Endpoint { pub fn create_or_fetch(database_connection: &PgConnection, ep_name: &EndpointName) -> Result { let new_ep = NewEndpoint { name: ep_name.as_ref() }; - diesel::insert_into(endpoints::table) - .values(&new_ep) - .on_conflict_do_nothing() - .execute(database_connection)?; - - dsl::endpoints - .filter(name.eq(ep_name.as_ref())) - .first::(database_connection) - .map_err(Error::from) + database_connection.transaction::<_, Error, _>(|| { + diesel::insert_into(endpoints::table) + .values(&new_ep) + .on_conflict_do_nothing() + .execute(database_connection)?; + + dsl::endpoints + .filter(name.eq(ep_name.as_ref())) + .first::(database_connection) + .map_err(Error::from) + }) } pub fn fetch_for_job(database_connection: &PgConnection, j: &crate::db::models::Job) -> Result> { diff --git a/src/db/models/envvar.rs b/src/db/models/envvar.rs index b5fcc4f..d3c7c64 100644 --- a/src/db/models/envvar.rs +++ b/src/db/models/envvar.rs @@ -43,14 +43,16 @@ impl EnvVar { value: v, }; - diesel::insert_into(envvars::table) - .values(&new_envvar) - .on_conflict_do_nothing() - .execute(database_connection)?; + database_connection.transaction::<_, Error, _>(|| { + diesel::insert_into(envvars::table) + .values(&new_envvar) + .on_conflict_do_nothing() + .execute(database_connection)?; - dsl::envvars - .filter(name.eq(k.as_ref()).and(value.eq(v))) - .first::(database_connection) - .map_err(Error::from) + dsl::envvars + .filter(name.eq(k.as_ref()).and(value.eq(v))) + .first::(database_connection) + .map_err(Error::from) + }) } } diff --git a/src/db/models/githash.rs b/src/db/models/githash.rs index dfb24b6..2dcb0bb 100644 --- a/src/db/models/githash.rs +++ b/src/db/models/githash.rs @@ -32,14 +32,16 @@ impl GitHash { pub fn create_or_fetch(database_connection: &PgConnection, githash: &str) -> Result { let new_hash = NewGitHash { hash: githash }; - diesel::insert_into(githashes::table) - .values(&new_hash) - .on_conflict_do_nothing() - .execute(database_connection)?; + database_connection.transaction::<_, Error, _>(|| { + diesel::insert_into(githashes::table) + .values(&new_hash) + .on_conflict_do_nothing() + .execute(database_connection)?; - dsl::githashes - .filter(hash.eq(githash)) - .first::(database_connection) - .map_err(Error::from) + dsl::githashes + .filter(hash.eq(githash)) + .first::(database_connection) + .map_err(Error::from) + }) } } diff --git a/src/db/models/image.rs b/src/db/models/image.rs index 600edd6..d7b6ae9 100644 --- a/src/db/models/image.rs +++ b/src/db/models/image.rs @@ -38,15 +38,17 @@ impl Image { name: image_name.as_ref(), }; - diesel::insert_into(images::table) - .values(&new_image) - .on_conflict_do_nothing() - .execute(database_connection)?; - - dsl::images - .filter(name.eq(image_name.as_ref())) - .first::(database_connection) - .map_err(Error::from) + database_connection.transaction::<_, Error, _>(|| { + diesel::insert_into(images::table) + .values(&new_image) + .on_conflict_do_nothing() + .execute(database_connection)?; + + dsl::images + .filter(name.eq(image_name.as_ref())) + .first::(database_connection) + .map_err(Error::from) + }) } pub fn fetch_for_job(database_connection: &PgConnection, j: &crate::db::models::Job) -> Result> { diff --git a/src/db/models/job.rs b/src/db/models/job.rs index de91a25..8f6097b 100644 --- a/src/db/models/job.rs +++ b/src/db/models/job.rs @@ -83,15 +83,17 @@ impl Job { log::trace!("Query = {}", diesel::debug_query::(&query)); - query - .execute(database_connection) - .context("Creating job in database")?; + database_connection.transaction::<_, Error, _>(|| { + query + .execute(database_connection) + .context("Creating job in database")?; - dsl::jobs - .filter(uuid.eq(job_uuid)) - .first::(database_connection) - .with_context(|| format!("Finding created job in database: {}", job_uuid)) - .map_err(Error::from) + dsl::jobs + .filter(uuid.eq(job_uuid)) + .first::(database_connection) + .with_context(|| format!("Finding created job in database: {}", job_uuid)) + .map_err(Error::from) + }) } pub fn env(&self, database_connection: &PgConnection) -> Result> { diff --git a/src/db/models/package.rs b/src/db/models/package.rs index 88839b6..1ae81b5 100644 --- a/src/db/models/package.rs +++ b/src/db/models/package.rs @@ -42,20 +42,22 @@ impl Package { version: p.version().deref(), }; - diesel::insert_into(packages::table) - .values(&new_package) - .on_conflict_do_nothing() - .execute(database_connection)?; + database_connection.transaction::<_, Error, _>(|| { + diesel::insert_into(packages::table) + .values(&new_package) + .on_conflict_do_nothing() + .execute(database_connection)?; - dsl::packages - .filter({ - let p_name = p.name().deref(); - let p_vers = p.version().deref(); + dsl::packages + .filter({ + let p_name = p.name().deref(); + let p_vers = p.version().deref(); - name.eq(p_name).and(version.eq(p_vers)) - }) - .first::(database_connection) - .map_err(Error::from) + name.eq(p_name).and(version.eq(p_vers)) + }) + .first::(database_connection) + .map_err(Error::from) + }) } pub fn fetch_for_job(database_connection: &PgConnection, j: &crate::db::models::Job) -> Result> { diff --git a/src/db/models/release_store.rs b/src/db/models/release_store.rs index bc3b097..ad6c9b2 100644 --- a/src/db/models/release_store.rs +++ b/src/db/models/release_store.rs @@ -10,6 +10,7 @@ use anyhow::Error; use anyhow::Result; +use diesel::Connection; use diesel::ExpressionMethods; use diesel::PgConnection; use diesel::QueryDsl; @@ -32,23 +33,22 @@ struct NewReleaseStore<'a> { } impl ReleaseStore { - pub fn create( - database_connection: &PgConnection, - name: &str, - ) -> Result { + pub fn create(database_connection: &PgConnection, name: &str) -> Result { let new_relstore = NewReleaseStore { store_name: name, }; - diesel::insert_into(schema::release_stores::table) - .values(&new_relstore) - .on_conflict_do_nothing() - .execute(database_connection)?; - - schema::release_stores::table - .filter(schema::release_stores::store_name.eq(name)) - .first::(database_connection) - .map_err(Error::from) + database_connection.transaction::<_, Error, _>(|| { + diesel::insert_into(schema::release_stores::table) + .values(&new_relstore) + .on_conflict_do_nothing() + .execute(database_connection)?; + + schema::release_stores::table + .filter(schema::release_stores::store_name.eq(name)) + .first::(database_connection) + .map_err(Error::from) + }) } } diff --git a/src/db/models/releases.rs b/src/db/models/releases.rs index 48e4f02..0aba8bb 100644 --- a/src/db/models/releases.rs +++ b/src/db/models/releases.rs @@ -50,13 +50,15 @@ impl Release { release_store_id: store.id, }; - diesel::insert_into(releases::table) - .values(&new_rel) - .execute(database_connection)?; + database_connection.transaction::<_, Error, _>(|| { + diesel::insert_into(releases::table) + .values(&new_rel) + .execute(database_connection)?; - dsl::releases - .filter(artifact_id.eq(art.id).and(release_date.eq(date))) - .first::(database_connection) - .map_err(Error::from) + dsl::releases + .filter(artifact_id.eq(art.id).and(release_date.eq(date))) + .first::(database_connection) + .map_err(Error::from) + }) } } diff --git a/src/db/models/submit.rs b/src/db/models/submit.rs index 154ed1b..61788ad 100644 --- a/src/db/models/submit.rs +++ b/src/db/models/submit.rs @@ -61,16 +61,18 @@ impl Submit { repo_hash_id: repo_hash.id, }; - diesel::insert_into(submits::table) - .values(&new_submit) + database_connection.transaction::<_, Error, _>(|| { + diesel::insert_into(submits::table) + .values(&new_submit) - // required because if we re-use the staging store, we do not create a new UUID but re-use the old one - .on_conflict_do_nothing() + // required because if we re-use the staging store, we do not create a new UUID but re-use the old one + .on_conflict_do_nothing() - .execute(database_connection) - .context("Inserting new submit into submits table")?; + .execute(database_connection) + .context("Inserting new submit into submits table")?; - Self::with_id(database_connection, submit_id) + Self::with_id(database_connection, submit_id) + }) } pub fn with_id(database_connection: &PgConnection, submit_id: &::uuid::Uuid) -> Result { -- cgit v1.2.3