//
// Copyright (c) 2020-2021 science+computing ag and other contributors
//
// This program and the accompanying materials are made
// available under the terms of the Eclipse Public License 2.0
// which is available at https://www.eclipse.org/legal/epl-2.0/
//
// SPDX-License-Identifier: EPL-2.0
//
use std::fmt::{Debug, Formatter};
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use anyhow::Context;
use anyhow::Error;
use anyhow::Result;
use anyhow::anyhow;
use futures::FutureExt;
use getset::{CopyGetters, Getters};
use log::trace;
use shiplift::Container;
use shiplift::Docker;
use shiplift::ExecContainerOptions;
use tokio::sync::RwLock;
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::StreamExt;
use typed_builder::TypedBuilder;
use crate::config::EndpointName;
use crate::endpoint::EndpointConfiguration;
use crate::filestore::ReleaseStore;
use crate::filestore::StagingStore;
use crate::filestore::path::ArtifactPath;
use crate::job::JobResource;
use crate::job::RunnableJob;
use crate::log::LogItem;
use crate::log::buffer_stream_to_line_stream;
use crate::package::Script;
use crate::util::docker::ContainerHash;
use crate::util::docker::ImageName;
#[derive(Getters, CopyGetters, TypedBuilder)]
pub struct Endpoint {
#[getset(get = "pub")]
name: EndpointName,
#[getset(get = "pub")]
docker: Docker,
#[getset(get_copy = "pub")]
num_max_jobs: usize,
#[getset(get = "pub")]
network_mode: Option<String>,
#[getset(get = "pub")]
uri: String,
#[builder(default)]
running_jobs: std::sync::atomic::AtomicUsize,
}
impl Debug for Endpoint {
fn fmt(&self, f: &mut Formatter) -> std::result::Result<(), std::fmt::Error> {
write!(f, "Endpoint({}, max: {})", self.name, self.num_max_jobs)
}
}
impl Endpoint {
pub(super) async fn setup(epc: EndpointConfiguration) -> Result<Self> {
let ep = Endpoint::setup_endpoint(epc.endpoint_name(), epc.endpoint()).with_context(|| {
anyhow!(
"Setting up endpoint: {} -> {}",
epc.endpoint_name(),
epc.endpoint().uri()
)
})?;
let versions_compat =
Endpoint::check_version_compat(epc.required_docker_versions().as_ref(), &ep);
let api_versions_compat =
Endpoint::check_api_version_compat(epc.required_docker_api_versions().as_ref(), &ep);
let imgs_avail = Endpoint::check_images_available(epc.required_images().as_ref(), &ep);
let (versions_compat, api_versions_compat, imgs_avail) = {
let timeout = std::time::Duration::from_secs(epc.endpoint().timeout().unwrap_or(10));
let versions_compat = tokio::time::timeout(timeout, versions_compat);
let api_versions_compat = tokio::time::timeout(timeout, api_versions_compat);
let imgs_avail = tokio::time::timeout(timeout, imgs_avail);
tokio::join!(versions_compat, api_versions_compat, imgs_avail)
};
let _ = versions_compat.with_context(|| {
anyhow!(
"Checking version compatibility for {} -> {}",
epc.endpoint_name(),
epc.endpoint().uri()
)
})?;
let _ = api_versions_compat.with_context(|| {
anyhow!(
"Checking API version compatibility for {} -> {}",
epc.endpoint_name(),
epc.endpoint().uri()
)
})?;
let _ = imgs_avail.with_context(|| {
anyhow!(
"Checking for available images on {} -> {}",
epc.endpoint_name(),
epc.endpoint().uri()
)
})?;
Ok(ep)
}
fn setup_endpoint(ep_name: &EndpointName, ep: &crate::config::Endpoint) -> Result<Endpoint> {
match ep.endpoint_type() {
crate::config::EndpointType::Http => shiplift::Uri::from_str(ep.uri())
.map(shiplift::Docker::host)
.with_context(|| anyhow!("Connecting to {}", ep.uri()))
.map_err(Error::from)
.map(|docker| {
Endpoint::builder()
.name(ep_name.clone())
.uri(ep.uri().clone())
.docker(docker)
.num_max_jobs(ep.maxjobs())
.network_mode(ep.network_mode().clone())
.build()
}),
crate::config::EndpointType::Socket => Ok({
Endpoint::builder()
.name(ep_name.clone())
.uri(ep.uri().clone())
.num_max_jobs(ep.maxjobs())
.network_mode(ep.network_mode().clone())
.docker(shiplift::Docker::unix(ep.uri()))
.build()
}),
}
}
async fn check_version_compat(req: Option<&Vec<String>>, ep: &Endpoint) -> Result<()> {
match req {
None => Ok(()),
Some(v) =>