Skip to content

Commit

Permalink
Use parallel ES indexing
Browse files Browse the repository at this point in the history
  • Loading branch information
rsevilla87 committed May 28, 2020
1 parent f672aa8 commit 87c7bbc
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 7 deletions.
4 changes: 3 additions & 1 deletion run_snafu.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def main():
if os.environ["es_port"] != "":
es['port'] = os.environ["es_port"]
logger.info("Using elasticsearch server with port:" + es['port'])
parallel = int(os.getenv('parallel_indexing', 0))
es_verify_cert = os.getenv("es_verify_cert", "true")
if len(es.keys()) == 2:
if os.environ["es_index"] != "":
Expand Down Expand Up @@ -96,7 +97,8 @@ def main():
res_beg, res_end, res_suc, res_dup, res_fail, res_retry = streaming_bulk(es,
process_generator(
index_args,
parser))
parser),
parallel)

logger.info(
"Indexed results - %s success, %s duplicates, %s failures, with %s retries." % (
Expand Down
15 changes: 9 additions & 6 deletions utils/py_es_bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def put_template(es, name, body):
return beg, end, retry_count


def streaming_bulk(es, actions):
def streaming_bulk(es, actions, parallel=0):
"""
streaming_bulk(es, actions)
Arguments:
Expand Down Expand Up @@ -149,11 +149,14 @@ def actions_tracking_closure(cl_actions):
# Create the generator that closes over the external generator, "actions"
generator = actions_tracking_closure(actions)

streaming_bulk_generator = helpers.streaming_bulk(es, generator,
raise_on_error=False,
raise_on_exception=False,
request_timeout=_request_timeout)
for ok, resp_payload in streaming_bulk_generator:
indexer_f = helpers.parallel_bulk if parallel else helpers.streaming_bulk
indexer = indexer_f(es,
generator,
raise_on_error=False,
raise_on_exception=False,
request_timeout=_request_timeout)

for ok, resp_payload in indexer:
retry_count, action = actions_deque.popleft()
try:
resp = resp_payload[_op_type]
Expand Down

0 comments on commit 87c7bbc

Please sign in to comment.