Skip to content

Commit

Permalink
NiFi: cohort script update, fixed memory issue.
Browse files Browse the repository at this point in the history
  • Loading branch information
vladd-bit committed Jul 8, 2024
1 parent 8e96b94 commit 5460622
Showing 1 changed file with 25 additions and 23 deletions.
48 changes: 25 additions & 23 deletions nifi/user-scripts/cogstack_cohort_generate_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def _process_patient_records(patient_records: list):
return _ptt2sex, _ptt2eth, _ptt2dob, _ptt2age, _ptt2dod, _doc2ptt


def _process_annotation_records(annotation_records: list, _doc2ptt: dict):
def _process_annotation_records(annotation_records: list):

_cui2ptt_pos = defaultdict(Counter)
_cui2ptt_tsp = defaultdict(lambda: defaultdict(int))
Expand All @@ -173,8 +173,8 @@ def _process_annotation_records(annotation_records: list, _doc2ptt: dict):
annotation_entity = annotation_record["_source"]
docid = annotation_entity[ANNOTATION_DOCUMENT_ID_FIELD_NAME]

if str(docid) in _doc2ptt.keys():
patient_id = _doc2ptt[str(docid)]
if str(docid) in unique_doc_ids.keys():
patient_id = global_doc2ptt[str(docid)]
cui = annotation_entity["nlp.cui"]

if annotation_entity["nlp.meta_anns"]["Subject"]["value"] == "Patient" and \
Expand Down Expand Up @@ -220,31 +220,31 @@ def multiprocess_patient_records(input_patient_record_data: dict):
counter = 0
for record_chunk in record_chunks:
rec_que.put(record_chunk)
patient_process_pool_results.append(patient_process_pool.starmap_async(_process_patient_records, [(rec_que.get(),)], chunksize=1, error_callback=logging.error))
patient_process_pool_results.append(patient_process_pool.starmap_async(_process_patient_records, [(rec_que.get(),)], error_callback=logging.error))
counter += 1

try:
for result in patient_process_pool_results:
result_data = result.get(timeout=TIMEOUT)
_ptt2sex, _ptt2eth, _ptt2dob, _ptt2age, _ptt2dod, _doc2ptt = result_data[0][0], result_data[0][1], result_data[0][2], result_data[0][3], result_data[0][4], result_data[0][5]

ptt2sex.update(_ptt2sex)
ptt2eth.update(_ptt2eth)
ptt2dob.update(_ptt2dob)
ptt2age.update(_ptt2age)
ptt2dod.update(_ptt2dod)
doc2ptt.update(_doc2ptt)

except Exception as exception:
time = datetime.now()
with open(log_file_path, "a+") as log_file:
log_file.write("\n" + str(time) + ": " + str(exception))
log_file.write("\n" + str(time) + ": " + traceback.format_exc())
try:
result_data = result.get(timeout=TIMEOUT)
_ptt2sex, _ptt2eth, _ptt2dob, _ptt2age, _ptt2dod, _doc2ptt = result_data[0][0], result_data[0][1], result_data[0][2], result_data[0][3], result_data[0][4], result_data[0][5]

ptt2sex.update(_ptt2sex)
ptt2eth.update(_ptt2eth)
ptt2dob.update(_ptt2dob)
ptt2age.update(_ptt2age)
ptt2dod.update(_ptt2dod)
doc2ptt.update(_doc2ptt)

except Exception as exception:
time = datetime.now()
with open(log_file_path, "a+") as log_file:
log_file.write("\n" + str(time) + ": " + str(exception))
log_file.write("\n" + str(time) + ": " + traceback.format_exc())

return doc2ptt, ptt2dod, ptt2age, ptt2dob, ptt2eth, ptt2sex


def multiprocess_annotation_records(doc2ptt: dict, input_annotations: dict):
def multiprocess_annotation_records(input_annotations: dict):

# cui2ptt_pos.jsonl each line is a dictionary of cui and the value is a dictionary of patients with a count {<cui>: {<patient_id>:<count>, ...}}\n...
cui2ptt_pos = defaultdict(Counter) # store the count of a SNOMED term for a patient
Expand All @@ -262,7 +262,7 @@ def multiprocess_annotation_records(doc2ptt: dict, input_annotations: dict):
counter = 0
for record_chunk in record_chunks:
rec_que.put(record_chunk)
annotation_process_pool_results.append(annotations_process_pool.starmap_async(_process_annotation_records, [(rec_que.get(), doc2ptt)], chunksize=1, error_callback=logging.error))
annotation_process_pool_results.append(annotations_process_pool.starmap_async(_process_annotation_records, [(rec_que.get(),)], error_callback=logging.error))
counter += 1

try:
Expand Down Expand Up @@ -338,6 +338,8 @@ def multiprocess_annotation_records(doc2ptt: dict, input_annotations: dict):
global_doc2ptt = f.read()
global_doc2ptt = json.loads(global_doc2ptt)

unique_doc_ids = set(global_doc2ptt.keys())

if INPUT_ANNOTATIONS_RECORDS_FILE_NAME_PATTERN:
# read each of the patient record files one by one
for root, sub_directories, files in os.walk(INPUT_FOLDER_PATH):
Expand All @@ -350,7 +352,7 @@ def multiprocess_annotation_records(doc2ptt: dict, input_annotations: dict):
with open(f_path, mode="r+") as f:
contents = json.loads(f.read())

cui2ptt_pos, cui2ptt_tsp = multiprocess_annotation_records(global_doc2ptt, contents)
cui2ptt_pos, cui2ptt_tsp = multiprocess_annotation_records(contents)
with open(os.path.join(OUTPUT_FOLDER_PATH, "cui2ptt_pos.jsonl"), "a+", encoding="utf-8") as outfile:
for k,v in cui2ptt_pos.items():
o = {k: v}
Expand Down

0 comments on commit 5460622

Please sign in to comment.