Skip to content

Commit

Permalink
NiFi scrpts: cohort export possible duplicate patient id records fix.
Browse files Browse the repository at this point in the history
  • Loading branch information
vladd-bit committed Jul 25, 2024
1 parent 3687116 commit 3d0b2e6
Showing 1 changed file with 17 additions and 17 deletions.
34 changes: 17 additions & 17 deletions nifi/user-scripts/cogstack_cohort_generate_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@
import os
import traceback
import multiprocess
import random

from multiprocess import Pool, Queue
from collections import defaultdict, Counter
from utils.ethnicity_map import ethnicity_map
from utils.generic import chunk, dict2json_truncate_add_to_file
from utils.generic import chunk, dict2json_truncate_add_to_file, dict2json_file

# default values from /deploy/nifi.env
USER_SCRIPT_LOGS_DIR = os.getenv("USER_SCRIPT_LOGS_DIR", "")
Expand Down Expand Up @@ -48,9 +46,7 @@
INPUT_PATIENT_RECORD_FILE_NAME_PATTERN = ""
INPUT_ANNOTATIONS_RECORDS_FILE_NAME_PATTERN = ""

# if this is enabled, the maximum patient age will be somewhere between 90 and 99 IF there is no DATE OF DEATH available
# calculated as: current_year - patient_year_of_birth, if > 100, patient_age = rand(90,99)
ENABLE_PATIENT_AGE_LIMIT = "false"
EXPORT_ONLY_PATIENT_RECORDS = "false"

for arg in sys.argv:
_arg = arg.split("=", 1)
Expand Down Expand Up @@ -84,8 +80,7 @@
INPUT_PATIENT_RECORD_FILE_NAME_PATTERN = _arg[1]
if _arg[0] == "input_annotations_records_file_name_pattern":
INPUT_ANNOTATIONS_RECORDS_FILE_NAME_PATTERN = _arg[1]
if _arg[0] == "enable_patient_age_limit":
ENABLE_PATIENT_AGE_LIMIT = str(_arg[1]).lower()


def _process_patient_records(patient_records: list):
_ptt2sex, _ptt2eth, _ptt2dob, _ptt2age, _ptt2dod, _doc2ptt = {}, {}, {}, {}, {}, {}
Expand Down Expand Up @@ -138,9 +133,6 @@ def _process_patient_records(patient_records: list):
patient_age = abs(dod.year - dob.year)
else:
patient_age = abs(datetime.now().year - dob.year)
if patient_age > 100 and ENABLE_PATIENT_AGE_LIMIT == "true":
dod = datetime.now(tz=timezone.utc)
patient_age = -1

# convert to ints
dod = int(dod.strftime("%Y%m%d%H%M%S")) if dod not in [0, None, "null"] else 0
Expand Down Expand Up @@ -318,6 +310,8 @@ def multiprocess_annotation_records(input_annotations: dict):
global_doc2ptt = {}

if INPUT_PATIENT_RECORD_FILE_NAME_PATTERN:
ptt2dod, ptt2age, ptt2dob, ptt2eth, ptt2sex = {}, {}, {}, {}, {}

# read each of the patient record files one by one
for root, sub_directories, files in os.walk(INPUT_FOLDER_PATH):
for file_name in files:
Expand All @@ -331,23 +325,29 @@ def multiprocess_annotation_records(input_annotations: dict):
_doc2ptt, _ptt2dod, _ptt2age, _ptt2dob, _ptt2eth, _ptt2sex = multiprocess_patient_records(contents)

if _ptt2sex != {}:
dict2json_truncate_add_to_file(_ptt2sex, os.path.join(OUTPUT_FOLDER_PATH, "ptt2sex.json"))
ptt2sex.update(_ptt2sex)
if _ptt2eth != {}:
dict2json_truncate_add_to_file(_ptt2eth, os.path.join(OUTPUT_FOLDER_PATH, "ptt2eth.json"))
ptt2eth.update(_ptt2eth)
if _ptt2dob != {}:
dict2json_truncate_add_to_file(_ptt2dob, os.path.join(OUTPUT_FOLDER_PATH, "ptt2dob.json"))
ptt2dob.update(_ptt2dob)
if _ptt2dod != {}:
dict2json_truncate_add_to_file(_ptt2dod, os.path.join(OUTPUT_FOLDER_PATH, "ptt2dod.json"))
ptt2dod.update(_ptt2dod)
if _ptt2age != {}:
dict2json_truncate_add_to_file(_ptt2age, os.path.join(OUTPUT_FOLDER_PATH, "ptt2age.json"))

ptt2age.update(_ptt2age)
if _doc2ptt != {}:
global_doc2ptt.update(_doc2ptt)

with open(log_file_path, "a+") as log_file:
time = datetime.now()
log_file.write("\n" + str(time) + ": processed file " + str(file_name))

dict2json_file(ptt2sex, os.path.join(OUTPUT_FOLDER_PATH, "ptt2sex.json"))
dict2json_file(ptt2eth, os.path.join(OUTPUT_FOLDER_PATH, "ptt2eth.json"))
dict2json_file(ptt2dob, os.path.join(OUTPUT_FOLDER_PATH, "ptt2dob.json"))
dict2json_file(ptt2dod, os.path.join(OUTPUT_FOLDER_PATH, "ptt2dod.json"))
dict2json_file(ptt2age, os.path.join(OUTPUT_FOLDER_PATH, "ptt2age.json"))


# dump patients for future ref
doc2ptt_path = os.path.join(OUTPUT_FOLDER_PATH, "doc2ptt.json")
if global_doc2ptt != {}:
Expand Down

0 comments on commit 3d0b2e6

Please sign in to comment.