# task.py (144 lines, 123 loc, 5.37 KB)
# Forked from wizhaoredhat/ocp-traffic-flow-tests.
import sys
import yaml
import common
from abc import ABC, abstractmethod
from typing import Optional
from logger import logger
from testConfig import TestConfig
from testConfig import ClusterMode
from thread import ReturnValueThread
import host
from typing import Dict, Union, List
class Task(ABC):
    """Abstract base for a task that is backed by a pod on a cluster node.

    The base class keeps the Jinja2 template arguments used to render the
    task's manifests, routes ``oc`` commands to the tenant or infra client
    of the test configuration, and provides the common pod/service helpers.
    Subclasses must implement ``run()``, ``output()`` and
    ``generate_output()``.
    """

    def __init__(
        self, tc: TestConfig, index: int, node_name: str, tenant: bool
    ) -> None:
        """Initialise the default template arguments and task identity.

        Args:
            tc: Test configuration; supplies ``mode`` and the ``oc`` clients.
            index: Task index, also exported as the ``index`` template arg.
            node_name: Node name, exported as the ``node_name`` template arg.
            tenant: When true, ``oc`` commands go to ``tc.client_tenant``,
                otherwise to ``tc.client_infra``.

        Exits the process with status -1 when a non-tenant task is requested
        while the cluster mode is ``ClusterMode.SINGLE``.
        """
        self.template_args: Dict[str, str] = {}
        self.in_file_template = ""
        self.out_file_yaml = ""
        self.pod_name = ""
        # Declared only; expected to be assigned before stop() is called.
        self.exec_thread: ReturnValueThread
        # Declared only; stop() is what assigns it.
        self._output: common.BaseOutput
        self.lh = host.LocalHost()

        self.template_args["name_space"] = "default"
        self.template_args["net_attach_def_name"] = "default"
        self.template_args["test_image"] = common.FT_BASE_IMG
        self.template_args["command"] = "/sbin/init"
        self.template_args["args"] = ""
        self.template_args["index"] = f"{index}"

        self.index = index
        self.node_name = node_name
        self.tenant = tenant
        if not self.tenant and tc.mode == ClusterMode.SINGLE:
            logger.error("Cannot have non-tenant Task when cluster mode is single.")
            sys.exit(-1)

        self.template_args["node_name"] = self.node_name
        self.tc = tc

    def run_oc(self, cmd: str) -> common.Result:
        """Run ``cmd`` through the tenant or infra ``oc`` client.

        The client is chosen by ``self.tenant``; the client's result is
        returned unchanged.
        """
        client = self.tc.client_tenant if self.tenant else self.tc.client_infra
        return client.oc(cmd)

    def get_pod_ip(self) -> str:
        """Return ``status.podIP`` of this task's pod.

        The pod is fetched as YAML via ``oc get pod``. Exits the process
        with status -1 when the command returns a non-zero code.
        """
        r = self.run_oc(f"get pod {self.pod_name} -o yaml")
        if r.returncode != 0:
            logger.error(r)
            sys.exit(-1)

        y = yaml.safe_load(r.out)
        return y["status"]["podIP"]

    def _create_service(
        self, description: str, manifest: str, service_name: str
    ) -> str:
        """Render and apply a service manifest, then return its cluster IP.

        Args:
            description: Human readable service kind used in the log line.
            manifest: Manifest base name; the template is read from
                ``./manifests/<manifest>.yaml.j2`` and rendered to
                ``./manifests/yamls/<manifest>.yaml``.
            service_name: Name of the service to query after applying.

        Returns:
            Output of the ``.spec.clusterIP`` jsonpath query on the service.

        Exits the process with status -1 when ``oc apply`` fails with an
        error other than "already exists".
        """
        in_file_template = f"./manifests/{manifest}.yaml.j2"
        out_file_yaml = f"./manifests/yamls/{manifest}.yaml"

        common.j2_render(in_file_template, out_file_yaml, self.template_args)
        logger.info(f"Creating {description} Service {out_file_yaml}")
        r = self.run_oc(f"apply -f {out_file_yaml}")
        # An "already exists" failure is tolerated; anything else is fatal.
        if r.returncode != 0 and "already exists" not in r.err:
            logger.error(r)
            sys.exit(-1)

        return self.run_oc(
            f"get service {service_name} -o=jsonpath='{{.spec.clusterIP}}'"
        ).out

    def create_cluster_ip_service(self) -> str:
        """Create the cluster IP service and return its cluster IP.

        Renders ``svc-cluster-ip.yaml.j2`` with the current template
        arguments, applies it and queries ``tft-clusterip-service``.
        """
        return self._create_service(
            "Cluster IP", "svc-cluster-ip", "tft-clusterip-service"
        )

    def create_node_port_service(self, nodeport: int) -> str:
        """Create the node port service and return its cluster IP.

        Stores ``nodeport`` in the ``nodeport_svc_port`` template argument
        (it stays set on the task afterwards), renders
        ``svc-node-port.yaml.j2``, applies it and queries
        ``tft-nodeport-service``. Note that the value returned is the
        service's ``.spec.clusterIP``, not the node port.
        """
        self.template_args["nodeport_svc_port"] = f"{nodeport}"
        return self._create_service(
            "Node Port", "svc-node-port", "tft-nodeport-service"
        )

    def setup(self) -> None:
        """Create the task's pod if needed and wait until it is ready.

        The pod is created from ``self.out_file_yaml`` only when
        ``oc get pod`` returns a non-zero code. Afterwards the method waits
        up to one minute for the ready condition. Exits the process with
        status -1 when creating the pod or waiting for it fails.
        """
        # Check if pod already exists
        r = self.run_oc(f"get pod {self.pod_name} --output=json")
        if r.returncode != 0:
            # otherwise create the pod
            logger.info(f"Creating Pod {self.pod_name}.")
            r = self.run_oc(f"apply -f {self.out_file_yaml}")
            if r.returncode != 0:
                logger.error(r)
                sys.exit(-1)
        else:
            logger.info(f"Pod {self.pod_name} already exists.")

        logger.info(f"Waiting for Pod {self.pod_name} to become ready.")
        r = self.run_oc(f"wait --for=condition=ready pod/{self.pod_name} --timeout=1m")
        if r.returncode != 0:
            logger.error(r)
            sys.exit(-1)

    @abstractmethod
    def run(self, duration: int) -> None:
        """Start the task's work; must be implemented by subclasses."""
        raise NotImplementedError(
            "Must implement run(). Use SyncManager.wait_barrier()"
        )

    def stop(self, timeout: float) -> None:
        """Join the execution thread and store the task's output.

        The thread is joined with ``timeout * 1.5``. When it produced a
        result, a non-zero return code is logged as an error and the
        result's ``out`` is passed to ``generate_output()``; the outcome is
        stored in ``self._output``. When there is no result, an error is
        logged and an empty ``common.BaseOutput`` is stored instead.
        """
        class_name = self.__class__.__name__
        logger.info(f"Stopping execution on {class_name}")
        self.exec_thread.join_with_result(timeout=timeout * 1.5)

        r = self.exec_thread.result
        if r is None:
            logger.error(f"Thread {class_name} did not return a result")
            self._output = common.BaseOutput("", {})
            return

        if r.returncode != 0:
            logger.error(
                f"Error occurred while stopping {class_name}: errcode: {r.returncode} err {r.err}"
            )
        logger.debug(f"{class_name}.stop(): {r.out}")
        self._output = self.generate_output(data=r.out)

    @abstractmethod
    def output(self, out: common.TftAggregateOutput) -> None:
        """Store this task's results in the aggregate output.

        output() should be called to store the results of this task in a
        PluginOutput class object, and return this by appending the instance
        to the TftAggregateOutput Plugin fields. Additionally, this function
        should handle printing any required info/debug to the console. The
        results must be formatted such that other modules can easily consume
        the output, such as a module to determine the
        success/failure/performance of a given run.
        """
        raise NotImplementedError("Must implement output()")

    @abstractmethod
    def generate_output(self, data: str) -> common.BaseOutput:
        """Build an output object from ``data``; implemented by subclasses.

        ``stop()`` calls this with the ``out`` of the execution thread's
        result.
        """
        raise NotImplementedError("Must implement generate_output()")