Skip to content

Commit

Permalink
Synchronous client: Add support for pagination
Browse files Browse the repository at this point in the history
- Update models to add new fields in the response to support pagination
  - `next_uri`: the URI to use to fetch the next page of results if available
  - `next_offset`: the next offset to fetch the next page of results
    if available and paginating manually
- Update base API to support paginating through results and add methods
  to get results given a URL (this simplifies reading paginated results)
- On higher level functions, use pagination by default
  • Loading branch information
RichardKeo committed Feb 16, 2024
1 parent 5859507 commit 53aa1b1
Show file tree
Hide file tree
Showing 16 changed files with 325 additions and 42 deletions.
12 changes: 3 additions & 9 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ $(VENV)/bin/activate: requirements/dev.txt
$(PIP) install -r requirements/dev.txt


install:
make $(VENV)/bin/activate
install: $(VENV)/bin/activate

clean:
rm -rf __pycache__
Expand All @@ -24,17 +23,12 @@ lint:
types:
mypy dune_client/ --strict

check:
make fmt
make lint
make types
check: fmt lint types

test-unit:
python -m pytest tests/unit

test-e2e:
python -m pytest tests/e2e

test-all:
make test-unit
make test-e2e
test-all: test-unit test-e2e
28 changes: 22 additions & 6 deletions dune_client/api/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
Framework built on Dune's API Documentation
https://duneanalytics.notion.site/API-Documentation-1b93d16e0fa941398e15047f643e003a
"""

from __future__ import annotations

import logging.config
Expand All @@ -15,6 +16,12 @@

from dune_client.util import get_package_version

# Headers used for pagination in CSV results
DUNE_CSV_NEXT_URI_HEADER = "x-dune-next-uri"
DUNE_CSV_NEXT_OFFSET_HEADER = "x-dune-next-offset"
# Default maximum number of rows to retrieve per batch of results
MAX_NUM_ROWS_PER_BATCH = 32_000


# pylint: disable=too-few-public-methods
class BaseDuneClient:
Expand Down Expand Up @@ -92,20 +99,29 @@ def _handle_response(self, response: Response) -> Any:
response.raise_for_status()
raise ValueError("Unreachable since previous line raises") from err

def _route_url(self, route: str) -> str:
return f"{self.base_url}{self.api_version}{route}"
def _route_url(self, route: Optional[str] = None, url: Optional[str] = None) -> str:
if route is not None:
final_url = f"{self.base_url}{self.api_version}{route}"
elif url is not None:
final_url = url
else:
assert route is not None or url is not None

return final_url

def _get(
self,
route: str,
route: Optional[str] = None,
params: Optional[Any] = None,
raw: bool = False,
url: Optional[str] = None,
) -> Any:
"""Generic interface for the GET method of a Dune API request"""
url = self._route_url(route)
self.logger.debug(f"GET received input url={url}")
final_url = self._route_url(route=route, url=url)
self.logger.debug(f"GET received input url={final_url}")

response = self.http.get(
url=url,
url=final_url,
headers=self.default_headers(),
timeout=self.request_timeout,
params=params,
Expand Down
79 changes: 72 additions & 7 deletions dune_client/api/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@
execution: https://dune.com/docs/api/api-reference/execute-queries/
get results: https://dune.com/docs/api/api-reference/get-results/
"""

from io import BytesIO
from typing import Optional
from typing import Any, Dict, Optional

from deprecated import deprecated

from dune_client.api.base import BaseRouter
from dune_client.api.base import (
BaseRouter,
DUNE_CSV_NEXT_URI_HEADER,
DUNE_CSV_NEXT_OFFSET_HEADER,
)
from dune_client.models import (
ExecutionResponse,
ExecutionStatusResponse,
Expand Down Expand Up @@ -66,26 +71,86 @@ def get_execution_status(self, job_id: str) -> ExecutionStatusResponse:
except KeyError as err:
raise DuneError(response_json, "ExecutionStatusResponse", err) from err

def get_execution_results(self, job_id: str) -> ResultsResponse:
def get_execution_results(
self,
job_id: str,
limit: Optional[int] = None,
offset: Optional[int] = None,
) -> ResultsResponse:
"""GET results from Dune API for `job_id` (aka `execution_id`)"""
response_json = self._get(route=f"/execution/{job_id}/results")
params = {}
if limit is not None:
params["limit"] = limit
if offset is not None:
params["offset"] = offset

route = f"/execution/{job_id}/results"
url = self._route_url(route)
return self._get_execution_results_by_url(url=url, params=params)

def _get_execution_results_by_url(
self,
url: str,
params: Optional[Dict[str, Any]] = None,
) -> ResultsResponse:
"""
GET results from Dune API with a given URL. This is particularly useful for pagination.
"""
assert url.startswith(self.base_url)

response_json = self._get(url=url, params=params)
try:
return ResultsResponse.from_dict(response_json)
except KeyError as err:
raise DuneError(response_json, "ResultsResponse", err) from err

def get_execution_results_csv(self, job_id: str) -> ExecutionResultCSV:
def get_execution_results_csv(
self,
job_id: str,
limit: Optional[int] = None,
offset: Optional[int] = None,
) -> ExecutionResultCSV:
"""
GET results in CSV format from Dune API for `job_id` (aka `execution_id`)
this API only returns the raw data in CSV format, it is faster & lighterweight
use this method for large results where you want lower CPU and memory overhead
if you need metadata information use get_results() or get_status()
"""
params = {}
if limit is not None:
params["limit"] = limit
if offset is not None:
params["offset"] = offset

route = f"/execution/{job_id}/results/csv"
response = self._get(route=route, raw=True)
url = self._route_url(route)
return self._get_execution_results_csv_by_url(url=url, params=params)

def _get_execution_results_csv_by_url(
self,
url: str,
params: Optional[Dict[str, Any]] = None,
) -> ExecutionResultCSV:
"""
GET results in CSV format from Dune API with a given URL. This is particularly
useful for pagination
this API only returns the raw data in CSV format, it is faster & lighterweight
use this method for large results where you want lower CPU and memory overhead
if you need metadata information use get_results() or get_status()
"""
assert url.startswith(self.base_url)

response = self._get(url=url, params=params, raw=True)
response.raise_for_status()
return ExecutionResultCSV(data=BytesIO(response.content))
next_uri = response.headers.get(DUNE_CSV_NEXT_URI_HEADER)
next_offset = response.headers.get(DUNE_CSV_NEXT_OFFSET_HEADER)
return ExecutionResultCSV(
data=BytesIO(response.content),
next_uri=next_uri,
next_offset=next_offset,
)

#######################
# Deprecated Functions:
Expand Down
Loading

0 comments on commit 53aa1b1

Please sign in to comment.