-
Notifications
You must be signed in to change notification settings - Fork 1
/
ck_choices.py
executable file
·148 lines (131 loc) · 5.1 KB
/
ck_choices.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#!/usr/bin/env python
"""
makes modifications to the controlled vocabulary (implemented as
ckanext-scheming "choices")
HvW - 2016-06-07
"""
from pprint import pprint
import argparse
import sys
import json
import os
LOCAL_SCHEMA=("/usr/lib/ckan/default/src/ckanext-eaw_schema/ckanext/" +
"eaw_schema/eaw_schema_dataset.json")
def mkparser():
parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter,
description=
"""
Make modifications to the controlled vocabulary FIELD
(implemented as ckanext-scheming "choices").
""", epilog=
"""
Examples:
ck_choices.py variables "new_var_1,New Variable One" newvar2,"Another One"
adds two new terms to the dataset_field "variables".
ck_choices.py variables --del new_var_1 newvar2
deletes them.
"""
)
parser.add_argument('field', nargs='?', help='the schema field to be modified',
metavar='FIELD')
parser.add_argument('--listfields', action='store_true', help='list fields'+\
' (the rest of the command line is ignored)')
parser.add_argument('--del', action='store_true', help='delete terms '+
'(default is adding terms)')
parser.add_argument('--resource', action='store_true', help='action ' +
'refers to resource field (default is dataset field)')
parser.add_argument('--schema', action='store', help='path to alternative'
+ ' schema definition', metavar='SCHEMA')
parser.add_argument('terms', nargs='*', help='the terms to be added '
+'(removed). Have the format "value,label" for adding,' +
' and "value" for removing', metavar='TERM')
return(parser)
def listfields(schema):
fields = schema['dataset_fields']
for f in ('{}, {}\n{}\n'.format(x['field_name'], x['label'],
sorted([c['value'] for c in x['choices']]))
for x in fields if 'choices' in x):
print f
def postparse(params, parser):
if params['listfields']:
return
terms = params['terms']
print("terms: {}".format(terms))
if not terms or not params['field']:
parser.print_help()
sys.exit(1)
terms = [tuple(x.split(',')) for x in terms]
print("terms: {}".format(terms))
if params['del'] and not all([len(x) == 1 for x in terms]):
parser.print_help()
sys.exit(1)
elif not params['del'] and not all([len(x) == 2 for x in terms]):
parser.print_help()
sys.exit(1)
terms = [x[0] if len(x) == 1 else x for x in terms]
return(terms)
def load_schema(schemafile):
try:
with open(schemafile) as sf:
schema = json.load(sf)
except ValueError:
raise(SystemExit("Schema file {} doesn't parse into JSON"
.format(schemafile), 1))
except IOError:
raise(SystemExit("Could not open schema file 'testschema'", 1))
return(schema)
def check_unique(field, choices, terms):
for t in [x[0] for x in terms]:
if t in [x['value'] for x in choices]:
raise SystemExit('{} already in {}'.format(t, field))
def update_field(schema, typ, field, remove, terms):
" typ: 'dataset_field' or 'resource_field'"
def _build_choices(terms):
ch = [{'value': t[0], 'label': t[1]} for t in terms]
return ch
def _get_val_index(val, choices):
idx = [x.get('value') for x in choices].index(val)
return(idx)
try:
f = [x for x in schema[typ] if x["field_name"] == field]
except KeyError:
raise SystemExit('Could not find field_type "{}"'.format(typ))
if not f:
raise SystemExit("Could not find field \"{}\" in \"{}\""
.format(field, typ))
assert(len(f) == 1)
c = f[0]['choices']
if not remove:
check_unique(field, c, terms)
c.extend(_build_choices(terms))
else:
try:
rmidx = [_get_val_index(val, c) for val in terms]
except ValueError:
raise SystemExit('Not all terms found in ' +
'field "{}" in "{}"'.format(field, typ))
if len(rmidx) < len(terms):
raise SystemExit('Not all terms found in ' +
'field "{}" in "{}"'.format(field, typ))
cnew = [x[1] for x in enumerate(c) if x[0] not in rmidx]
f[0]['choices'] = cnew
return(schema)
def write_schema(newschema, path):
with open(path, 'w') as f:
json.dump(newschema, f, indent=2)
def main():
parser = mkparser()
params = vars(parser.parse_args())
terms = postparse(params, parser)
schemasource = params.get('schema') or LOCAL_SCHEMA
schema = load_schema(schemasource)
if params['listfields']:
listfields(schema)
sys.exit()
field = params['field']
remove = params['del']
typ = 'resource_fields' if params['resource'] else 'dataset_fields'
newschema = update_field(schema, typ, field, remove, terms)
write_schema(newschema, schemasource)
if __name__ == '__main__':
main()