From f597fa8aa68042831eaf82f2d20e66dd6a3eadbb Mon Sep 17 00:00:00 2001 From: Tessa Pierce Ward Date: Mon, 13 Feb 2023 08:01:28 -0800 Subject: [PATCH] MRG: Use RankLineageInfo to simplify reading lineages (#2467) --- src/sourmash/tax/tax_utils.py | 118 ++++++++++++++++--------------- tests/test_tax_utils.py | 126 ++++++++++++++++++++-------------- 2 files changed, 134 insertions(+), 110 deletions(-) diff --git a/src/sourmash/tax/tax_utils.py b/src/sourmash/tax/tax_utils.py index f108249cba..844fd1ceef 100644 --- a/src/sourmash/tax/tax_utils.py +++ b/src/sourmash/tax/tax_utils.py @@ -51,7 +51,6 @@ class BaseLineageInfo: optional: lineage: tuple or list of LineagePair lineage_str: `;`- or `,`-separated string of names - lineage_dict: dictionary of {rank: name} If no lineage information is provided, result will be a BaseLineageInfo with provided ranks and no lineage names. @@ -63,7 +62,6 @@ class BaseLineageInfo: ranks: tuple() # require ranks lineage: tuple = None # tuple of LineagePairs lineage_str: str = field(default=None, compare=False) # ';'- or ','-separated str of lineage names - lineage_dict: dict = field(default=None, compare=False) # dict of rank: name def __post_init__(self): "Initialize according to passed values" @@ -74,8 +72,6 @@ def __post_init__(self): self._init_from_lineage_tuples() elif self.lineage_str is not None: self._init_from_lineage_str() - elif self.lineage_dict is not None: - self._init_from_lineage_dict() else: self._init_empty() @@ -170,37 +166,6 @@ def _init_from_lineage_tuples(self): object.__setattr__(self, "lineage", tuple(new_lineage)) object.__setattr__(self, "filled_ranks", filled_ranks) - def _init_from_lineage_dict(self): - 'initialize from lineage dict, e.g. from gather csv, allowing empty ranks and reordering if necessary' - if not isinstance(self.lineage_dict, (dict)): - raise ValueError(f"{self.lineage_dict} is not dictionary") - # first, initialize_empty - new_lineage = [] - # build empty lineage - for rank in self.ranks: - new_lineage.append(LineagePair(rank=rank)) - # now add input information in correct spots. This corrects for order and allows empty values. - for rank, info in self.lineage_dict.items(): - try: - rank_idx = self.rank_index(rank) - except ValueError as e: - raise ValueError(f"Rank '{rank}' not present in {', '.join(self.ranks)}") from e - - name, taxid = None, None - if isinstance(info, dict): - if 'name' in info.keys(): - name = info['name'] - if 'taxid' in info.keys(): - taxid = info['taxid'] - elif isinstance(info, str): - name = info - new_lineage[rank_idx] = LineagePair(rank=rank, name=name, taxid=taxid) - # build list of filled ranks - filled_ranks = [a.rank for a in new_lineage if a.name] - # set lineage and filled_ranks - object.__setattr__(self, "lineage", tuple(new_lineage)) - object.__setattr__(self, "filled_ranks", filled_ranks) - def _init_from_lineage_str(self): """ Turn a ; or ,-separated set of lineages into a list of LineagePair objs. @@ -329,6 +294,7 @@ class RankLineageInfo(BaseLineageInfo): and will not be used or compared in any other class methods. """ ranks: tuple = ('superkingdom', 'phylum', 'class', 'order', 'family', 'genus', 'species', 'strain') + lineage_dict: dict = field(default=None, compare=False) # dict of rank: name def __post_init__(self): "Initialize according to passed values" @@ -344,6 +310,53 @@ def __post_init__(self): elif self.ranks: self._init_empty() + def _init_from_lineage_dict(self): + """ + Initialize from lineage dict, e.g. from lineages csv. + Use NCBI taxids if available as '|'-separated 'taxpath' column. + Allows empty ranks/extra columns and reordering if necessary + """ + null_names = set(['[Blank]', 'na', 'null', 'NA', '']) + if not isinstance(self.lineage_dict, (dict)): + raise ValueError(f"{self.lineage_dict} is not dictionary") + new_lineage = [] + taxpath=[] + # build empty lineage and taxpath + for rank in self.ranks: + new_lineage.append(LineagePair(rank=rank)) + + # check for NCBI taxpath information + taxpath_str = self.lineage_dict.get('taxpath', []) + if taxpath_str: + taxpath = taxpath_str.split('|') + if len(taxpath) > len(self.ranks): + raise ValueError(f"Number of NCBI taxids ({len(taxpath)}) exceeds number of ranks ({len(self.ranks)})") + + # now add rank information in correct spots. This corrects for order and allows empty ranks and extra dict keys + for key, val in self.lineage_dict.items(): + name, taxid = None, None + try: + rank, name = key, val + rank_idx = self.rank_index(rank) + except ValueError: + continue # ignore dictionary entries (columns) that don't match a rank + + if taxpath: + try: + taxid = taxpath[rank_idx] + except IndexError: + taxid = None + # filter null + if name is not None and name.strip() in null_names: + name = None + new_lineage[rank_idx] = LineagePair(rank=rank, name=name, taxid=taxid) + + # build list of filled ranks + filled_ranks = [a.rank for a in new_lineage if a.name] + # set lineage and filled_ranks + object.__setattr__(self, "lineage", tuple(new_lineage)) + object.__setattr__(self, "filled_ranks", filled_ranks) + def get_ident(ident, *, keep_full_identifiers=False, keep_identifier_versions=False): @@ -754,15 +767,14 @@ def load(cls, filename, *, delimiter=',', force=False, else: header_str = ",".join([repr(x) for x in header]) raise ValueError(f'No taxonomic identifiers found; headers are {header_str}') + # is "strain" an available rank? if "strain" in header: include_strain=True - load_taxids=False - if 'taxpath' in header: - load_taxids=True - # check that all ranks are in header - ranks = list(lca_utils.taxlist(include_strain=include_strain)) + ranks = list(RankLineageInfo().taxlist) + if not include_strain: + ranks.remove('strain') if not set(ranks).issubset(header): # for now, just raise err if not all ranks are present. # in future, we can define `ranks` differently if desired @@ -777,16 +789,9 @@ def load(cls, filename, *, delimiter=',', force=False, # now parse and load lineages for n, row in enumerate(r): num_rows += 1 - lineage = [] - taxid=None - # read row into a lineage pair - if load_taxids: - taxpath = row['taxpath'].split('|') - for n, rank in enumerate(lca_utils.taxlist(include_strain=include_strain)): - lin = row[rank] - if load_taxids: - taxid = taxpath[n] - lineage.append(LineagePair(rank, name=lin, taxid=taxid)) + # read lineage from row dictionary + lineageInfo = RankLineageInfo(lineage_dict=row) + # get identifier ident = row[identifier] # fold, spindle, and mutilate ident? @@ -794,23 +799,16 @@ def load(cls, filename, *, delimiter=',', force=False, keep_full_identifiers=keep_full_identifiers, keep_identifier_versions=keep_identifier_versions) - # clean lineage of null names, replace with 'unassigned' - lineage = [ (lin.rank, lca_utils.filter_null(lin.name), lin.taxid) for lin in lineage ] - lineage = [ LineagePair(a, b, c) for (a, b, c) in lineage ] - - # remove end nulls - while lineage and lineage[-1].name == 'unassigned': - lineage = lineage[:-1] - # store lineage tuple + lineage = lineageInfo.filled_lineage if lineage: # check duplicates if ident in assignments: - if assignments[ident] != tuple(lineage): + if assignments[ident] != lineage: if not force: raise ValueError(f"multiple lineages for identifier {ident}") else: - assignments[ident] = tuple(lineage) + assignments[ident] = lineage if lineage[-1].rank == 'species': n_species += 1 diff --git a/tests/test_tax_utils.py b/tests/test_tax_utils.py index fe0b00e1c3..db4a277363 100644 --- a/tests/test_tax_utils.py +++ b/tests/test_tax_utils.py @@ -1074,46 +1074,6 @@ def test_BaseLineageInfo_init_lca_lineage_tups(): assert taxinf.zip_lineage()== ['a', '', 'b'] -def test_BaseLineageInfo_init_lineage_dict_fail(): - ranks=["A", "B", "C"] - lin_tups = (LineagePair(rank="A", name='a'), LineagePair(rank="C", name='b')) - with pytest.raises(ValueError) as exc: - taxinf = BaseLineageInfo(ranks=ranks, lineage_dict=lin_tups) - print(str(exc)) - - assert "is not dictionary" in str(exc) - - -def test_BaseLineageInfo_init_lineage_dict (): - x = {'rank1': 'name1', 'rank2': 'name2'} - taxinf = BaseLineageInfo(lineage_dict=x, ranks=["rank1", "rank2"]) - print("ranks: ", taxinf.ranks) - print("lineage: ", taxinf.lineage) - print("zipped lineage: ", taxinf.zip_lineage()) - assert taxinf.zip_lineage()== ['name1', 'name2'] - - -def test_BaseLineageInfo_init_lineage_dict_withtaxid(): - x = {'rank1': {'name': 'name1', 'taxid': 1}, 'rank2': {'name':'name2', 'taxid': 2}} - taxinf = BaseLineageInfo(lineage_dict=x, ranks=["rank1", "rank2"]) - print("ranks: ", taxinf.ranks) - print("lineage: ", taxinf.lineage) - print("zipped lineage: ", taxinf.zip_lineage()) - assert taxinf.zip_lineage()== ['name1', 'name2'] - assert taxinf.zip_taxid()== ['1', '2'] - assert taxinf.lowest_lineage_taxid == 2 - assert taxinf.lowest_lineage_name == "name2" - - -def test_BaseLineageInfo_init_lineage_str_lineage_dict_test_eq(): - x = "a;b;c" - ranks=["A", "B", "C"] - rankD = {"A": "a", "B": "b", "C": "c"} - lin1 = BaseLineageInfo(lineage_str=x, ranks=ranks) - lin2 = BaseLineageInfo(lineage_dict=rankD, ranks=ranks) - assert lin1 == lin2 - - def test_BaseLineageInfo_init_no_ranks(): x = "a;b;c" rankD = {"superkingdom": "a", "phylum": "b", "class": "c"} @@ -1122,10 +1082,6 @@ def test_BaseLineageInfo_init_no_ranks(): BaseLineageInfo(lineage_str=x) print(exc) assert "__init__() missing 1 required positional argument: 'ranks'" in str(exc) - with pytest.raises(TypeError) as exc: - BaseLineageInfo(lineage_dict=rankD) - print(exc) - assert "__init__() missing 1 required positional argument: 'ranks'" in str(exc) with pytest.raises(TypeError) as exc: BaseLineageInfo(lineage=lin_tups) print(exc) @@ -1140,10 +1096,6 @@ def test_BaseLineageInfo_init_with_wrong_ranks(): BaseLineageInfo(lineage=lin_tups, ranks=ranks) print(str(exc)) assert "Rank 'rank1' not present in A, B, C" in str(exc) - with pytest.raises(ValueError) as exc: - BaseLineageInfo(lineage_dict=linD, ranks=ranks) - print(str(exc)) - assert "Rank 'rank1' not present in A, B, C" in str(exc) def test_BaseLineageInfo_init_not_lineagepair(): @@ -1187,7 +1139,26 @@ def test_RankLineageInfo_init_lineage_tups(): assert taxinf.zip_lineage()== ['a', 'b', '', '', '', '', '', ''] +def test_RankLineageInfo_init_lineage_dict_fail(): + ranks=["A", "B", "C"] + lin_tups = (LineagePair(rank="A", name='a'), LineagePair(rank="C", name='b')) + with pytest.raises(ValueError) as exc: + taxinf = RankLineageInfo(ranks=ranks, lineage_dict=lin_tups) + print(str(exc)) + + assert "is not dictionary" in str(exc) + + def test_RankLineageInfo_init_lineage_dict(): + x = {'rank1': 'name1', 'rank2': 'name2'} + taxinf = RankLineageInfo(lineage_dict=x, ranks=["rank1", "rank2"]) + print("ranks: ", taxinf.ranks) + print("lineage: ", taxinf.lineage) + print("zipped lineage: ", taxinf.zip_lineage()) + assert taxinf.zip_lineage()== ['name1', 'name2'] + + +def test_RankLineageInfo_init_lineage_dict_default_ranks(): x = {"superkingdom":'a',"phylum":'b'} taxinf = RankLineageInfo(lineage_dict=x) print(taxinf.lineage) @@ -1195,6 +1166,28 @@ def test_RankLineageInfo_init_lineage_dict(): assert taxinf.zip_lineage()== ['a', 'b', '', '', '', '', '', ''] +def test_RankLineageInfo_init_lineage_dict_withtaxpath(): + x = {'rank1': 'name1', 'rank2': 'name2', 'taxpath': "1|2"} + taxinf = RankLineageInfo(lineage_dict=x, ranks=["rank1", "rank2"]) + print("ranks: ", taxinf.ranks) + print("lineage: ", taxinf.lineage) + print("zipped lineage: ", taxinf.zip_lineage()) + print("zipped taxids: ", taxinf.zip_taxid()) + assert taxinf.zip_lineage()== ['name1', 'name2'] + assert taxinf.zip_taxid()== ['1', '2'] + assert taxinf.lowest_lineage_taxid == "2" + assert taxinf.lowest_lineage_name == "name2" + + +def test_RankLineageInfo_init_lineage_str_lineage_dict_test_eq(): + x = "a;b;c" + ranks=["A", "B", "C"] + rankD = {"A": "a", "B": "b", "C": "c"} + lin1 = RankLineageInfo(lineage_str=x, ranks=ranks) + lin2 = RankLineageInfo(lineage_dict=rankD, ranks=ranks) + assert lin1 == lin2 + + def test_RankLineageInfo_init_lineage_dict_missing_rank(): x = {'superkingdom': 'name1', 'class': 'name2'} taxinf = RankLineageInfo(lineage_dict=x) @@ -1205,8 +1198,8 @@ def test_RankLineageInfo_init_lineage_dict_missing_rank(): assert taxinf.zip_lineage(truncate_empty=True)== ['name1', '', 'name2'] -def test_RankLineageInfo_init_lineage_dict_missing_rank_withtaxid(): - x = {'superkingdom': {'name': 'name1', 'taxid': 1}, 'class': {'name':'name2', 'taxid': 2}} +def test_RankLineageInfo_init_lineage_dict_missing_rank_with_taxpath(): + x = {'superkingdom': 'name1', 'class': 'name2', 'taxpath': '1||2'} taxinf = RankLineageInfo(lineage_dict=x) print("ranks: ", taxinf.ranks) print("lineage: ", taxinf.lineage) @@ -1215,6 +1208,39 @@ def test_RankLineageInfo_init_lineage_dict_missing_rank_withtaxid(): assert taxinf.zip_taxid()== ['1', '', '2', '', '', '', '', ''] +def test_RankLineageInfo_init_lineage_dict_name_taxpath_mismatch(): + # If there's no name, we don't report the taxpath, because lineage is not "filled". + # Is this desired behavior? + x = {'superkingdom': 'name1', 'taxpath': '1||2'} + taxinf = RankLineageInfo(lineage_dict=x) + print("ranks: ", taxinf.ranks) + print("lineage: ", taxinf.lineage) + print("zipped lineage: ", taxinf.zip_lineage()) + assert taxinf.zip_lineage()== ['name1', '', '', '', '', '', '', ''] + assert taxinf.zip_taxid()== ['1', '', '', '', '', '', '', ''] + + +def test_RankLineageInfo_init_lineage_dict_name_taxpath_missing_taxids(): + # If there's no name, we don't report the taxpath, because lineage is not "filled". + # Is this desired behavior? + x = {'superkingdom': 'name1', 'phylum': "name2", "class": "name3", 'taxpath': '|2'} + taxinf = RankLineageInfo(lineage_dict=x) + print("ranks: ", taxinf.ranks) + print("lineage: ", taxinf.lineage) + print("zipped lineage: ", taxinf.zip_lineage()) + print("zipped taxids: ", taxinf.zip_taxid()) + assert taxinf.zip_lineage()== ['name1', 'name2', 'name3', '', '', '', '', ''] + assert taxinf.zip_taxid()== ['', '2', '', '', '', '', '', ''] + + +def test_RankLineageInfo_init_lineage_dict_taxpath_too_long(): + x = {'superkingdom': 'name1', 'class': 'name2', 'taxpath': '1||2||||||||||'} + with pytest.raises(ValueError) as exc: + RankLineageInfo(lineage_dict=x) + print(str(exc)) + assert f"Number of NCBI taxids (13) exceeds number of ranks (8)" in str(exc) + + def test_RankLineageInfo_init_lineage_str_lineage_dict_test_eq(): x = "a;b;c" rankD = {"superkingdom": "a", "phylum": "b", "class": "c"}