Store TPCH results in separate table (#1506)
fjetter authored May 27, 2024
1 parent 5f75db3 commit 4e3ec1a
Showing 12 changed files with 734 additions and 194 deletions.
60 changes: 60 additions & 0 deletions alembic/versions/00d5844fd364_add_tpch_run_table.py
@@ -0,0 +1,60 @@
"""Add tpch run table
Revision ID: 00d5844fd364
Revises: 25053f75e09f
Create Date: 2024-04-09 13:41:39.795757
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '00d5844fd364'
down_revision = '25053f75e09f'
branch_labels = None
depends_on = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('tpch_run',
    sa.Column('id', sa.Integer(), nullable=False),
    sa.Column('session_id', sa.String(), nullable=False),
    sa.Column('name', sa.String(), nullable=False),
    sa.Column('originalname', sa.String(), nullable=False),
    sa.Column('path', sa.String(), nullable=True),
    sa.Column('setup_outcome', sa.String(), nullable=True),
    sa.Column('call_outcome', sa.String(), nullable=True),
    sa.Column('teardown_outcome', sa.String(), nullable=True),
    sa.Column('dask_version', sa.String(), nullable=True),
    sa.Column('dask_expr_version', sa.String(), nullable=True),
    sa.Column('distributed_version', sa.String(), nullable=True),
    sa.Column('duckdb_version', sa.String(), nullable=True),
    sa.Column('pyspark_version', sa.String(), nullable=True),
    sa.Column('polars_version', sa.String(), nullable=True),
    sa.Column('python_version', sa.String(), nullable=True),
    sa.Column('platform', sa.String(), nullable=True),
    sa.Column('ci_run_url', sa.String(), nullable=True),
    sa.Column('start', sa.DateTime(), nullable=True),
    sa.Column('end', sa.DateTime(), nullable=True),
    sa.Column('duration', sa.Float(), nullable=True),
    sa.Column('average_memory', sa.Float(), nullable=True),
    sa.Column('peak_memory', sa.Float(), nullable=True),
    sa.Column('cluster_name', sa.String(), nullable=True),
    sa.Column('cluster_id', sa.Integer(), nullable=True),
    sa.Column('cluster_details_url', sa.String(), nullable=True),
    sa.Column('scale', sa.Integer(), nullable=False),
    sa.Column('query', sa.Integer(), nullable=False),
    sa.Column('local', sa.Boolean(), nullable=False),
    sa.Column('compression', sa.String(), nullable=True),
    sa.Column('partition_size', sa.String(), nullable=True),
    sa.PrimaryKeyConstraint('id')
    )
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_table('tpch_run')
    # ### end Alembic commands ###
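As a usage note, the CI merge script further down runs `alembic upgrade head` with a `DB_NAME` environment variable. A minimal sketch of doing the same from Python, assuming the repository's alembic/env.py resolves the target database from `DB_NAME` (the file name here is a hypothetical local example):

```python
# Minimal sketch, assuming alembic.ini sits at the repo root and
# alembic/env.py reads DB_NAME to locate the SQLite file, as the
# ci/scripts/combine-dbs.sh changes below suggest.
import os

from alembic import command
from alembic.config import Config

os.environ["DB_NAME"] = "benchmark.db"  # hypothetical local database file
command.upgrade(Config("alembic.ini"), "head")  # same as `alembic upgrade head`
```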
32 changes: 32 additions & 0 deletions alembic/versions/e11cd1aaed38_add_cluster_spec_to_db.py
@@ -0,0 +1,32 @@
"""Add cluster spec to db
Revision ID: e11cd1aaed38
Revises: 00d5844fd364
Create Date: 2024-04-15 10:32:18.323088
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'e11cd1aaed38'
down_revision = '00d5844fd364'
branch_labels = None
depends_on = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column('tpch_run', sa.Column('n_workers', sa.Integer(), nullable=True))
    op.add_column('tpch_run', sa.Column('worker_vm_type', sa.String(), nullable=True))
    op.add_column('tpch_run', sa.Column('cluster_disk_size', sa.Integer(), nullable=True))
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_column('tpch_run', 'cluster_disk_size')
    op.drop_column('tpch_run', 'worker_vm_type')
    op.drop_column('tpch_run', 'n_workers')
    # ### end Alembic commands ###
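A quick way to confirm both migrations applied is to inspect the table schema; a sketch assuming a local SQLite file named benchmark.db:

```python
# Sketch: verify the tpch_run table has the cluster-spec columns added above.
import sqlite3

con = sqlite3.connect("benchmark.db")  # hypothetical local database file
columns = {row[1] for row in con.execute("PRAGMA table_info('tpch_run')")}
assert {"n_workers", "worker_vm_type", "cluster_disk_size"} <= columns
con.close()
```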
58 changes: 57 additions & 1 deletion benchmark_schema.py
@@ -1,4 +1,4 @@
-from sqlalchemy import Column, DateTime, Float, Integer, String
+from sqlalchemy import Boolean, Column, DateTime, Float, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()
@@ -62,3 +62,59 @@ class TestRun(Base):
    # Artifacts
    performance_report_url = Column(String, nullable=True)  # Not yet collected
    cluster_dump_url = Column(String, nullable=True)


class TPCHRun(Base):
    __tablename__ = "tpch_run"

    # unique run ID
    id = Column(Integer, primary_key=True)

    # pytest data
    session_id = Column(String, nullable=False)
    name = Column(String, nullable=False)
    originalname = Column(String, nullable=False)
    path = Column(String, nullable=True)
    setup_outcome = Column(String, nullable=True)
    call_outcome = Column(String, nullable=True)
    teardown_outcome = Column(String, nullable=True)

    # Runtime data
    dask_version = Column(String, nullable=True)
    dask_expr_version = Column(String, nullable=True)
    distributed_version = Column(String, nullable=True)
    duckdb_version = Column(String, nullable=True)
    pyspark_version = Column(String, nullable=True)
    polars_version = Column(String, nullable=True)

    python_version = Column(String, nullable=True)
    platform = Column(String, nullable=True)

    # CI runner data
    ci_run_url = Column(String, nullable=True)

    # Wall clock data
    start = Column(DateTime, nullable=True)
    end = Column(DateTime, nullable=True)
    duration = Column(Float, nullable=True)

    # Memory data
    average_memory = Column(Float, nullable=True)
    peak_memory = Column(Float, nullable=True)

    # Cluster name/id/details_url
    cluster_name = Column(String, nullable=True)
    cluster_id = Column(Integer, nullable=True)
    cluster_details_url = Column(String, nullable=True)

    scale = Column(Integer, nullable=False)
    query = Column(Integer, nullable=False)
    local = Column(Boolean, nullable=False)

    compression = Column(String, nullable=True)
    partition_size = Column(String, nullable=True)

    n_workers = Column(Integer, nullable=True)
    worker_vm_type = Column(String, nullable=True)
    cluster_disk_size = Column(Integer, nullable=True)
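For illustration, a TPCHRun row can be written with a plain SQLAlchemy session. This is a sketch with placeholder engine URL and field values, not code from the commit:

```python
# Sketch: persist one TPCHRun row. Engine URL and values are examples.
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from benchmark_schema import Base, TPCHRun

engine = create_engine("sqlite:///benchmark.db")  # hypothetical local DB
Base.metadata.create_all(engine)  # or apply the Alembic migrations instead

with Session(engine) as session:
    session.add(
        TPCHRun(
            session_id="manual-run",
            name="test_query_1[scale-100]",
            originalname="test_query_1",
            scale=100,
            query=1,
            local=False,
        )
    )
    session.commit()
```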
10 changes: 7 additions & 3 deletions ci/scripts/combine-dbs.sh
@@ -14,24 +14,28 @@ fi
DB_NAME=benchmark.tmp.db alembic upgrade head

# Merge in the individual job dbs into our working copy
-for FILE in $(find benchmarks -name "*.db")
+for FILE in $(find . -name "*.db")
do
    # Skip the output DB if we see it
    if [ ${FILE##*/} == $DB_NAME ]; then
        echo "Skipping $FILE"
        continue
    fi
    echo "Processing $FILE"
    DB_NAME=$FILE alembic upgrade head
    # Copy the individual table into the primary one. We make an intermediate
    # temp table so that we can null out the primary keys and reset the
    # autoincrementing
+   for tab in "tpch_run" "test_run"
+   do
    sqlite3 "$FILE" <<EOF
attach "benchmark.tmp.db" as lead;
-create temporary table tmp as select * from main.test_run;
+create temporary table tmp as select * from main.$tab;
update tmp set id=NULL;
-insert into lead.test_run select * from tmp;
+insert into lead.$tab select * from tmp;
detach database lead;
EOF
+   done
done

mv benchmark.tmp.db "$DB_NAME"
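The `id=NULL` step above is what lets each job's rows receive fresh autoincremented primary keys in the combined database. An equivalent standalone sketch in Python, with example file names:

```python
# Sketch: merge one job database into the combined one, mirroring the shell
# loop above. Inserting NULL into an INTEGER PRIMARY KEY column makes SQLite
# assign a fresh autoincremented id.
import sqlite3

con = sqlite3.connect("job.db")  # hypothetical per-job database
con.execute('attach "benchmark.tmp.db" as lead')
for tab in ("tpch_run", "test_run"):
    con.execute(f"create temporary table tmp as select * from main.{tab}")
    con.execute("update tmp set id=NULL")
    con.execute(f"insert into lead.{tab} select * from tmp")
    con.execute("drop table tmp")  # the temp table name is reused next loop
con.commit()
con.close()
```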
55 changes: 25 additions & 30 deletions tests/conftest.py
@@ -13,7 +13,6 @@
import time
import uuid
from functools import lru_cache
-from pathlib import Path

import dask
import dask.array as da
@@ -73,12 +72,6 @@ def pytest_sessionfinish(session, exitstatus):
        session.exitstatus = 0


-def _is_child_dir(path: str | Path, parent: str | Path) -> bool:
-    _parent = Path(parent).absolute()
-    _path = Path(path).absolute()
-    return _parent in _path.parents or _parent == _path


dask.config.set(
    {
        "coiled.account": "dask-benchmarks",
@@ -121,7 +114,6 @@ def benchmark_db_engine(pytestconfig, tmp_path_factory):
    ------
    The SQLAlchemy engine if the ``--benchmark`` option is set, None otherwise.
    """
-
    if not pytestconfig.getoption("--benchmark"):
        yield
    else:
@@ -165,8 +157,26 @@ def benchmark_db_session(benchmark_db_engine):
        yield session


@pytest.fixture()
def database_table_schema(request, testrun_uid):
    return TestRun(
        session_id=testrun_uid,
        name=request.node.name,
        originalname=request.node.originalname,
        path=str(request.node.path.relative_to(TEST_DIR)),
        dask_version=dask.__version__,
        dask_expr_version=dask_expr.__version__,
        distributed_version=distributed.__version__,
        coiled_runtime_version=os.environ.get("AB_VERSION", "upstream"),
        coiled_software_name=COILED_SOFTWARE_NAME,
        python_version=".".join(map(str, sys.version_info)),
        platform=sys.platform,
        ci_run_url=WORKFLOW_URL,
    )
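
Factoring the ORM object out into its own fixture means a suite-specific conftest can override it. A hypothetical TPC-H override, illustration only and not part of this diff:

```python
# Hypothetical override in a tpch/conftest.py: the TPC-H suite can swap the
# ORM class that test_run_benchmark persists without touching that fixture.
# The scale/query/local values are illustrative; the real suite would derive
# them from the parametrized test.
import pytest

from benchmark_schema import TPCHRun


@pytest.fixture()
def database_table_schema(request, testrun_uid):
    return TPCHRun(
        session_id=testrun_uid,
        name=request.node.name,
        originalname=request.node.originalname,
        scale=100,
        query=1,
        local=False,
    )
```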


@pytest.fixture(scope="function")
-def test_run_benchmark(benchmark_db_session, request, testrun_uid):
+def test_run_benchmark(benchmark_db_session, request, database_table_schema):
"""SQLAlchemy ORM object representing a given test run.
By including this fixture in a test (or another fixture that includes it)
Expand All @@ -182,33 +192,19 @@ def test_run_benchmark(benchmark_db_session, request, testrun_uid):
    if not benchmark_db_session:
        yield
    else:
-        run = TestRun(
-            session_id=testrun_uid,
-            name=request.node.name,
-            originalname=request.node.originalname,
-            path=str(request.node.path.relative_to(TEST_DIR)),
-            dask_version=dask.__version__,
-            dask_expr_version=dask_expr.__version__,
-            distributed_version=distributed.__version__,
-            coiled_runtime_version=os.environ.get("AB_VERSION", "upstream"),
-            coiled_software_name=COILED_SOFTWARE_NAME,
-            python_version=".".join(map(str, sys.version_info)),
-            platform=sys.platform,
-            ci_run_url=WORKFLOW_URL,
-        )
-        yield run
+        yield database_table_schema

        rep = getattr(request.node, "rep_setup", None)
        if rep:
-            run.setup_outcome = rep.outcome
+            database_table_schema.setup_outcome = rep.outcome
        rep = getattr(request.node, "rep_call", None)
        if rep:
-            run.call_outcome = rep.outcome
+            database_table_schema.call_outcome = rep.outcome
        rep = getattr(request.node, "rep_teardown", None)
        if rep:
-            run.teardown_outcome = rep.outcome
+            database_table_schema.teardown_outcome = rep.outcome

-        benchmark_db_session.add(run)
+        benchmark_db_session.add(database_table_schema)
        benchmark_db_session.commit()

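Since the fixture commits only after yielding, metric fixtures can attach measurements to the ORM object while the test runs. A hypothetical sketch of that pattern, not the repository's actual benchmark_time implementation:

```python
# Hypothetical metric fixture: record wall-clock data onto the ORM row
# yielded by test_run_benchmark; the commit in that fixture persists it.
import time
from datetime import datetime, timezone

import pytest


@pytest.fixture()
def wall_clock(test_run_benchmark):
    start = time.monotonic()
    started_at = datetime.now(tz=timezone.utc)
    yield
    if test_run_benchmark:  # None when --benchmark is not set
        test_run_benchmark.start = started_at
        test_run_benchmark.end = datetime.now(tz=timezone.utc)
        test_run_benchmark.duration = time.monotonic() - start
```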

@@ -407,7 +403,7 @@ def _get_cluster_info(cluster):
    yield _get_cluster_info


-@pytest.fixture(scope="function")
+@pytest.fixture(scope="function", autouse=True)
def span(request):
    with span_ctx(request.node.name):
        yield
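With autouse=True the span fixture now wraps every test automatically, which is presumably why it is dropped from benchmark_all's parameters below. A generic pytest illustration of autouse, not from this repo:

```python
# Generic illustration: an autouse fixture runs around every test in its
# scope without any test or fixture requesting it explicitly.
import pytest


@pytest.fixture(autouse=True)
def announce(request):
    print(f"starting {request.node.name}")
    yield
    print(f"finished {request.node.name}")
```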
@@ -420,7 +416,6 @@ def benchmark_all(
    benchmark_coiled_prometheus,
    get_cluster_info,
    benchmark_time,
-    span,
):
    """Benchmark all available metrics and extract cluster information
