-
Notifications
You must be signed in to change notification settings - Fork 0
/
generate_expectation_suite.py
72 lines (59 loc) · 1.94 KB
/
generate_expectation_suite.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import argparse
from pyspark_data_quality.libs.utils import Environment
from pyspark_data_quality.validate_module.expectation_suite_generator import ValidationSuiteGenerator
from pyspark_data_quality.validate_module.expectations.expectatione_rules import (
OrderedColumnsMatchExpectation,
ColumnsMatchExpectation,
RowCountBetweenExpectation,
ValuesNotNullExpectation
)
class SuiteGenerate:
    """Assemble the dataframe validation suite and save it to the store.

    The suite is made of three expectations created in ``run``: an ordered
    column match, a column match with ``exact_match=False``, and a row-count
    range.
    """

    def __init__(self, env):
        """Record the environment and the fixed suite settings.

        Args:
            env: Environment value forwarded to ``ValidationSuiteGenerator``.
        """
        self._env = env
        # Column names handed to both column expectations.
        self.dataframe_cols = ['col1', 'col2', 'col3', 'col4', 'dt']
        # Passed to the generator as ``expectation_suite_name``.
        self.dataframe_suite_name = "dataframe_validation_suite"

    def run(self):
        """Create the expectations, build the suite, and save it to the store."""
        # Expectations are created before the generator, in the order they
        # are later added to the suite.
        expectations = [
            OrderedColumnsMatchExpectation().create(
                column_list=self.dataframe_cols,
            ),
            ColumnsMatchExpectation().create(
                column_set=self.dataframe_cols,
                exact_match=False,
            ),
            RowCountBetweenExpectation().create(min_rows=10000, max_rows=50000),
        ]

        builder = ValidationSuiteGenerator(
            env=self._env,
            expectation_suite_name=self.dataframe_suite_name,
        )
        # Re-bind on every step so the fluent return value is what gets used,
        # exactly as in a chained call.
        for expectation in expectations:
            builder = builder.add_expectation(expectation)

        builder.build().save_to_store()
def main() -> None:
    """Parse ``--environment`` and generate the suite for that environment.

    The option is required; its value is converted with ``Environment`` and
    must be one of ``list(Environment)``.
    """
    cli = argparse.ArgumentParser()
    environment_option = {
        "action": "store",
        "type": Environment,
        "required": True,
        "choices": list(Environment),
        "help": "Which environment?",
    }
    cli.add_argument("--environment", **environment_option)
    parsed = cli.parse_args()

    # Original note: this step uploads the expectation JSON to S3.
    generator = SuiteGenerate(env=parsed.environment.value)
    generator.run()


if __name__ == "__main__":
    main()