From d1d72e13986b00452b013f029867b9688adeae5c Mon Sep 17 00:00:00 2001 From: Yao Yao Date: Thu, 12 Jan 2023 16:50:28 -0800 Subject: [PATCH 1/2] add hardcoded mapping; add parquet cache file to dumper --- manifest.json | 4 +++- mapping.py | 66 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 1 deletion(-) create mode 100644 mapping.py diff --git a/manifest.json b/manifest.json index 9b65a74..ee6bfd0 100644 --- a/manifest.json +++ b/manifest.json @@ -7,6 +7,7 @@ }, "dumper" : { "data_url" : [ + "http://localhost:8080/dataupload/mysrc/semmeddb/semmedVER43_2022_R_PREDICATION_clean_pyarrow_snappy.parquet", "http://localhost:8080/dataupload/mysrc/semmeddb/semmedVER43_2022_R_PREDICATION.csv", "http://localhost:8080/dataupload/mysrc/semmeddb/MRCUI.RRF", "http://localhost:8080/dataupload/mysrc/semmeddb/UMLS_CUI_Semtype.tsv", @@ -18,6 +19,7 @@ }, "uploader" : { "parser" : "parser:load_data", - "on_duplicates" : "error" + "on_duplicates" : "error", + "mapping": "mapping:semmeddb_prediction_mapping" } } diff --git a/mapping.py b/mapping.py new file mode 100644 index 0000000..d1534b6 --- /dev/null +++ b/mapping.py @@ -0,0 +1,66 @@ +def semmeddb_prediction_mapping(cls): + mapping = { + "predicate": { + "normalizer": "keyword_lowercase_normalizer", + "type": "keyword" + }, + "predication_id": { + "normalizer": "keyword_lowercase_normalizer", + "type": "keyword" + }, + "pmid": { + "type": "integer" + }, + "subject": { + "properties": { + "umls": { + "normalizer": "keyword_lowercase_normalizer", + "type": "keyword" + }, + "name": { + "type": "text" + }, + "semantic_type_abbreviation": { + "normalizer": "keyword_lowercase_normalizer", + "type": "keyword" + }, + "semantic_type_name": { + "type": "text" + }, + "novelty": { + "type": "integer" + }, + "ncbigene": { + "normalizer": "keyword_lowercase_normalizer", + "type": "keyword" + } + } + }, + "object": { + "properties": { + "umls": { + "normalizer": "keyword_lowercase_normalizer", + "type": "keyword" + }, + "name": { + "type": "text" + }, + "semantic_type_abbreviation": { + "normalizer": "keyword_lowercase_normalizer", + "type": "keyword" + }, + "semantic_type_name": { + "type": "text" + }, + "novelty": { + "type": "integer" + }, + "ncbigene": { + "normalizer": "keyword_lowercase_normalizer", + "type": "keyword" + } + } + } + } + + return mapping From cf73d4fc4087c9f28dcbbcba38943a35566e8a27 Mon Sep 17 00:00:00 2001 From: Yao Yao Date: Thu, 12 Jan 2023 16:52:05 -0800 Subject: [PATCH 2/2] fix Issue#10; read parquet cache when possible; extract constants for filenames --- parser.py | 110 ++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 81 insertions(+), 29 deletions(-) diff --git a/parser.py b/parser.py index 2a08c42..28cd0ff 100644 --- a/parser.py +++ b/parser.py @@ -1,4 +1,5 @@ import os +from pathlib import Path import pickle import requests import pandas as pd @@ -9,6 +10,20 @@ from biothings.utils.common import iter_n +""" +Constants of filenames +""" +SEMTYPE_MAPPING_FN = "SemanticTypes_2018AB.txt" +UMLS_PREFERRED_CUI_NAME_SEMTYPE_FN = "UMLS_CUI_Semtype.tsv" +MRCUI_FN = "MRCUI.RRF" + +SEMMED_TABLE_FN = "semmedVER43_2022_R_PREDICATION.csv" +SEMMED_FN_STEM = Path(SEMMED_TABLE_FN).stem # string of "semmedVER43_2022_R_PREDICATION" +# Path("semmedVER43_2022_R_PREDICATION_clean_pyarrow_snappy.parquet") +SEMMED_TABLE_CACHE_FN = Path(SEMMED_FN_STEM + "_clean_pyarrow_snappy").with_suffix(".parquet") +# Path("semmedVER43_2022_R_PREDICATION_NodeNorm.pickle") +SEMMED_NODE_NORM_RESPONSE_CACHE_FN = Path(SEMMED_FN_STEM + "_NodeNorm").with_suffix(".pickle") + ################################### # PART 1: Load Semantic Type Data # @@ -152,6 +167,7 @@ def read_semmed_data_frame(data_folder, filename) -> pd.DataFrame: filepath = os.path.join(data_folder, filename) encoding = "latin1" # file may contain chars in other languages (e.g. French) na_value = r"\N" + escapechar = "\\" # single backslash, see https://github.com/biothings/semmeddb/issues/10 column_info = [ # Each element is a tuple of (column_index, column_name, data_type) # See column description at https://lhncbc.nlm.nih.gov/ii/tools/SemRep_SemMedDB_SKR/dbinfo.html @@ -178,7 +194,7 @@ def read_semmed_data_frame(data_folder, filename) -> pd.DataFrame: column_names = [e[1] for e in column_info] column_dtypes = {e[1]: e[2] for e in column_info} data_frame = pd.read_csv(filepath, sep=",", names=column_names, usecols=column_indices, - dtype=column_dtypes, na_values=[na_value], encoding=encoding) + dtype=column_dtypes, na_values=[na_value], encoding=encoding, escapechar=escapechar) data_frame = data_frame.astype({ "PREDICATE": "string[pyarrow]", @@ -495,14 +511,14 @@ def map_retired_cuis(semmed_data_frame: pd.DataFrame, retirement_mapping_data_fr def delete_equivalent_ncbigene_ids(semmed_data_frame: pd.DataFrame, - node_normalizer_cache: str = None, - node_normalizer_output: str = None): + node_normalizer_cache_input: str = None, + node_normalizer_cache_output: str = None): def get_cui_to_gene_id_maps(sub_cui_flags: pd.Series, obj_cui_flags: pd.Series, chunk_size=1000): sub_cuis = set(semmed_data_frame.loc[sub_cui_flags, "SUBJECT_CUI"].unique()) obj_cuis = set(semmed_data_frame.loc[obj_cui_flags, "OBJECT_CUI"].unique()) - if node_normalizer_cache and os.path.exists(node_normalizer_cache): - with open(node_normalizer_cache, 'rb') as handle: + if node_normalizer_cache_input and os.path.exists(node_normalizer_cache_input): + with open(node_normalizer_cache_input, 'rb') as handle: cui_gene_id_map = pickle.load(handle) else: cuis = sub_cuis.union(obj_cuis) @@ -510,8 +526,8 @@ def get_cui_to_gene_id_maps(sub_cui_flags: pd.Series, obj_cui_flags: pd.Series, cui_gene_id_map = query_node_normalizer_for_equivalent_ncbigene_ids(cuis, chunk_size=chunk_size) # Output to the specified pickle file regardless if it's cache or live response - if node_normalizer_output: - with open(node_normalizer_output, 'wb') as handle: + if node_normalizer_cache_output: + with open(node_normalizer_cache_output, 'wb') as handle: pickle.dump(cui_gene_id_map, handle, protocol=pickle.HIGHEST_PROTOCOL) sub_cui_gene_id_map = {cui: gene_id for cui, gene_id in cui_gene_id_map.items() if cui in sub_cuis} @@ -567,8 +583,11 @@ def get_row_index_of_equivalent_ncbigene_ids(pred_id_sub_gene_id_map: Dict, pred pid_obj_gid_map = establish_pred_id_to_gene_id_map(pid_obj_cui_map, obj_cui_gid_map) dest_equivalent_gid_index = get_row_index_of_equivalent_ncbigene_ids(pid_sub_gid_map, pid_obj_gid_map) - semmed_data_frame.drop(index=dest_equivalent_gid_index, inplace=True) + + # Now these two columns are not necessary. Drop them to save memory + semmed_data_frame.drop(columns=["IS_SUBJECT_PIPED", "IS_OBJECT_PIPED"], inplace=True) + return semmed_data_frame @@ -591,6 +610,28 @@ def add_document_id_column(semmed_data_frame: pd.DataFrame): return semmed_data_frame +def write_parquet_cache(semmed_data_frame: pd.DataFrame, path: str): + # Option description see https://pandas.pydata.org/pandas-docs/version/1.1/reference/api/pandas.DataFrame.to_parquet.html + engine = "pyarrow" + compression = "snappy" + + semmed_data_frame.to_parquet(path=path, index=False, engine=engine, compression=compression) + + +def read_parquet_cache(path: str) -> pd.DataFrame: + # Option description see https://pandas.pydata.org/pandas-docs/version/1.1/reference/api/pandas.DataFrame.to_parquet.html + engine = "pyarrow" + semmed_data_frame = pd.read_parquet(path=path, engine=engine) + + string_columns = ["_ID", "PREDICATE", "SUBJECT_CUI", "SUBJECT_NAME", "SUBJECT_SEMTYPE", "OBJECT_CUI", "OBJECT_NAME", "OBJECT_SEMTYPE"] + existing_string_columns = [col for col in string_columns if col in semmed_data_frame.columns] + dtype_map = {col: "string[pyarrow]" for col in existing_string_columns} + + semmed_data_frame = semmed_data_frame.astype(dtype=dtype_map, copy=False) + + return semmed_data_frame + + ################################## # PART 5: Node Normalizer Client # ################################## @@ -692,33 +733,44 @@ def construct_document(row: pd.Series, semantic_type_map): if not doc["object"]["semantic_type_name"]: del doc["object"]["semantic_type_name"] - yield doc + return doc -def load_data(data_folder): - semmed_df = read_semmed_data_frame(data_folder, "semmedVER43_2022_R_PREDICATION.csv") - semmed_df = delete_zero_novelty_scores(semmed_df) - semmed_df = delete_invalid_object_cuis(semmed_df) - semmed_df = explode_pipes(semmed_df) +def load_data(data_folder, write_semmed_cache=False): + semmed_cache_path = os.path.join(data_folder, SEMMED_TABLE_CACHE_FN) - mrcui_df = read_mrcui_data_frame(data_folder, "MRCUI.RRF") - deleted_cuis = get_retired_cuis_for_deletion(mrcui_df) - semmed_df = delete_retired_cuis(semmed_df, deleted_cuis) + # Always read the cache if available + if semmed_cache_path and os.path.exists(semmed_cache_path): + semmed_df = read_parquet_cache(path=semmed_cache_path) + else: + # Start the data cleaning procedure if cache not available + semmed_df = read_semmed_data_frame(data_folder, SEMMED_TABLE_FN) + semmed_df = delete_zero_novelty_scores(semmed_df) + semmed_df = delete_invalid_object_cuis(semmed_df) + semmed_df = explode_pipes(semmed_df) - semmed_df = add_prefix_columns(semmed_df) - semmed_cui_name_semtype_df = get_cui_name_and_semtype_from_semmed(semmed_df) - umls_cui_name_semtype_df = read_cui_name_and_semtype_from_umls(data_folder, "UMLS_CUI_Semtype.tsv") - retirement_mapping_df = get_retirement_mapping_data_frame(mrcui_df) - retirement_mapping_df = add_cui_name_and_semtype_to_retirement_mapping(retirement_mapping_df, semmed_cui_name_semtype_df, umls_cui_name_semtype_df) - semmed_df = map_retired_cuis(semmed_df, retirement_mapping_df) + mrcui_df = read_mrcui_data_frame(data_folder, MRCUI_FN) + deleted_cuis = get_retired_cuis_for_deletion(mrcui_df) + semmed_df = delete_retired_cuis(semmed_df, deleted_cuis) - node_normalizer_cache = os.path.join(data_folder, "semmedVER43_2022_R_PREDICATION_NodeNorm.pickle") - semmed_df = delete_equivalent_ncbigene_ids(semmed_df, node_normalizer_cache=node_normalizer_cache, node_normalizer_output=None) + semmed_df = add_prefix_columns(semmed_df) + semmed_cui_name_semtype_df = get_cui_name_and_semtype_from_semmed(semmed_df) + umls_cui_name_semtype_df = read_cui_name_and_semtype_from_umls(data_folder, UMLS_PREFERRED_CUI_NAME_SEMTYPE_FN) # pre-generated file; see README.md + retirement_mapping_df = get_retirement_mapping_data_frame(mrcui_df) + retirement_mapping_df = add_cui_name_and_semtype_to_retirement_mapping(retirement_mapping_df, semmed_cui_name_semtype_df, umls_cui_name_semtype_df) + semmed_df = map_retired_cuis(semmed_df, retirement_mapping_df) - semmed_df = add_document_id_column(semmed_df) + node_normalizer_cache = os.path.join(data_folder, SEMMED_NODE_NORM_RESPONSE_CACHE_FN) + semmed_df = delete_equivalent_ncbigene_ids(semmed_df, node_normalizer_cache_input=node_normalizer_cache, node_normalizer_cache_output=None) - semtype_mappings_df = read_semantic_type_mappings_data_frame(data_folder, "SemanticTypes_2018AB.txt") - semtype_name_map = get_semtype_name_map(semtype_mappings_df) + semmed_df = add_document_id_column(semmed_df) + # Write cache only when `write_semmed_cache` is set + if write_semmed_cache: + write_parquet_cache(semmed_df, path=semmed_cache_path) + + semtype_mappings_df = read_semantic_type_mappings_data_frame(data_folder, SEMTYPE_MAPPING_FN) + semtype_name_map = get_semtype_name_map(semtype_mappings_df) for _, row in semmed_df.iterrows(): - yield from construct_document(row, semtype_name_map) + doc = construct_document(row, semtype_name_map) + yield doc