Add --order-by support to query-datasets command-line
Do not include it in butler associate.
This required a small rewrite of the table accumulator to use
a dict rather than a set.
timj committed Sep 4, 2024
1 parent c324dac · commit afba401
Showing 8 changed files with 71 additions and 31 deletions.
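
The switch from a set to a dict in the table accumulator works because Python dicts preserve insertion order (guaranteed since Python 3.7) while still collapsing duplicate keys, so query results keep whatever order the backend returned them in, which is what --order-by relies on. A minimal standalone sketch of the idea, with plain strings standing in for DatasetRef:

```python
# A dict used as an ordered set: duplicate keys collapse, and first-seen
# insertion order survives (a set dedups too, but offers no ordering
# guarantee).
refs: dict[str, str | None] = {}
for ref, uri in [("ds-2", None), ("ds-1", "file:///tmp/a"), ("ds-2", None)]:
    refs[ref] = uri  # re-adding "ds-2" dedups without moving it

print(list(refs))  # ['ds-2', 'ds-1']: duplicates gone, order kept
```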

python/lsst/daf/butler/cli/cmd/commands.py (1 addition, 1 deletion)

@@ -78,7 +78,7 @@
 @click.command(cls=ButlerCommand, short_help="Add existing datasets to a tagged collection.")
 @repo_argument(required=True)
 @collection_argument(help="COLLECTION is the collection the datasets should be associated with.")
-@query_datasets_options(repo=False, showUri=False, useArguments=False, default_limit=0)
+@query_datasets_options(repo=False, showUri=False, useArguments=False, default_limit=0, use_order_by=False)
 @options_file_option()
 def associate(**kwargs: Any) -> None:
     """Add existing datasets to a tagged collection; searches for datasets with

python/lsst/daf/butler/cli/opt/optionGroups.py (11 additions, 2 deletions)

@@ -33,7 +33,7 @@

 from ..utils import OptionGroup, unwrap, where_help
 from .arguments import glob_argument, repo_argument
-from .options import collections_option, dataset_type_option, limit_option, where_option
+from .options import collections_option, dataset_type_option, limit_option, order_by_option, where_option


 class query_datasets_options(OptionGroup):  # noqa: N801
@@ -49,10 +49,17 @@ class query_datasets_options(OptionGroup):  # noqa: N801
         Whether this is an argument or an option.
     default_limit : `int`
         The default value to use for the limit parameter.
+    use_order_by : `bool`
+        Whether to include an order_by option.
     """

     def __init__(
-        self, repo: bool = True, showUri: bool = True, useArguments: bool = True, default_limit: int = -10_000
+        self,
+        repo: bool = True,
+        showUri: bool = True,
+        useArguments: bool = True,
+        default_limit: int = -10_000,
+        use_order_by: bool = True,
     ) -> None:
         self.decorators = []
         if repo:
@@ -100,6 +107,8 @@ def __init__(
                 ),
             ]
         )
+        if use_order_by:
+            self.decorators.append(order_by_option())
         if showUri:
             self.decorators.append(
                 click.option("--show-uri", is_flag=True, help="Show the dataset URI in results.")

python/lsst/daf/butler/script/_associate.py (1 addition, 0 deletions)

@@ -74,6 +74,7 @@ def associate(
         where=where,
         find_first=find_first,
         limit=limit,
+        order_by=(),
         show_uri=False,
         repo=None,
     )

python/lsst/daf/butler/script/queryDatasets.py (36 additions, 26 deletions)

@@ -26,7 +26,6 @@
 # along with this program. If not, see <http://www.gnu.org/licenses/>.
 from __future__ import annotations

-import dataclasses
 import logging
 from collections import defaultdict
 from collections.abc import Iterable, Iterator
@@ -46,21 +45,15 @@
 _LOG = logging.getLogger(__name__)


-@dataclasses.dataclass(frozen=True)
-class _RefInfo:
-    datasetRef: DatasetRef
-    uri: str | None
-
-
 class _Table:
     """Aggregates rows for a single dataset type, and creates an astropy table
     with the aggregated data. Eliminates duplicate rows.
     """

-    datasetRefs: set[_RefInfo]
+    datasetRefs: dict[DatasetRef, str | None]

     def __init__(self) -> None:
-        self.datasetRefs = set()
+        self.datasetRefs = {}

     def add(self, datasetRef: DatasetRef, uri: ResourcePath | None = None) -> None:
         """Add a row of information to the table.
@@ -76,15 +69,18 @@ def add(self, datasetRef: DatasetRef, uri: ResourcePath | None = None) -> None:
             The URI to show as a file location in the table, by default `None`.
         """
         uri_str = str(uri) if uri else None
-        self.datasetRefs.add(_RefInfo(datasetRef, uri_str))
+        # Use a dict to retain ordering.
+        self.datasetRefs[datasetRef] = uri_str

-    def getAstropyTable(self, datasetTypeName: str) -> AstropyTable:
+    def getAstropyTable(self, datasetTypeName: str, sort: bool = True) -> AstropyTable:
         """Get the table as an astropy table.

         Parameters
         ----------
         datasetTypeName : `str`
             The dataset type name to show in the ``type`` column of the table.
+        sort : `bool`, optional
+            If `True` the table will be sorted.

         Returns
         -------
@@ -96,11 +92,8 @@ def getAstropyTable(self, datasetTypeName: str) -> AstropyTable:
         if not self.datasetRefs:
             raise RuntimeError(f"No DatasetRefs were provided for dataset type {datasetTypeName}")

-        refInfo = next(iter(self.datasetRefs))
-        dimensions = [
-            refInfo.datasetRef.dataId.universe.dimensions[k]
-            for k in refInfo.datasetRef.dataId.dimensions.data_coordinate_keys
-        ]
+        ref = next(iter(self.datasetRefs))
+        dimensions = [ref.dataId.universe.dimensions[k] for k in ref.dataId.dimensions.data_coordinate_keys]
         columnNames = ["type", "run", "id", *[str(item) for item in dimensions]]

         # Need to hint the column types for numbers since the per-row
@@ -111,26 +104,29 @@ def getAstropyTable(self, datasetTypeName: str) -> AstropyTable:
             None,
             None,
             str,
-            *[typeMap.get(type(value)) for value in refInfo.datasetRef.dataId.full_values],
+            *[typeMap.get(type(value)) for value in ref.dataId.full_values],
         ]
-        if refInfo.uri:
+        if self.datasetRefs[ref]:
             columnNames.append("URI")
             columnTypes.append(None)

         rows = []
-        for refInfo in self.datasetRefs:
+        for ref, uri in self.datasetRefs.items():
             row = [
                 datasetTypeName,
-                refInfo.datasetRef.run,
-                str(refInfo.datasetRef.id),
-                *refInfo.datasetRef.dataId.full_values,
+                ref.run,
+                str(ref.id),
+                *ref.dataId.full_values,
             ]
-            if refInfo.uri:
-                row.append(refInfo.uri)
+            if uri:
+                row.append(uri)
             rows.append(row)

         dataset_table = AstropyTable(np.array(rows), names=columnNames, dtype=columnTypes)
-        return sortAstropyTable(dataset_table, dimensions, ["type", "run"])
+        if sort:
+            return sortAstropyTable(dataset_table, dimensions, ["type", "run"])
+        else:
+            return dataset_table


 class QueryDatasets:
@@ -160,6 +156,11 @@ class QueryDatasets:
         Limit the number of results to be returned. A value of 0 means
         unlimited. A negative value is used to specify a cap where a warning
         is issued if that cap is hit.
+    order_by : `tuple` of `str`
+        Dimensions to use for sorting results. If no ordering is given the
+        results of ``limit`` are undefined and default sorting of the resulting
+        datasets will be applied. It is an error if the requested ordering
+        is inconsistent with the dimensions of the dataset type being queried.
     repo : `str` or `None`
         URI to the location of the repo or URI to a config file describing the
         repo and its location. One of `repo` and `butler` must be `None` and
@@ -177,6 +178,7 @@ def __init__(
         find_first: bool,
         show_uri: bool,
         limit: int = 0,
+        order_by: tuple[str, ...] = (),
         repo: str | None = None,
         butler: Butler | None = None,
     ):
@@ -191,6 +193,7 @@
         self._where = where
         self._find_first = find_first
         self._limit = limit
+        self._order_by = order_by

     def getTables(self) -> list[AstropyTable]:
         """Get the datasets as a list of astropy tables.
@@ -212,7 +215,12 @@ def getTables(self) -> list[AstropyTable]:
                 for name, uri in uris.componentURIs.items():
                     tables[ref.datasetType.componentTypeName(name)].add(ref, uri)

-        return [table.getAstropyTable(datasetTypeName) for datasetTypeName, table in tables.items()]
+        # Sort if we haven't been told to enforce an order.
+        sort_table = not bool(self._order_by)
+        return [
+            table.getAstropyTable(datasetTypeName, sort=sort_table)
+            for datasetTypeName, table in tables.items()
+        ]

     # @profile
     def getDatasets(self) -> Iterator[DatasetRef]:
@@ -247,6 +255,7 @@ def getDatasets(self) -> Iterator[DatasetRef]:
             _LOG.info("The given dataset type, %s, is not known to this butler.", datasetTypes)
         else:
             _LOG.info("Processing %d dataset type%s", n_dataset_types, "" if n_dataset_types == 1 else "s")
+        _LOG.warning("Order by: %s", self._order_by)

         # Accumulate over dataset types.
         limit = self._limit
@@ -267,6 +276,7 @@
                     collections=query_collections,
                     find_first=self._find_first,
                     with_dimension_records=True,
+                    order_by=self._order_by,
                     **kwargs,
                 )
                 if not unlimited:
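
With the plumbing above in place, QueryDatasets can be driven with an explicit ordering from Python as well as from the command line. A hedged usage sketch; the dataset type glob, collection name, dimension names, and repo path below are placeholders for illustration, not values from this commit:

```python
from lsst.daf.butler.script.queryDatasets import QueryDatasets

query = QueryDatasets(
    ("raw",),                 # dataset type glob (placeholder)
    ("HSC/raw/all",),         # collections to search (placeholder)
    where="",
    find_first=False,
    show_uri=False,
    limit=-10_000,            # negative limit: soft cap that warns when hit
    order_by=("visit", "detector"),  # () falls back to the default table sort
    repo="/path/to/repo",     # placeholder path
)
tables = query.getTables()    # ordering comes from the query when order_by is set
```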

python/lsst/daf/butler/script/retrieveArtifacts.py (7 additions, 0 deletions)

@@ -49,6 +49,7 @@ def retrieveArtifacts(
     where: str,
     find_first: bool,
     limit: int,
+    order_by: tuple[str, ...],
     transfer: str,
     preserve_path: bool,
     clobber: bool,
@@ -75,6 +76,11 @@
         Limit the number of results to be returned. A value of 0 means
         unlimited. A negative value is used to specify a cap where a warning
         is issued if that cap is hit.
+    order_by : `tuple` of `str`
+        Dimensions to use for sorting results. If no ordering is given the
+        results of ``limit`` are undefined and default sorting of the resulting
+        datasets will be applied. It is an error if the requested ordering
+        is inconsistent with the dimensions of the dataset type being queried.
     transfer : `str`
         Transfer mode to use when placing artifacts in the destination.
     preserve_path : `bool`
@@ -102,6 +108,7 @@
         where=where,
         find_first=find_first,
         limit=limit,
+        order_by=order_by,
         show_uri=False,
     )
     refs = list(query.getDatasets())

python/lsst/daf/butler/script/transferDatasets.py (7 additions, 0 deletions)

@@ -44,6 +44,7 @@ def transferDatasets(
     where: str,
     find_first: bool,
     limit: int,
+    order_by: tuple[str, ...],
     transfer: str,
     register_dataset_types: bool,
     transfer_dimensions: bool = True,
@@ -69,6 +70,11 @@
         Limit the number of results to be returned. A value of 0 means
         unlimited. A negative value is used to specify a cap where a warning
         is issued if that cap is hit.
+    order_by : `tuple` of `str`
+        Dimensions to use for sorting results. If no ordering is given the
+        results of ``limit`` are undefined and default sorting of the resulting
+        datasets will be applied. It is an error if the requested ordering
+        is inconsistent with the dimensions of the dataset type being queried.
     transfer : `str`
         Transfer mode to use when placing artifacts in the destination.
     register_dataset_types : `bool`
@@ -91,6 +97,7 @@
         where=where,
         find_first=find_first,
         limit=limit,
+        order_by=order_by,
         show_uri=False,
     )
     # Place results in a set to remove duplicates (which should not exist

tests/test_cliCmdAssociate.py (4 additions, 0 deletions)

@@ -58,6 +58,7 @@ def test_defaults(self, mockAssociate):
             collections=(),
             where="",
             find_first=False,
+            limit=0,
         )

     @patch("lsst.daf.butler.script.associate")
@@ -76,6 +77,8 @@ def test_values(self, mockAssociate):
                 "--where",
                 "'a=b'",
                 "--find-first",
+                "--limit",
+                "-5000",
             ],
         )
         self.assertEqual(result.exit_code, 0, clickResultMsg(result))
@@ -86,6 +89,7 @@
             collections=("myCollection", "otherCollection"),
             where="'a=b'",
             find_first=True,
+            limit=-5000,
         )


tests/test_cliCmdQueryDatasets.py (4 additions, 2 deletions)

@@ -147,8 +147,10 @@ class QueryDatasetsTest(unittest.TestCase, ButlerTestHelper):
     storageClassFactory = StorageClassFactory()

     @staticmethod
-    def _queryDatasets(repo, glob=(), collections=(), where="", find_first=False, show_uri=False):
-        return script.QueryDatasets(glob, collections, where, find_first, show_uri, repo=repo).getTables()
+    def _queryDatasets(repo, glob=(), collections=(), where="", find_first=False, show_uri=False, limit=0):
+        return script.QueryDatasets(
+            glob, collections, where=where, find_first=find_first, show_uri=show_uri, limit=limit, repo=repo
+        ).getTables()

     def setUp(self):
         self.testdir = makeTestTempDir(TESTDIR)
