Skip to content

Commit

Permalink
Configmap (#47)
Browse files Browse the repository at this point in the history
* Use configmap for JSON_INPUT

Signed-off-by: Lukas Hejtmanek <xhejtman@gmail.com>

* Fix basic tests

Signed-off-by: Lukas Hejtmanek <xhejtman@gmail.com>

* Fix integration tests

Signed-off-by: Lukas Hejtmanek <xhejtman@gmail.com>

Signed-off-by: Lukas Hejtmanek <xhejtman@gmail.com>
  • Loading branch information
xhejtman authored Jan 24, 2023
1 parent a1f1c00 commit 560fd03
Show file tree
Hide file tree
Showing 10 changed files with 203 additions and 62 deletions.
20 changes: 20 additions & 0 deletions src/integration-test/java/uk/ac/ebi/tsc/tesk/AuthIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ public void admin_createTask() throws Exception {
.withRequestBody(matchingJsonPath("$.metadata.labels[?(@.creator-user-id == '123')]"))
.willReturn(okJson("{\"metadata\":{\"name\":\"task-fe99716a\"}}")));

mockKubernetes.givenThat(
WireMock.post("/api/v1/namespaces/default/configmaps")
.willReturn(okJson("{\"metadata\":{\"name\":\"task-fe99716a\"}}")));

String path = "fromTesToK8s_minimal/task.json";
this.mvc.perform(post(TASK_URL)
.content(getFileContentFromResources(path))
Expand All @@ -97,6 +101,10 @@ public void adminAndMember_createTask() throws Exception {
.withRequestBody(matchingJsonPath("$.metadata.labels[?(@.creator-user-id == '123')]"))
.willReturn(okJson("{\"metadata\":{\"name\":\"task-fe99716a\"}}")));

mockKubernetes.givenThat(
WireMock.post("/api/v1/namespaces/default/configmaps")
.willReturn(okJson("{\"metadata\":{\"name\":\"task-fe99716a\"}}")));

String path = "fromTesToK8s_minimal/task.json";
this.mvc.perform(post(TASK_URL)
.content(getFileContentFromResources(path))
Expand Down Expand Up @@ -133,6 +141,10 @@ public void authorizedUser_createTask() throws Exception {
.withRequestBody(matchingJsonPath("$.metadata.labels[?(@.creator-user-id == '123')]"))
.willReturn(okJson("{\"metadata\":{\"name\":\"task-fe99716a\"}}")));

mockKubernetes.givenThat(
WireMock.post("/api/v1/namespaces/default/configmaps")
.willReturn(okJson("{\"metadata\":{\"name\":\"task-fe99716a\"}}")));

String path = "fromTesToK8s_minimal/task.json";
this.mvc.perform(post(TASK_URL)
.content(getFileContentFromResources(path))
Expand All @@ -153,6 +165,10 @@ public void multiGroups_createTask() throws Exception {
.withRequestBody(matchingJsonPath("$.metadata.labels[?(@.creator-user-id == '123')]"))
.willReturn(okJson("{\"metadata\":{\"name\":\"task-fe99716a\"}}")));

mockKubernetes.givenThat(
WireMock.post("/api/v1/namespaces/default/configmaps")
.willReturn(okJson("{\"metadata\":{\"name\":\"task-fe99716a\"}}")));

String path = "fromTesToK8s_minimal/task.json";
this.mvc.perform(post(TASK_URL)
.content(getFileContentFromResources(path))
Expand All @@ -174,6 +190,10 @@ public void chosenGroup_createTask() throws Exception {
.withRequestBody(matchingJsonPath("$.metadata.labels[?(@.creator-group-name == 'ABC')]"))
.willReturn(okJson("{\"metadata\":{\"name\":\"task-fe99716a\"}}")));

mockKubernetes.givenThat(
WireMock.post("/api/v1/namespaces/default/configmaps")
.willReturn(okJson("{\"metadata\":{\"name\":\"task-fe99716a\"}}")));

mockKubernetes.givenThat(
WireMock.get("/api/v1/namespaces/default/limitranges")
.willReturn(okJson("{\"metadata\":{\"name\":\"core-resource-limits\"}}")));
Expand Down
4 changes: 4 additions & 0 deletions src/integration-test/java/uk/ac/ebi/tsc/tesk/NoAuthIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ public void createTask() throws Exception {
.withRequestBody(matchingJsonPath("$.metadata.labels[?(@.creator-user-id == 'anonymousUser')]"))
.willReturn(okJson("{\"metadata\":{\"name\":\"task-fe99716a\"}}")));

mockKubernetes.givenThat(
WireMock.post("/api/v1/namespaces/default/configmaps")
.willReturn(okJson("{\"metadata\":{\"name\":\"task-fe99716a\"}}")));

String path = "fromTesToK8s_minimal/task.json";
this.mvc.perform(post(TASK_URL)
.content(getFileContentFromResources(path))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import java.io.ByteArrayOutputStream;
import java.util.zip.GZIPOutputStream;

import static uk.ac.ebi.tsc.tesk.k8s.constant.Constants.*;
import static uk.ac.ebi.tsc.tesk.k8s.constant.K8sConstants.*;

Expand Down Expand Up @@ -89,31 +92,67 @@ public V1Job fromTesTaskToK8sJob(TesTask task, User user) {
} catch (JsonProcessingException ex) {
logger.info(String.format("Serializing task %s to JSON failed", taskMasterJob.getMetadata().getName()), ex);
}
//Converting executors to Kubernetes Job Objects

V1Volume volume = new V1Volume();
volume.setName("jsoninput");
V1ConfigMapVolumeSource src = new V1ConfigMapVolumeSource();
src.setName(taskMasterJob.getMetadata().getName());
volume.setConfigMap(src);
taskMasterJob.getSpec().getTemplate().getSpec().addVolumesItem(volume);

return taskMasterJob;
}


/**
* Converts TES task to new K8s ConfigMap object with random generated name
*
* @param task - TES Task input object
* @return K8s Job Object
*/
@SuppressWarnings("unchecked")
public V1ConfigMap fromTesTaskToK8sConfigMap(TesTask task, User user, V1Job job) {
//get new Job template with random generated name;
V1ConfigMap taskMasterConfigMap = new V1ConfigMap();
taskMasterConfigMap.setMetadata(new V1ObjectMeta());
taskMasterConfigMap.getMetadata().setName(job.getMetadata().getName());
//put input task name as annotation
taskMasterConfigMap.getMetadata().putAnnotationsItem(ANN_TESTASK_NAME_KEY, task.getName());
//creating user and owning group
taskMasterConfigMap.getMetadata().putLabelsItem(LABEL_USERID_KEY, user.getUsername());
if (task.getTags() != null && task.getTags().containsKey("GROUP_NAME")) {
taskMasterConfigMap.getMetadata().putLabelsItem(LABEL_GROUPNAME_KEY, task.getTags().get("GROUP_NAME"));
} else if (user.isMember()) {
taskMasterConfigMap.getMetadata().putLabelsItem(LABEL_GROUPNAME_KEY, user.getAnyGroup());
}
List<V1Job> executorsAsJobs = IntStream.range(0, task.getExecutors().size()).
mapToObj(i -> this.fromTesExecutorToK8sJob(taskMasterJob.getMetadata().getName(), task.getName(), task.getExecutors().get(i), i, task.getResources(), user)).
mapToObj(i -> this.fromTesExecutorToK8sJob(taskMasterConfigMap.getMetadata().getName(), task.getName(), task.getExecutors().get(i), i, task.getResources(), user)).
collect(Collectors.toList());
Map<String, Object> taskMasterInput = new HashMap<>();
try {
//converting original inputs, outputs, volumes and disk size back again to JSON (will be part of taskMaster's input parameter)
//Jackson - for TES objects
List<TesInput> inputs = task.getInputs() == null ? new ArrayList<>() : task.getInputs();
List<TesOutput> outputs = task.getOutputs() == null ? new ArrayList<>() : task.getOutputs();
List<String> volumes = task.getVolumes() == null ? new ArrayList<>() : task.getVolumes();
String jobAsJson = this.objectMapper.writeValueAsString(new TesTask().inputs(inputs).outputs(outputs).volumes(volumes).
resources(new TesResources().diskGb(Optional.ofNullable(task.getResources()).map(TesResources::getDiskGb).orElse(RESOURCE_DISK_DEFAULT))));
//merging 2 JSONs together into one map
Map<String, Object> jobAsMap = gson.fromJson(jobAsJson, Map.class);
taskMasterInput.putAll(jobAsMap);
} catch (JsonProcessingException e) {
logger.info(String.format("Serializing copy of task %s to JSON failed", taskMasterJob.getMetadata().getName()), e);
//TODO throw
logger.info(String.format("Serializing copy of task %s to JSON failed", taskMasterConfigMap.getMetadata().getName()), e);
}
taskMasterInput.put(TASKMASTER_INPUT_EXEC_KEY, executorsAsJobs);
String taskMasterInputAsJSON = this.gson.toJson(taskMasterInput);
//placing taskmaster's parameter (JSONed map of: inputs, outputs, volumes, executors (as jobs) into ENV variable in taskmaster spec
taskMasterJob.getSpec().getTemplate().getSpec().getContainers().get(0).getEnv().stream().filter(x -> x.getName().equals(TASKMASTER_INPUT)).forEach(x -> x.setValue(taskMasterInputAsJSON));
return taskMasterJob;

try {
ByteArrayOutputStream obj=new ByteArrayOutputStream();
GZIPOutputStream gzip = new GZIPOutputStream(obj);
gzip.write(taskMasterInputAsJSON.getBytes("UTF-8"));
gzip.close();
taskMasterConfigMap.putBinaryDataItem(TASKMASTER_INPUT+".gz", obj.toByteArray());
} catch (Exception e) {
logger.info(String.format("Compression of task %s JSON configmap failed", taskMasterConfigMap.getMetadata().getName()), e);
}
return taskMasterConfigMap;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.kubernetes.client.openapi.apis.CoreV1Api;

import io.kubernetes.client.openapi.models.V1Job;
import io.kubernetes.client.openapi.models.V1ConfigMap;
import io.kubernetes.client.openapi.models.V1JobList;
import io.kubernetes.client.openapi.models.V1PodList;
import io.kubernetes.client.openapi.models.V1LimitRangeList;
Expand Down Expand Up @@ -68,6 +69,14 @@ public V1Job createJob(V1Job job) {
}
}

public V1ConfigMap createConfigMap(V1ConfigMap map) {
try {
return this.coreApi.createNamespacedConfigMap(namespace, map, null, null, null);
} catch (ApiException e) {
throw KubernetesException.fromApiException(e);
}
}

public V1Job readTaskmasterJob(String taskId) {
try {
V1Job job = this.batchApi.readNamespacedJob(taskId, namespace, null, null, null);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package uk.ac.ebi.tsc.tesk.tes.service;

import io.kubernetes.client.openapi.models.V1Job;
import io.kubernetes.client.openapi.models.V1ConfigMap;
import io.kubernetes.client.openapi.models.V1JobList;
import io.kubernetes.client.openapi.models.V1PodList;
import org.springframework.stereotype.Service;
Expand Down Expand Up @@ -65,6 +66,8 @@ public TesCreateTaskResponse createTask(TesTask task, User user) {
}

V1Job taskMasterJob = this.converter.fromTesTaskToK8sJob(task, user);
V1ConfigMap taskMasterConfigMap = this.converter.fromTesTaskToK8sConfigMap(task, user, taskMasterJob);
V1ConfigMap createdConfigMap = this.kubernetesClientWrapper.createConfigMap(taskMasterConfigMap);
V1Job createdJob = this.kubernetesClientWrapper.createJob(taskMasterJob);
return this.converter.fromK8sJobToTesCreateTaskResponse(createdJob);
} catch (KubernetesException e) {
Expand Down
14 changes: 8 additions & 6 deletions src/main/resources/taskmaster.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,10 @@
"name": "taskmaster",
"image": "eu.gcr.io/tes-wes/taskmaster:v0.0.5.2",
"args": [
"$(JSON_INPUT)"
"-f",
"/jsoninput/JSON_INPUT.gz"
],
"env": [
{
"name": "JSON_INPUT",
"value": ""
},
{
"name": "TESK_FTP_USERNAME",
"valueFrom": {
Expand All @@ -49,7 +46,12 @@
"name": "podinfo",
"mountPath": "/podinfo",
"readOnly": true
}
},
{
"name": "jsoninput",
"mountPath": "/jsoninput",
"readOnly": true
}
]
}
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.kubernetes.client.openapi.models.V1JobStatus;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1PodList;
import io.kubernetes.client.openapi.models.V1ConfigMap;
import org.hamcrest.CoreMatchers;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -38,6 +39,7 @@

import java.io.*;
import java.util.function.Supplier;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.Matchers.is;
Expand All @@ -52,6 +54,8 @@
import static uk.ac.ebi.tsc.tesk.k8s.constant.Constants.LABEL_TASKSTATE_KEY;
import static uk.ac.ebi.tsc.tesk.k8s.constant.Constants.LABEL_TASKSTATE_VALUE_CANC;

import java.io.ByteArrayInputStream;
import java.util.zip.GZIPInputStream;

/**
* @author Ania Niewielska <aniewielska@ebi.ac.uk>
Expand Down Expand Up @@ -125,10 +129,43 @@ public void fromTesTaskToK8sJob_minimal() throws IOException {
assertThat(outputJob.getMetadata().getLabels().get("creator-user-id"), is("test-user-id"));
assertThat(outputJob.getMetadata().getLabels().get("creator-group-name"), is("ABC"));

assertThat(outputJob.getSpec().getTemplate().getSpec().getContainers().get(0).getArgs().get(0), is("$(JSON_INPUT)"));
Resource outputJobFile = new ClassPathResource("fromTesToK8s_minimal/job.json");
V1Job expectedJob;
try (InputStream inputStream = outputJobFile.getInputStream();
Reader reader = new BufferedReader(new InputStreamReader(inputStream))) {
expectedJob = this.gson.fromJson(reader, V1Job.class);
}
expectedJob.getMetadata().setAnnotations(null);
outputJob.getMetadata().setAnnotations(null);
expectedJob.getSpec().getTemplate().getSpec().getContainers().get(0).getEnv().stream().filter(env -> env.getName().equals(Constants.TASKMASTER_INPUT)).forEach(env -> env.setValue(""));
outputJob.getSpec().getTemplate().getSpec().getContainers().get(0).getEnv().stream().filter(env -> env.getName().equals(Constants.TASKMASTER_INPUT)).forEach(env -> env.setValue(""));
//comparing fields of resulting Job object and pattern Job objects other those with JSON values, which were cleared in previous lines (JSON strings do not have to be exactly equal to pattern).
assertEquals(expectedJob, outputJob);
}

@Test
public void fromTesTaskToK8sConfigMap() throws IOException {

given(this.jobNameGenerator.getTaskMasterName()).willReturn("task-98605447");
Resource inputTaskFile = new ClassPathResource("fromTesToK8s_minimal/task.json");
TesTask inputTask;
try (InputStream inputStream = inputTaskFile.getInputStream();
Reader reader = new BufferedReader(new InputStreamReader(inputStream))) {
inputTask = this.objectMapper.readValue(reader, TesTask.class);
}
V1Job outputJob = this.converter.fromTesTaskToK8sJob(inputTask, User.builder("test-user-id").teskMemberedGroups(StringUtils.commaDelimitedListToSet("ABC,CDE")).build());

V1ConfigMap outputConfigMap = this.converter.fromTesTaskToK8sConfigMap(inputTask, User.builder("test-user-id").teskMemberedGroups(StringUtils.commaDelimitedListToSet("ABC,CDE")).build(), outputJob);
Map<String, byte[]> binaryDataMap = outputConfigMap.getBinaryData();
GZIPInputStream gis = new GZIPInputStream(new ByteArrayInputStream(binaryDataMap.get("JSON_INPUT.gz")));
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(gis, "UTF-8"));
String line;
String jsonString="";
while ((line = bufferedReader.readLine()) != null) {
jsonString = jsonString + line;
}

JsonContentAssert taskMasterInputJson = new JsonContentAssert(this.getClass(), outputJob.getSpec().getTemplate().getSpec().
getContainers().get(0).getEnv().stream().filter(env -> env.getName().equals("JSON_INPUT")).findAny().get().getValue());
JsonContentAssert taskMasterInputJson = new JsonContentAssert(this.getClass(), jsonString);
taskMasterInputJson.hasJsonPathValue("outputs");
taskMasterInputJson.extractingJsonPathNumberValue("outputs.size()").isEqualTo(0);
taskMasterInputJson.extractingJsonPathNumberValue("inputs.size()").isEqualTo(0);
Expand All @@ -149,18 +186,6 @@ public void fromTesTaskToK8sJob_minimal() throws IOException {

taskMasterInputJson.isEqualToJson(new ClassPathResource("fromTesToK8s_minimal/taskmaster_param.json"), JSONCompareMode.NON_EXTENSIBLE);

Resource outputJobFile = new ClassPathResource("fromTesToK8s_minimal/job.json");
V1Job expectedJob;
try (InputStream inputStream = outputJobFile.getInputStream();
Reader reader = new BufferedReader(new InputStreamReader(inputStream))) {
expectedJob = this.gson.fromJson(reader, V1Job.class);
}
expectedJob.getMetadata().setAnnotations(null);
outputJob.getMetadata().setAnnotations(null);
expectedJob.getSpec().getTemplate().getSpec().getContainers().get(0).getEnv().stream().filter(env -> env.getName().equals(Constants.TASKMASTER_INPUT)).forEach(env -> env.setValue(""));
outputJob.getSpec().getTemplate().getSpec().getContainers().get(0).getEnv().stream().filter(env -> env.getName().equals(Constants.TASKMASTER_INPUT)).forEach(env -> env.setValue(""));
//comparing fields of resulting Job object and pattern Job objects other those with JSON values, which were cleared in previous lines (JSON strings do not have to be exactly equal to pattern).
assertEquals(expectedJob, outputJob);
}

private TaskBuilder prepareBaseTaskBuider(boolean withExecutors, boolean withPods) throws IOException {
Expand Down
Loading

0 comments on commit 560fd03

Please sign in to comment.