Aggregate query results (re #35) (#339)
Allen Short authored and jezdez committed Jan 16, 2020
1 parent 708a0b4 commit febc003
Showing 15 changed files with 420 additions and 9 deletions.
20 changes: 18 additions & 2 deletions client/app/components/queries/ScheduleDialog.jsx
@@ -3,6 +3,7 @@ import PropTypes from "prop-types";
import Modal from "antd/lib/modal";
import DatePicker from "antd/lib/date-picker";
import TimePicker from "antd/lib/time-picker";
import InputNumber from "antd/lib/input-number";
import Select from "antd/lib/select";
import Radio from "antd/lib/radio";
import { capitalize, clone, isEqual, omitBy, isNil } from "lodash";
@@ -54,10 +55,12 @@ class ScheduleDialog extends React.Component {
schedule: RefreshScheduleType,
refreshOptions: PropTypes.arrayOf(PropTypes.number).isRequired,
dialog: DialogPropType.isRequired,
resultsetSize: PropTypes.number,
};

static defaultProps = {
schedule: RefreshScheduleDefault,
resultsetSize: 1,
};

state = this.getState();
@@ -75,6 +78,7 @@ class ScheduleDialog extends React.Component {
interval,
dayOfWeek: day ? WEEKDAYS_SHORT[WEEKDAYS_FULL.indexOf(day)] : null,
newSchedule,
resultsetSize: this.props.resultsetSize,
};
}

@@ -169,6 +173,10 @@ class ScheduleDialog extends React.Component {
this.setScheduleUntil(null, date);
};

setResultsetSize = resultsetSize => {
this.setState({ resultsetSize });
};

save() {
const { newSchedule } = this.state;
const hasChanged = () => {
@@ -180,9 +188,9 @@ class ScheduleDialog extends React.Component {
// save if changed
if (hasChanged()) {
if (newSchedule.interval) {
this.props.dialog.close(clone(newSchedule));
this.props.dialog.close([clone(newSchedule), this.state.resultsetSize]);
} else {
this.props.dialog.close(null);
this.props.dialog.close([null, this.state.resultsetSize]);
}
}
this.props.dialog.dismiss();
@@ -196,6 +204,7 @@ class ScheduleDialog extends React.Component {
hour,
seconds,
newSchedule: { until },
resultsetSize,
} = this.state;

return (
@@ -271,6 +280,13 @@ class ScheduleDialog extends React.Component {
</div>
</div>
) : null}
Number of query results to keep
<InputNumber
className="form-control"
min={1}
defaultValue={resultsetSize || 1}
onChange={this.setResultsetSize}
/>
</Modal>
);
}
5 changes: 4 additions & 1 deletion client/app/pages/queries/view.js
@@ -238,6 +238,7 @@ function QueryViewCtrl(
} else {
request = pick($scope.query, [
"schedule",
"schedule_resultset_size",
"query",
"id",
"description",
@@ -507,8 +508,10 @@ function QueryViewCtrl(
ScheduleDialog.showModal({
schedule: $scope.query.schedule,
refreshOptions: $scope.refreshOptions,
}).result.then(schedule => {
resultsetSize: $scope.query.schedule_resultset_size,
}).result.then(([schedule, resultsetSize]) => {
$scope.query.schedule = schedule;
$scope.query.schedule_resultset_size = resultsetSize;
$scope.saveQuery();
});
};
11 changes: 11 additions & 0 deletions client/app/services/query-result.js
@@ -41,6 +41,7 @@ function QueryResultService($resource, $timeout, $q, QueryResultError, Auth) {
queryId: "@queryId",
id: "@id",
});
const QueryResultSetResource = $resource("api/queries/:id/resultset", { id: "@id" });
const Job = $resource("api/jobs/:id", { id: "@id" });
const JobWithApiKey = $resource("api/queries/:queryId/jobs/:id", { queryId: "@queryId", id: "@id" });
const statuses = {
@@ -299,6 +300,16 @@ function QueryResultService($resource, $timeout, $q, QueryResultError, Auth) {
return queryResult;
}

static getResultSet(queryId) {
const queryResult = new QueryResult();

QueryResultSetResource.get({ id: queryId }, response => {
queryResult.update(response);
});

return queryResult;
}

loadLatestCachedResult(queryId, parameters) {
$resource("api/queries/:id/results", { id: "@queryId" }, { post: { method: "POST" } }).post(
{ queryId, parameters },
6 changes: 5 additions & 1 deletion client/app/services/query.js
@@ -333,7 +333,11 @@ function QueryResource($resource, $http, $location, $q, currentUser, QueryResult
this.latest_query_data_id = null;
}

if (this.latest_query_data && maxAge !== 0) {
if (this.schedule_resultset_size > 1) {
if (!this.queryResult) {
this.queryResult = QueryResult.getResultSet(this.id);
}
} else if (this.latest_query_data && maxAge !== 0) {
if (!this.queryResult) {
this.queryResult = new QueryResult({
query_result: this.latest_query_data,
27 changes: 27 additions & 0 deletions migrations/versions/2ba47e9812b1_.py
@@ -0,0 +1,27 @@
"""empty message
Revision ID: 2ba47e9812b1
Revises: 71477dadd6ef, 9d7678c47452
Create Date: 2018-07-25 16:09:54.769289
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "2ba47e9812b1"
down_revision = (
"71477dadd6ef",
"9d7678c47452",
)
branch_labels = None
depends_on = None


def upgrade():
pass


def downgrade():
pass
40 changes: 40 additions & 0 deletions migrations/versions/9d7678c47452_.py
@@ -0,0 +1,40 @@
"""Incremental query results aggregation
Revision ID: 9d7678c47452
Revises: 15041b7085fe
Create Date: 2018-03-08 04:36:12.802199
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "9d7678c47452"
down_revision = "15041b7085fe"
branch_labels = None
depends_on = None


def upgrade():
op.create_table(
"query_resultsets",
sa.Column("query_id", sa.Integer(), nullable=False),
sa.Column("result_id", sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(["query_id"], ["queries.id"],),
sa.ForeignKeyConstraint(["result_id"], ["query_results.id"],),
sa.PrimaryKeyConstraint("query_id", "result_id"),
)
op.add_column(
u"queries", sa.Column("schedule_resultset_size", sa.Integer(), nullable=True)
)


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column(u"queries", "schedule_resultset_size")
op.drop_table("query_resultsets")
# ### end Alembic commands ###
7 changes: 7 additions & 0 deletions redash/handlers/api.py
@@ -66,6 +66,7 @@
QueryResultDropdownResource,
QueryDropdownsResource,
QueryResultListResource,
QueryResultSetResource,
QueryResultResource,
)
from redash.handlers.query_snippets import (
@@ -224,6 +225,12 @@ def json_representation(data, code, headers=None):
"/api/queries/<query_id>/regenerate_api_key",
endpoint="query_regenerate_api_key",
)
api.add_org_resource(
QueryResultSetResource,
"/api/queries/<query_id>/resultset",
endpoint="query_aggregate_results",
)

api.add_org_resource(
QueryVersionListResource,
"/api/queries/<query_id>/version",
3 changes: 3 additions & 0 deletions redash/handlers/queries.py
@@ -209,6 +209,7 @@ def post(self):
:<json string name:
:<json string description:
:<json string schedule: Schedule interval, in seconds, for repeated execution of this query
:<json number schedule_resultset_size: Number of result sets to keep (null to keep only one)
:<json object options: Query options
.. _query-response-label:
@@ -255,6 +256,8 @@ def post(self):
query_def["data_source"] = data_source
query_def["org"] = self.current_org
query_def["is_draft"] = True
if query_def.get("schedule_resultset_size") == 1:
query_def["schedule_resultset_size"] = None
query = models.Query.create(**query_def)
query.record_changes(changed_by=self.current_user)
models.db.session.add(query)
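
The schedule_resultset_size field documented above travels with the rest of the query payload. A minimal sketch of a create request that sets it, assuming a local instance and a placeholder API key (host, key, and query body are illustrative, not part of this change):

import requests

payload = {
    "name": "Row counts over time",
    "query": "SELECT count(*) FROM events",
    "data_source_id": 1,
    "schedule": {"interval": 3600},
    # Ask the server to keep the last 30 result sets. The handler above
    # normalizes a value of 1 back to None ("keep only the latest").
    "schedule_resultset_size": 30,
}

response = requests.post(
    "http://localhost:5000/api/queries",  # placeholder host
    json=payload,
    headers={"Authorization": "Key <api-key>"},  # placeholder key
)
print(response.json()["id"])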
31 changes: 31 additions & 0 deletions redash/handlers/query_results.py
@@ -209,6 +209,37 @@ def get(self, query_id, dropdown_query_id):
return dropdown_values(dropdown_query_id, self.current_org)


class QueryResultSetResource(BaseResource):
@require_permission("view_query")
def get(self, query_id=None, filetype="json"):
query = get_object_or_404(
models.Query.get_by_id_and_org, query_id, self.current_org
)
if not query.schedule_resultset_size:
abort(404, message="query does not keep multiple results")

# Synthesize a result set from the last N results.
total = len(query.query_results)
offset = max(total - query.schedule_resultset_size, 0)
results = [qr.to_dict() for qr in query.query_results[offset:]]
if not results:
aggregate_result = {}
else:
# Start a synthetic data set with the data from the first result...
aggregate_result = results[0].copy()
aggregate_result["data"] = {
"columns": results[0]["data"]["columns"],
"rows": [],
}
# ... then add each subsequent result set into it.
for r in results:
aggregate_result["data"]["rows"].extend(r["data"]["rows"])

data = json_dumps({"query_result": aggregate_result})
headers = {"Content-Type": "application/json"}
return make_response(data, 200, headers)


class QueryResultResource(BaseResource):
@staticmethod
def add_cors_headers(headers):
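
The merge rule implemented by QueryResultSetResource is: column metadata comes from the oldest kept result, and rows are concatenated oldest-to-newest across every kept result. A self-contained restatement of that step for review purposes (the function and sample data are illustrative, not code from this patch):

def aggregate(results, keep):
    """Merge the last `keep` result dicts, ordered oldest-to-newest."""
    kept = results[max(len(results) - keep, 0):]
    if not kept:
        return {}
    merged = kept[0].copy()
    # Columns come from the oldest kept result; rows accumulate across all.
    merged["data"] = {"columns": kept[0]["data"]["columns"], "rows": []}
    for r in kept:
        merged["data"]["rows"].extend(r["data"]["rows"])
    return merged

runs = [
    {"id": 1, "data": {"columns": ["n"], "rows": [{"n": 10}]}},
    {"id": 2, "data": {"columns": ["n"], "rows": [{"n": 12}]}},
    {"id": 3, "data": {"columns": ["n"], "rows": [{"n": 9}]}},
]
assert aggregate(runs, keep=2)["data"]["rows"] == [{"n": 12}, {"n": 9}]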
67 changes: 66 additions & 1 deletion redash/models/__init__.py
@@ -327,7 +327,11 @@ def unused(cls, days=7):
age_threshold = datetime.datetime.now() - datetime.timedelta(days=days)
return (
cls.query.filter(
Query.id.is_(None), cls.retrieved_at < age_threshold
Query.id.is_(None),
cls.retrieved_at < age_threshold,
~QueryResultSet.query.filter(
QueryResultSet.result_id == QueryResult.id
).exists(),
).outerjoin(Query)
).options(load_only("id"))

@@ -450,6 +454,7 @@ class Query(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model):
db.Integer, db.ForeignKey("query_results.id"), nullable=True
)
latest_query_data = db.relationship(QueryResult)
query_results = db.relationship("QueryResult", secondary="query_resultsets")
name = Column(db.String(255))
description = Column(db.String(4096), nullable=True)
query_text = Column("query", db.Text)
@@ -465,6 +470,7 @@ class Query(ChangeTrackingMixin, TimestampMixin, BelongsToOrgMixin, db.Model):
is_draft = Column(db.Boolean, default=True, index=True)
schedule = Column(MutableDict.as_mutable(PseudoJSON), nullable=True)
schedule_failures = Column(db.Integer, default=0)
schedule_resultset_size = Column(db.Integer, nullable=True)
visualizations = db.relationship("Visualization", cascade="all, delete-orphan")
options = Column(MutableDict.as_mutable(PseudoJSON), default={})
search_vector = Column(
@@ -682,6 +688,55 @@ def search(
# sort the result using the weight as defined in the search vector column
return all_queries.search(term, sort=True).limit(limit)

@classmethod
def delete_stale_resultsets(cls):
delete_count = 0
texts = [
c[0]
for c in db.session.query(Query.query_text)
.filter(Query.schedule_resultset_size != None)
.distinct()
]
for text in texts:
queries = Query.query.filter(
Query.query_text == text, Query.schedule_resultset_size != None
).order_by(Query.schedule_resultset_size.desc())
# Multiple queries with the same text may each ask to keep a different
# number of result sets. We start with the one that keeps the most, and
# delete both the unneeded bridge rows and the result sets themselves.
first_query = queries.first()
if first_query is not None and first_query.schedule_resultset_size:
resultsets = QueryResultSet.query.filter(
QueryResultSet.query_rel == first_query
).order_by(QueryResultSet.result_id)
resultset_count = resultsets.count()
if resultset_count > first_query.schedule_resultset_size:
n_to_delete = resultset_count - first_query.schedule_resultset_size
r_ids = [r.result_id for r in resultsets][:n_to_delete]
QueryResultSet.query.filter(
QueryResultSet.result_id.in_(r_ids)
).delete(synchronize_session=False)
delete_count += QueryResult.query.filter(
QueryResult.id.in_(r_ids)
).delete(synchronize_session=False)
# By this point there are no stale result sets left.
# Delete unneeded bridge rows for the remaining queries.
for q in queries[1:]:
resultsets = (
db.session.query(QueryResultSet.result_id)
.filter(QueryResultSet.query_rel == q)
.order_by(QueryResultSet.result_id)
)
n_to_delete = resultsets.count() - q.schedule_resultset_size
if n_to_delete > 0:
stale_r = QueryResultSet.query.filter(
QueryResultSet.result_id.in_(
resultsets.limit(n_to_delete).subquery()
)
)
stale_r.delete(synchronize_session=False)
return delete_count

@classmethod
def search_by_user(cls, term, user, limit=None):
return cls.by_user(user).search(term, sort=True).limit(limit)
@@ -740,6 +795,8 @@ def update_latest_result(cls, query_result):
q.latest_query_data = query_result
# don't auto-update the updated_at timestamp
q.skip_updated_at = True
if q.schedule_resultset_size and q.schedule_resultset_size > 0:
q.query_results.append(query_result)
db.session.add(q)

query_ids = [q.id for q in queries]
@@ -828,6 +885,14 @@ def dashboard_api_keys(self):
return [api_key[0] for api_key in api_keys]


class QueryResultSet(db.Model):
query_id = Column(db.Integer, db.ForeignKey("queries.id"), primary_key=True)
query_rel = db.relationship(Query)
result_id = Column(db.Integer, db.ForeignKey("query_results.id"), primary_key=True)
result = db.relationship(QueryResult)
__tablename__ = "query_resultsets"


@listens_for(Query.query_text, "set")
def gen_query_hash(target, val, oldval, initiator):
target.query_hash = utils.gen_query_hash(val)
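
Stripped of the ORM, the retention arithmetic in delete_stale_resultsets reduces to list slicing: with a query's result ids ordered oldest-to-newest, everything before len(ids) - schedule_resultset_size is stale. A toy model of that rule (hypothetical helper, not the model code above):

def prune(result_ids, keep):
    """Split ids (oldest-to-newest) into (stale, retained) for a keep count."""
    n_to_delete = max(len(result_ids) - keep, 0)
    return result_ids[:n_to_delete], result_ids[n_to_delete:]

stale, retained = prune([101, 102, 103, 104, 105], keep=3)
assert stale == [101, 102] and retained == [103, 104, 105]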
1 change: 1 addition & 0 deletions redash/serializers/__init__.py
@@ -113,6 +113,7 @@ def serialize_query(
"query": query.query_text,
"query_hash": query.query_hash,
"schedule": query.schedule,
"schedule_resultset_size": query.schedule_resultset_size,
"api_key": query.api_key,
"is_archived": query.is_archived,
"is_draft": query.is_draft,