Commit
Merge pull request #11 from biothings/tidy_CUI
Fix for Issue #10
Yao Yao authored Jan 13, 2023
2 parents 8529056 + cf73d4f commit 81712bf
Showing 3 changed files with 150 additions and 30 deletions.
4 changes: 3 additions & 1 deletion manifest.json
@@ -7,6 +7,7 @@
},
"dumper" : {
"data_url" : [
"http://localhost:8080/dataupload/mysrc/semmeddb/semmedVER43_2022_R_PREDICATION_clean_pyarrow_snappy.parquet",
"http://localhost:8080/dataupload/mysrc/semmeddb/semmedVER43_2022_R_PREDICATION.csv",
"http://localhost:8080/dataupload/mysrc/semmeddb/MRCUI.RRF",
"http://localhost:8080/dataupload/mysrc/semmeddb/UMLS_CUI_Semtype.tsv",
@@ -18,6 +19,7 @@
},
"uploader" : {
"parser" : "parser:load_data",
"on_duplicates" : "error"
"on_duplicates" : "error",
"mapping": "mapping:semmeddb_prediction_mapping"
}
}
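For reference, the "parser" and "mapping" values above use the BioThings "module:callable" notation, pointing at parser.py and mapping.py in this repository. A minimal sketch of how such a reference can be resolved in plain Python, assuming those modules are importable from the working directory (the resolve() helper is illustrative, not the hub's actual loader):

    import importlib

    def resolve(ref):
        # "mapping:semmeddb_prediction_mapping" -> module "mapping", attribute "semmeddb_prediction_mapping"
        module_name, attr_name = ref.split(":", maxsplit=1)
        return getattr(importlib.import_module(module_name), attr_name)

    mapping_fn = resolve("mapping:semmeddb_prediction_mapping")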
66 changes: 66 additions & 0 deletions mapping.py
@@ -0,0 +1,66 @@
def semmeddb_prediction_mapping(cls):
mapping = {
"predicate": {
"normalizer": "keyword_lowercase_normalizer",
"type": "keyword"
},
"predication_id": {
"normalizer": "keyword_lowercase_normalizer",
"type": "keyword"
},
"pmid": {
"type": "integer"
},
"subject": {
"properties": {
"umls": {
"normalizer": "keyword_lowercase_normalizer",
"type": "keyword"
},
"name": {
"type": "text"
},
"semantic_type_abbreviation": {
"normalizer": "keyword_lowercase_normalizer",
"type": "keyword"
},
"semantic_type_name": {
"type": "text"
},
"novelty": {
"type": "integer"
},
"ncbigene": {
"normalizer": "keyword_lowercase_normalizer",
"type": "keyword"
}
}
},
"object": {
"properties": {
"umls": {
"normalizer": "keyword_lowercase_normalizer",
"type": "keyword"
},
"name": {
"type": "text"
},
"semantic_type_abbreviation": {
"normalizer": "keyword_lowercase_normalizer",
"type": "keyword"
},
"semantic_type_name": {
"type": "text"
},
"novelty": {
"type": "integer"
},
"ncbigene": {
"normalizer": "keyword_lowercase_normalizer",
"type": "keyword"
}
}
}
}

return mapping
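The dict returned above is an Elasticsearch field mapping; keyword_lowercase_normalizer is a custom normalizer that must be declared in the index settings (the hub handles this in production). A hedged sketch of standalone use with the elasticsearch 8.x Python client — the local URL and index name are assumptions:

    from elasticsearch import Elasticsearch

    es = Elasticsearch("http://localhost:9200")  # assumed local instance
    settings = {
        "analysis": {
            "normalizer": {
                "keyword_lowercase_normalizer": {"type": "custom", "filter": ["lowercase"]}
            }
        }
    }
    properties = semmeddb_prediction_mapping(cls=None)  # cls is unused by the function body
    es.indices.create(index="semmeddb_test", settings=settings, mappings={"properties": properties})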
110 changes: 81 additions & 29 deletions parser.py
@@ -1,4 +1,5 @@
import os
from pathlib import Path
import pickle
import requests
import pandas as pd
@@ -9,6 +10,20 @@

from biothings.utils.common import iter_n

"""
Filename constants
"""
SEMTYPE_MAPPING_FN = "SemanticTypes_2018AB.txt"
UMLS_PREFERRED_CUI_NAME_SEMTYPE_FN = "UMLS_CUI_Semtype.tsv"
MRCUI_FN = "MRCUI.RRF"

SEMMED_TABLE_FN = "semmedVER43_2022_R_PREDICATION.csv"
SEMMED_FN_STEM = Path(SEMMED_TABLE_FN).stem # string of "semmedVER43_2022_R_PREDICATION"
# Path("semmedVER43_2022_R_PREDICATION_clean_pyarrow_snappy.parquet")
SEMMED_TABLE_CACHE_FN = Path(SEMMED_FN_STEM + "_clean_pyarrow_snappy").with_suffix(".parquet")
# Path("semmedVER43_2022_R_PREDICATION_NodeNorm.pickle")
SEMMED_NODE_NORM_RESPONSE_CACHE_FN = Path(SEMMED_FN_STEM + "_NodeNorm").with_suffix(".pickle")
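A quick illustration of what these pathlib expressions evaluate to (a plain-Python check, not part of the parser itself):

    from pathlib import Path

    stem = Path("semmedVER43_2022_R_PREDICATION.csv").stem
    # stem == "semmedVER43_2022_R_PREDICATION"
    Path(stem + "_clean_pyarrow_snappy").with_suffix(".parquet")
    # -> Path("semmedVER43_2022_R_PREDICATION_clean_pyarrow_snappy.parquet")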


###################################
# PART 1: Load Semantic Type Data #
@@ -152,6 +167,7 @@ def read_semmed_data_frame(data_folder, filename) -> pd.DataFrame:
filepath = os.path.join(data_folder, filename)
encoding = "latin1" # file may contain chars in other languages (e.g. French)
na_value = r"\N"
escapechar = "\\" # single backslash, see https://github.com/biothings/semmeddb/issues/10
column_info = [
# Each element is a tuple of (column_index, column_name, data_type)
# See column description at https://lhncbc.nlm.nih.gov/ii/tools/SemRep_SemMedDB_SKR/dbinfo.html
@@ -178,7 +194,7 @@ def read_semmed_data_frame(data_folder, filename) -> pd.DataFrame:
column_names = [e[1] for e in column_info]
column_dtypes = {e[1]: e[2] for e in column_info}
data_frame = pd.read_csv(filepath, sep=",", names=column_names, usecols=column_indices,
-                                 dtype=column_dtypes, na_values=[na_value], encoding=encoding)
+                                 dtype=column_dtypes, na_values=[na_value], encoding=encoding, escapechar=escapechar)

data_frame = data_frame.astype({
"PREDICATE": "string[pyarrow]",
@@ -495,23 +511,23 @@ def map_retired_cuis(semmed_data_frame: pd.DataFrame, retirement_mapping_data_fr


def delete_equivalent_ncbigene_ids(semmed_data_frame: pd.DataFrame,
-                                   node_normalizer_cache: str = None,
-                                   node_normalizer_output: str = None):
+                                   node_normalizer_cache_input: str = None,
+                                   node_normalizer_cache_output: str = None):
def get_cui_to_gene_id_maps(sub_cui_flags: pd.Series, obj_cui_flags: pd.Series, chunk_size=1000):
sub_cuis = set(semmed_data_frame.loc[sub_cui_flags, "SUBJECT_CUI"].unique())
obj_cuis = set(semmed_data_frame.loc[obj_cui_flags, "OBJECT_CUI"].unique())

-        if node_normalizer_cache and os.path.exists(node_normalizer_cache):
-            with open(node_normalizer_cache, 'rb') as handle:
+        if node_normalizer_cache_input and os.path.exists(node_normalizer_cache_input):
+            with open(node_normalizer_cache_input, 'rb') as handle:
cui_gene_id_map = pickle.load(handle)
else:
cuis = sub_cuis.union(obj_cuis)
            # a <CUI, Gene_ID> map where the key is a source CUI and the value is its equivalent NCBIGene ID
cui_gene_id_map = query_node_normalizer_for_equivalent_ncbigene_ids(cuis, chunk_size=chunk_size)

        # Write to the specified pickle file, whether the map came from the cache or from a live response
-        if node_normalizer_output:
-            with open(node_normalizer_output, 'wb') as handle:
+        if node_normalizer_cache_output:
+            with open(node_normalizer_cache_output, 'wb') as handle:
pickle.dump(cui_gene_id_map, handle, protocol=pickle.HIGHEST_PROTOCOL)

sub_cui_gene_id_map = {cui: gene_id for cui, gene_id in cui_gene_id_map.items() if cui in sub_cuis}
@@ -567,8 +583,11 @@ def get_row_index_of_equivalent_ncbigene_ids(pred_id_sub_gene_id_map: Dict, pred
pid_obj_gid_map = establish_pred_id_to_gene_id_map(pid_obj_cui_map, obj_cui_gid_map)

dest_equivalent_gid_index = get_row_index_of_equivalent_ncbigene_ids(pid_sub_gid_map, pid_obj_gid_map)

semmed_data_frame.drop(index=dest_equivalent_gid_index, inplace=True)

# Now these two columns are not necessary. Drop them to save memory
semmed_data_frame.drop(columns=["IS_SUBJECT_PIPED", "IS_OBJECT_PIPED"], inplace=True)

return semmed_data_frame
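The cache_input/cache_output pair above implements a read-through pickle cache around the Node Normalizer queries. The generic pattern, as a standalone sketch (compute stands in for query_node_normalizer_for_equivalent_ncbigene_ids):

    import os
    import pickle

    def cached(compute, cache_input=None, cache_output=None):
        if cache_input and os.path.exists(cache_input):
            with open(cache_input, "rb") as handle:
                result = pickle.load(handle)
        else:
            result = compute()
        if cache_output:
            with open(cache_output, "wb") as handle:
                pickle.dump(result, handle, protocol=pickle.HIGHEST_PROTOCOL)
        return result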


@@ -591,6 +610,28 @@ def add_document_id_column(semmed_data_frame: pd.DataFrame):
return semmed_data_frame


def write_parquet_cache(semmed_data_frame: pd.DataFrame, path: str):
    # For option descriptions, see https://pandas.pydata.org/pandas-docs/version/1.1/reference/api/pandas.DataFrame.to_parquet.html
engine = "pyarrow"
compression = "snappy"

semmed_data_frame.to_parquet(path=path, index=False, engine=engine, compression=compression)


def read_parquet_cache(path: str) -> pd.DataFrame:
    # For option descriptions, see https://pandas.pydata.org/pandas-docs/version/1.1/reference/api/pandas.read_parquet.html
engine = "pyarrow"
semmed_data_frame = pd.read_parquet(path=path, engine=engine)

string_columns = ["_ID", "PREDICATE", "SUBJECT_CUI", "SUBJECT_NAME", "SUBJECT_SEMTYPE", "OBJECT_CUI", "OBJECT_NAME", "OBJECT_SEMTYPE"]
existing_string_columns = [col for col in string_columns if col in semmed_data_frame.columns]
dtype_map = {col: "string[pyarrow]" for col in existing_string_columns}

semmed_data_frame = semmed_data_frame.astype(dtype=dtype_map, copy=False)

return semmed_data_frame
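Round-trip usage of these two cache helpers, assuming data_folder points at the downloaded dump (the /tmp path is an assumption; read_parquet_cache defensively re-applies the string[pyarrow] dtypes after loading):

    df = read_semmed_data_frame(data_folder, SEMMED_TABLE_FN)
    write_parquet_cache(df, path="/tmp/semmed_cache.parquet")   # assumed scratch path
    df2 = read_parquet_cache(path="/tmp/semmed_cache.parquet")
    print(df2.dtypes)  # PREDICATE, SUBJECT_CUI, etc. restored to string[pyarrow]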


##################################
# PART 5: Node Normalizer Client #
##################################
@@ -692,33 +733,44 @@ def construct_document(row: pd.Series, semantic_type_map):
if not doc["object"]["semantic_type_name"]:
del doc["object"]["semantic_type_name"]

-    yield doc
+    return doc


-def load_data(data_folder):
-    semmed_df = read_semmed_data_frame(data_folder, "semmedVER43_2022_R_PREDICATION.csv")
-    semmed_df = delete_zero_novelty_scores(semmed_df)
-    semmed_df = delete_invalid_object_cuis(semmed_df)
-    semmed_df = explode_pipes(semmed_df)
-
-    mrcui_df = read_mrcui_data_frame(data_folder, "MRCUI.RRF")
-    deleted_cuis = get_retired_cuis_for_deletion(mrcui_df)
-    semmed_df = delete_retired_cuis(semmed_df, deleted_cuis)
-
-    semmed_df = add_prefix_columns(semmed_df)
-    semmed_cui_name_semtype_df = get_cui_name_and_semtype_from_semmed(semmed_df)
-    umls_cui_name_semtype_df = read_cui_name_and_semtype_from_umls(data_folder, "UMLS_CUI_Semtype.tsv")
-    retirement_mapping_df = get_retirement_mapping_data_frame(mrcui_df)
-    retirement_mapping_df = add_cui_name_and_semtype_to_retirement_mapping(retirement_mapping_df, semmed_cui_name_semtype_df, umls_cui_name_semtype_df)
-    semmed_df = map_retired_cuis(semmed_df, retirement_mapping_df)
-
-    node_normalizer_cache = os.path.join(data_folder, "semmedVER43_2022_R_PREDICATION_NodeNorm.pickle")
-    semmed_df = delete_equivalent_ncbigene_ids(semmed_df, node_normalizer_cache=node_normalizer_cache, node_normalizer_output=None)
-
-    semmed_df = add_document_id_column(semmed_df)
-
-    semtype_mappings_df = read_semantic_type_mappings_data_frame(data_folder, "SemanticTypes_2018AB.txt")
-    semtype_name_map = get_semtype_name_map(semtype_mappings_df)
+def load_data(data_folder, write_semmed_cache=False):
+    semmed_cache_path = os.path.join(data_folder, SEMMED_TABLE_CACHE_FN)
+
+    # Always read the cache if available
+    if semmed_cache_path and os.path.exists(semmed_cache_path):
+        semmed_df = read_parquet_cache(path=semmed_cache_path)
+    else:
+        # Start the data cleaning procedure if cache not available
+        semmed_df = read_semmed_data_frame(data_folder, SEMMED_TABLE_FN)
+        semmed_df = delete_zero_novelty_scores(semmed_df)
+        semmed_df = delete_invalid_object_cuis(semmed_df)
+        semmed_df = explode_pipes(semmed_df)
+
+        mrcui_df = read_mrcui_data_frame(data_folder, MRCUI_FN)
+        deleted_cuis = get_retired_cuis_for_deletion(mrcui_df)
+        semmed_df = delete_retired_cuis(semmed_df, deleted_cuis)
+
+        semmed_df = add_prefix_columns(semmed_df)
+        semmed_cui_name_semtype_df = get_cui_name_and_semtype_from_semmed(semmed_df)
+        umls_cui_name_semtype_df = read_cui_name_and_semtype_from_umls(data_folder, UMLS_PREFERRED_CUI_NAME_SEMTYPE_FN)  # pre-generated file; see README.md
+        retirement_mapping_df = get_retirement_mapping_data_frame(mrcui_df)
+        retirement_mapping_df = add_cui_name_and_semtype_to_retirement_mapping(retirement_mapping_df, semmed_cui_name_semtype_df, umls_cui_name_semtype_df)
+        semmed_df = map_retired_cuis(semmed_df, retirement_mapping_df)
+
+        node_normalizer_cache = os.path.join(data_folder, SEMMED_NODE_NORM_RESPONSE_CACHE_FN)
+        semmed_df = delete_equivalent_ncbigene_ids(semmed_df, node_normalizer_cache_input=node_normalizer_cache, node_normalizer_cache_output=None)
+
+        semmed_df = add_document_id_column(semmed_df)
+
+        # Write cache only when `write_semmed_cache` is set
+        if write_semmed_cache:
+            write_parquet_cache(semmed_df, path=semmed_cache_path)
+
+    semtype_mappings_df = read_semantic_type_mappings_data_frame(data_folder, SEMTYPE_MAPPING_FN)
+    semtype_name_map = get_semtype_name_map(semtype_mappings_df)
     for _, row in semmed_df.iterrows():
-        yield from construct_document(row, semtype_name_map)
+        doc = construct_document(row, semtype_name_map)
+        yield doc
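With construct_document now returning a single document instead of yielding one, load_data remains the only generator. A hypothetical smoke test outside the hub — the folder path is an assumption, and the _id key is assumed to be the document identifier produced via add_document_id_column:

    docs = load_data("/path/to/data_folder")  # generator; nothing runs until iterated
    first_doc = next(docs)
    print(first_doc["_id"], first_doc["predicate"])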
