Refactor Part I: Deprecate & Rename Old Routes #64

Merged · 2 commits · Sep 7, 2023
171 changes: 119 additions & 52 deletions dune_client/client.py
@@ -10,10 +10,10 @@
from typing import Any, Optional, Union

import requests
from deprecated import deprecated
from requests import Response, JSONDecodeError

from dune_client.base_client import BaseDuneClient
from dune_client.interface import DuneInterface
from dune_client.models import (
ExecutionResponse,
ExecutionResultCSV,
@@ -23,12 +23,11 @@
ResultsResponse,
ExecutionState,
)

from dune_client.query import QueryBase, DuneQuery
from dune_client.types import QueryParameter


class DuneClient(DuneInterface, BaseDuneClient):
class DuneClient(BaseDuneClient): # pylint: disable=too-many-public-methods
"""
An interface for Dune API with a few convenience methods
combining the use of endpoints (e.g. refresh)
@@ -93,41 +92,24 @@ def _patch(self, route: str, params: Any) -> Any:
)
return self._handle_response(response)

@deprecated(version="1.2.1", reason="Please use execute_query")
def execute(
self, query: QueryBase, performance: Optional[str] = None
) -> ExecutionResponse:
"""Post's to Dune API for execute `query`"""
params = query.request_format()
params["performance"] = performance or self.performance

self.logger.info(
f"executing {query.query_id} on {performance or self.performance} cluster"
)
response_json = self._post(
route=f"/query/{query.query_id}/execute",
params=params,
)
try:
return ExecutionResponse.from_dict(response_json)
except KeyError as err:
raise DuneError(response_json, "ExecutionResponse", err) from err
return self.execute_query(query, performance)

@deprecated(version="1.2.1", reason="Please use get_execution_status")
def get_status(self, job_id: str) -> ExecutionStatusResponse:
"""GET status from Dune API for `job_id` (aka `execution_id`)"""
response_json = self._get(route=f"/execution/{job_id}/status")
try:
return ExecutionStatusResponse.from_dict(response_json)
except KeyError as err:
raise DuneError(response_json, "ExecutionStatusResponse", err) from err
return self.get_execution_status(job_id)

@deprecated(version="1.2.1", reason="Please use get_execution_results")
def get_result(self, job_id: str) -> ResultsResponse:
"""GET results from Dune API for `job_id` (aka `execution_id`)"""
response_json = self._get(route=f"/execution/{job_id}/results")
try:
return ResultsResponse.from_dict(response_json)
except KeyError as err:
raise DuneError(response_json, "ResultsResponse", err) from err
return self.get_execution_results(job_id)

@deprecated(version="1.2.1", reason="Please use get_execution_results_csv")
def get_result_csv(self, job_id: str) -> ExecutionResultCSV:
"""
GET results in CSV format from Dune API for `job_id` (aka `execution_id`)
@@ -136,12 +118,7 @@ def get_result_csv(self, job_id: str) -> ExecutionResultCSV:
use this method for large results where you want lower CPU and memory overhead
if you need metadata information use get_results() or get_status()
"""
route = f"/execution/{job_id}/results/csv"
url = self._route_url(f"/execution/{job_id}/results/csv")
self.logger.debug(f"GET CSV received input url={url}")
response = self._get(route=route, raw=True)
response.raise_for_status()
return ExecutionResultCSV(data=BytesIO(response.content))
return self.get_execution_results_csv(job_id)

def get_latest_result(self, query: Union[QueryBase, str, int]) -> ResultsResponse:
Collaborator:

deprecate and replace w/ get_query_result() ?

Collaborator (Author):

This is different from get_query_result. Get latest fetches the latest results by query ID (as opposed to execution/job ID).

I suppose we could do as you suggest, but I personally find the two to be rather different.

cf: https://dune.com/docs/api/api-reference/latest_results/

Collaborator:

So I don't forget: I need to review this again, but we should have a get_query_result() that uses a Query and simply returns the latest available query result (using the API that exists for that).
This method has the clear difference that it doesn't use up any execution credits, which is important.
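For illustration only, here is a minimal sketch of what such a get_query_result() could look like on DuneClient, mirroring the pattern of the other GET routes in this diff. The method name and route are assumptions based on the latest_results docs linked above, not part of this PR:

```python
# Hypothetical sketch, intended to live on DuneClient -- not part of this PR.
# Route is assumed from https://dune.com/docs/api/api-reference/latest_results/
from typing import Union

from dune_client.models import DuneError, ResultsResponse
from dune_client.query import QueryBase


def get_query_result(self, query: Union[QueryBase, int]) -> ResultsResponse:
    """Fetch the latest available result for `query` by query id, without
    triggering a new execution (and so without spending execution credits)."""
    query_id = query.query_id if isinstance(query, QueryBase) else int(query)
    response_json = self._get(route=f"/query/{query_id}/results")
    try:
        return ResultsResponse.from_dict(response_json)
    except KeyError as err:
        raise DuneError(response_json, "ResultsResponse", err) from err
```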

"""
@@ -193,20 +170,21 @@ def _refresh(
fetches and returns the results.
Sleeps `ping_frequency` seconds between each status request.
"""
job_id = self.execute(query=query, performance=performance).execution_id
status = self.get_status(job_id)
job_id = self.execute_query(query=query, performance=performance).execution_id
status = self.get_execution_status(job_id)
while status.state not in ExecutionState.terminal_states():
self.logger.info(
f"waiting for query execution {job_id} to complete: {status}"
)
time.sleep(ping_frequency)
status = self.get_status(job_id)
status = self.get_execution_status(job_id)
if status.state == ExecutionState.FAILED:
self.logger.error(status)
raise QueryFailed(f"{status}. Perhaps your query took too long to run!")

return job_id

@deprecated(version="1.2.1", reason="Please use run_query")
def refresh(
self,
query: QueryBase,
@@ -218,11 +196,9 @@ def refresh(
fetches and returns the results.
Sleeps `ping_frequency` seconds between each status request.
"""
job_id = self._refresh(
query, ping_frequency=ping_frequency, performance=performance
)
return self.get_result(job_id)
return self.run_query(query, ping_frequency, performance)

@deprecated(version="1.2.1", reason="Please use run_query_csv")
def refresh_csv(
self,
query: QueryBase,
@@ -234,11 +210,9 @@ def refresh_csv(
fetches and the results in CSV format
(use it load the data directly in pandas.from_csv() or similar frameworks)
"""
job_id = self._refresh(
query, ping_frequency=ping_frequency, performance=performance
)
return self.get_result_csv(job_id)
return self.run_query_csv(query, ping_frequency, performance)

@deprecated(version="1.2.1", reason="Please use run_query_dataframe")
def refresh_into_dataframe(
self, query: QueryBase, performance: Optional[str] = None
) -> Any:
@@ -248,14 +222,7 @@ def refresh_into_dataframe(

This is a convenience method that uses refresh_csv underneath
"""
try:
import pandas # type: ignore # pylint: disable=import-outside-toplevel
except ImportError as exc:
raise ImportError(
"dependency failure, pandas is required but missing"
) from exc
data = self.refresh_csv(query, performance=performance).data
return pandas.read_csv(data)
return self.run_query_dataframe(query, performance)

# CRUD Operations: https://dune.com/docs/api/api-reference/edit-queries/
def create_query(
@@ -397,3 +364,103 @@ def upload_csv(self, table_name: str, data: str, description: str = "") -> bool:
return bool(response_json["success"])
except KeyError as err:
raise DuneError(response_json, "upload_csv response", err) from err

def execute_query(
self, query: QueryBase, performance: Optional[str] = None
) -> ExecutionResponse:
"""Post's to Dune API for execute `query`"""
params = query.request_format()
params["performance"] = performance or self.performance

self.logger.info(
f"executing {query.query_id} on {performance or self.performance} cluster"
)
response_json = self._post(
route=f"/query/{query.query_id}/execute",
params=params,
)
try:
return ExecutionResponse.from_dict(response_json)
except KeyError as err:
raise DuneError(response_json, "ExecutionResponse", err) from err

def get_execution_status(self, job_id: str) -> ExecutionStatusResponse:
"""GET status from Dune API for `job_id` (aka `execution_id`)"""
response_json = self._get(route=f"/execution/{job_id}/status")
try:
return ExecutionStatusResponse.from_dict(response_json)
except KeyError as err:
raise DuneError(response_json, "ExecutionStatusResponse", err) from err

def get_execution_results(self, job_id: str) -> ResultsResponse:
"""GET results from Dune API for `job_id` (aka `execution_id`)"""
response_json = self._get(route=f"/execution/{job_id}/results")
try:
return ResultsResponse.from_dict(response_json)
except KeyError as err:
raise DuneError(response_json, "ResultsResponse", err) from err

def get_execution_results_csv(self, job_id: str) -> ExecutionResultCSV:
"""
GET results in CSV format from Dune API for `job_id` (aka `execution_id`)

this API only returns the raw data in CSV format; it is faster & lighter weight
use this method for large results where you want lower CPU and memory overhead
if you need metadata information use get_results() or get_status()
"""
route = f"/execution/{job_id}/results/csv"
url = self._route_url(f"/execution/{job_id}/results/csv")
self.logger.debug(f"GET CSV received input url={url}")
response = self._get(route=route, raw=True)
response.raise_for_status()
return ExecutionResultCSV(data=BytesIO(response.content))

def run_query(
self,
query: QueryBase,
ping_frequency: int = 5,
performance: Optional[str] = None,
) -> ResultsResponse:
"""
Executes a Dune `query`, waits until execution completes,
fetches and returns the results.
Sleeps `ping_frequency` seconds between each status request.
"""
job_id = self._refresh(
query, ping_frequency=ping_frequency, performance=performance
)
return self.get_execution_results(job_id)

def run_query_csv(
self,
query: QueryBase,
ping_frequency: int = 5,
performance: Optional[str] = None,
) -> ExecutionResultCSV:
"""
Executes a Dune query, waits till execution completes,
fetches and returns the results in CSV format
(use it to load the data directly into pandas.read_csv() or similar frameworks)
"""
job_id = self._refresh(
query, ping_frequency=ping_frequency, performance=performance
)
return self.get_execution_results_csv(job_id)

def run_query_dataframe(
self, query: QueryBase, performance: Optional[str] = None
) -> Any:
"""
Executes a Dune query, waits till execution completes,
fetches and returns the result as a Pandas DataFrame

This is a convenience method that uses run_query_csv underneath
"""
try:
import pandas # type: ignore # pylint: disable=import-outside-toplevel
except ImportError as exc:
raise ImportError(
"dependency failure, pandas is required but missing"
) from exc
data = self.run_query_csv(query, performance=performance).data
return pandas.read_csv(data)
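Taken together, the renamed routes are drop-in replacements for the deprecated ones; a minimal usage sketch follows (the query ID and API key are placeholders, not values from this PR):

```python
from dune_client.client import DuneClient
from dune_client.query import QueryBase

dune = DuneClient(api_key="<YOUR_API_KEY>")        # placeholder key
query = QueryBase(name="example", query_id=1234)   # placeholder query id

# Old spelling -- still works, but now emits a DeprecationWarning:
# results = dune.refresh(query)

# New spelling introduced by this PR:
results = dune.run_query(query)
print(len(results.get_rows()))
```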
2 changes: 2 additions & 0 deletions requirements/prod.txt
@@ -5,3 +5,5 @@ types-requests>=2.31.0.2
python-dateutil>=2.8.2
requests>=2.31.0
ndjson>=0.3.1
Deprecated>=1.2.14
types-Deprecated==1.2.9.3
44 changes: 22 additions & 22 deletions tests/e2e/test_client.py
@@ -38,28 +38,28 @@ def test_from_env_constructor(self):
except KeyError:
self.fail("DuneClient.from_env raised unexpectedly!")

def test_get_status(self):
def test_get_execution_status(self):
query = QueryBase(name="No Name", query_id=1276442, params=[])
dune = DuneClient(self.valid_api_key)
job_id = dune.execute(query).execution_id
status = dune.get_status(job_id)
job_id = dune.execute_query(query).execution_id
status = dune.get_execution_status(job_id)
self.assertTrue(
status.state in [ExecutionState.EXECUTING, ExecutionState.PENDING]
)

def test_refresh(self):
def test_run_query(self):
dune = DuneClient(self.valid_api_key)
results = dune.refresh(self.query).get_rows()
results = dune.run_query(self.query).get_rows()
self.assertGreater(len(results), 0)

def test_refresh_performance_large(self):
def test_run_query_performance_large(self):
dune = DuneClient(self.valid_api_key)
results = dune.refresh(self.query, performance="large").get_rows()
results = dune.run_query(self.query, performance="large").get_rows()
self.assertGreater(len(results), 0)

def test_refresh_into_dataframe(self):
def test_run_query_dataframe(self):
dune = DuneClient(self.valid_api_key)
pd = dune.refresh_into_dataframe(self.query)
pd = dune.run_query_dataframe(self.query)
self.assertGreater(len(pd), 0)

def test_parameters_recognized(self):
@@ -75,7 +75,7 @@ def test_parameters_recognized(self):
self.assertEqual(query.parameters(), new_params)

dune = DuneClient(self.valid_api_key)
results = dune.refresh(query)
results = dune.run_query(query)
self.assertEqual(
results.get_rows(),
[
@@ -90,14 +90,14 @@

def test_endpoints(self):
dune = DuneClient(self.valid_api_key)
execution_response = dune.execute(self.query)
execution_response = dune.execute_query(self.query)
self.assertIsInstance(execution_response, ExecutionResponse)
job_id = execution_response.execution_id
status = dune.get_status(job_id)
status = dune.get_execution_status(job_id)
self.assertIsInstance(status, ExecutionStatusResponse)
while dune.get_status(job_id).state != ExecutionState.COMPLETED:
while dune.get_execution_status(job_id).state != ExecutionState.COMPLETED:
time.sleep(1)
results = dune.get_result(job_id).result.rows
results = dune.get_execution_results(job_id).result.rows
self.assertGreater(len(results), 0)

def test_cancel_execution(self):
@@ -106,31 +106,31 @@ def test_cancel_execution(self):
name="Long Running Query",
query_id=1229120,
)
execution_response = dune.execute(query)
execution_response = dune.execute_query(query)
job_id = execution_response.execution_id
# POST Cancellation
success = dune.cancel_execution(job_id)
self.assertTrue(success)

results = dune.get_result(job_id)
results = dune.get_execution_results(job_id)
self.assertEqual(results.state, ExecutionState.CANCELLED)

def test_invalid_api_key_error(self):
dune = DuneClient(api_key="Invalid Key")
with self.assertRaises(DuneError) as err:
dune.execute(self.query)
dune.execute_query(self.query)
self.assertEqual(
str(err.exception),
"Can't build ExecutionResponse from {'error': 'invalid API Key'}",
)
with self.assertRaises(DuneError) as err:
dune.get_status("wonky job_id")
dune.get_execution_status("wonky job_id")
self.assertEqual(
str(err.exception),
"Can't build ExecutionStatusResponse from {'error': 'invalid API Key'}",
)
with self.assertRaises(DuneError) as err:
dune.get_result("wonky job_id")
dune.get_execution_results("wonky job_id")
self.assertEqual(
str(err.exception),
"Can't build ResultsResponse from {'error': 'invalid API Key'}",
@@ -142,7 +142,7 @@ def test_query_not_found_error(self):
query.query_id = 99999999 # Invalid Query Id.

with self.assertRaises(DuneError) as err:
dune.execute(query)
dune.execute_query(query)
self.assertEqual(
str(err.exception),
"Can't build ExecutionResponse from {'error': 'Query not found'}",
@@ -155,7 +155,7 @@ def test_internal_error(self):
query.query_id = 9999999999999

with self.assertRaises(DuneError) as err:
dune.execute(query)
dune.execute_query(query)
Collaborator:
Now, looking at the code and having refreshed my memory:

question for you:

  • if we have a QueryBase or Query object, shouldn't we have the methods on that class:
  • query.execute()
  • query.result()
  • etc.. ?

with functions like dune.execute_query() it feels more idiomatic this way:

res = dune.execute_query(query_id=1234)

instead of:

res = dune.execute_query(Query(1234))

for me, the two idiomatic options would be:

  • dune.execute_query(1234) # or query_id=1234
  • dune.Query(1234).execute()

Right now, I see bits of OO in here, but inconsistently:

  • Executions are simply job_id
  • Queries are objects, with a query_id inside (it should be Query.id, right?)
    • but we don't have methods on Queries..
    • if we did, it would add structure to expand the library for varying types of APIs and do a cleaner split between concepts.

this is just food for thought, the goal here is to improve the code for readability and dev experience in a simple, pragmatic way.

Collaborator (Author):
I definitely agree some things like this could be approached differently, in all of the ways suggested above. These things have been on my mind for quite some time. I believe the reason we went for query_id is that some of these low-level methods are never actually used by the end user, and none of the additional query attributes are used. Most consumers of the client are usually calling refresh.

Maybe it can be made compatible with multiple approaches (i.e. introduce some method overloads where users can pass a Query object or a query_id). I would have to think a bit further about whether we can extend the functionality to the Query object itself in a backwards-compatible way, but I would be willing to give it a try.
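For the sake of the discussion, a rough sketch of the query-object idea floated above. This is purely hypothetical and not part of this PR; the class name, constructor, and method names are assumptions, not the library's actual API:

```python
# Hypothetical sketch for the discussion above -- not part of this PR.
from typing import Optional

from dune_client.client import DuneClient
from dune_client.models import ExecutionResponse, ResultsResponse
from dune_client.query import QueryBase


class Query:
    """A query object that carries a client reference and exposes
    execute()/result() directly, as suggested in the comment above."""

    def __init__(self, query_id: int, client: DuneClient):
        self.id = query_id
        self._client = client

    def execute(self, performance: Optional[str] = None) -> ExecutionResponse:
        # Delegates to the renamed low-level route introduced in this PR.
        return self._client.execute_query(
            QueryBase(name="adhoc", query_id=self.id), performance=performance
        )

    def result(self) -> ResultsResponse:
        # Latest available result by query id; does not consume execution credits.
        return self._client.get_latest_result(self.id)


# e.g. res = Query(1234, client=DuneClient(api_key="<YOUR_API_KEY>")).execute()
```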

self.assertEqual(
str(err.exception),
"Can't build ExecutionResponse from {'error': 'An internal error occured'}",
@@ -164,7 +164,7 @@ def test_internal_error(self):
def test_invalid_job_id_error(self):
dune = DuneClient(self.valid_api_key)
with self.assertRaises(DuneError) as err:
dune.get_status("Wonky Job ID")
dune.get_execution_status("Wonky Job ID")
self.assertEqual(
str(err.exception),
"Can't build ExecutionStatusResponse from "