forked from wizhaoredhat/ocp-traffic-flow-tests
-
Notifications
You must be signed in to change notification settings - Fork 0
/
host.py
47 lines (38 loc) · 1.3 KB
/
host.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import json
import os
import shlex
import subprocess
import sys
from abc import ABC, abstractmethod
from typing import Optional

from logger import logger
from common import Result
class Host(ABC):
    """Abstract target on which shell commands can be executed.

    Subclasses implement :meth:`run`. The ``ip`` helpers below are built on
    top of it: each one runs an ``ip -json ...`` command and returns the
    JSON-decoded standard output.
    """

    def ipa(self) -> dict:
        """Return the JSON-decoded output of ``ip -json a``."""
        # NOTE(review): json.loads returns whatever the top-level JSON value
        # is; the ``dict`` annotation is kept from the original -- confirm it
        # matches what ``ip`` actually prints.
        return json.loads(self.run("ip -json a").out)

    def ipr(self) -> dict:
        """Return the JSON-decoded output of ``ip -json r``."""
        return json.loads(self.run("ip -json r").out)

    def all_ports(self) -> dict:
        """Return the JSON-decoded output of ``ip -json link``."""
        return json.loads(self.run("ip -json link").out)

    @abstractmethod
    def run(self, cmd: str, env: Optional[dict] = None) -> Result:
        """Execute *cmd* on this host and return its result.

        Args:
            cmd: Command line to execute.
            env: Environment variables for the command. ``None`` (the
                default) leaves the choice of environment to the
                implementation.

        Returns:
            A ``Result``; the helpers in this class read its ``out``
            attribute as the command's standard output.
        """
        # The default used to be ``os.environ.copy()``, which is evaluated a
        # single time when the class body runs: every call then shared one
        # dict holding an import-time snapshot of the environment.
        pass
class LocalHost(Host):
    """Host implementation that executes commands on the local machine."""

    def __init__(self) -> None:
        pass

    def run(self, cmd: str, env: Optional[dict] = None) -> Result:
        """Run *cmd* as a local subprocess and capture its output.

        Args:
            cmd: Command line; split into an argument list with
                ``shlex.split`` and executed without a shell.
            env: Environment variables for the child process. ``None`` (the
                default) makes the child inherit this process's environment
                as it is at call time.

        Returns:
            ``Result(out, err, returncode)`` where ``out`` and ``err`` are
            the child's stdout and stderr decoded as UTF-8.

        Raises:
            UnicodeDecodeError: If the captured output is not valid UTF-8.
        """
        args = shlex.split(cmd)
        # subprocess.run drains stdout and stderr concurrently. Reading one
        # pipe to EOF before the other can deadlock once the child fills the
        # pipe that is not being read.
        completed = subprocess.run(
            args,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=env,
            check=False,  # a non-zero exit is reported via Result, not raised
        )
        out = completed.stdout.decode("utf-8")
        err = completed.stderr.decode("utf-8")
        return Result(out, err, completed.returncode)

    def file_exists(self, path: str) -> bool:
        """Return True if *path* exists on the local filesystem."""
        return os.path.exists(path)