From 53aa1b1d3fa43978fb9b34825380f25fff48884f Mon Sep 17 00:00:00 2001
From: Richard Keo
Date: Fri, 16 Feb 2024 17:07:57 +0000
Subject: [PATCH] Synchronous client: Add support for pagination

- Update models with the new response fields that support pagination:
  - `next_uri`: the URI to use to fetch the next page of results, if available
  - `next_offset`: the offset to use to fetch the next page of results, if
    available and paginating manually
- Update the base API to support paginating through results, and add methods
  to get results given a URL (this simplifies reading paginated results)
- Use pagination by default in higher-level functions
---
 Makefile                      |  12 +---
 dune_client/api/base.py       |  28 ++++++++--
 dune_client/api/execution.py  |  79 +++++++++++++++++++++++---
 dune_client/api/extensions.py | 101 ++++++++++++++++++++++++++++++----
 dune_client/api/query.py      |   3 +-
 dune_client/client.py         |   1 +
 dune_client/client_async.py   |   1 +
 dune_client/file/base.py      |   1 +
 dune_client/file/interface.py |   1 +
 dune_client/interface.py      |   1 +
 dune_client/models.py         |  77 +++++++++++++++++++++++++-
 dune_client/query.py          |   3 +-
 dune_client/types.py          |   3 +-
 dune_client/util.py           |   1 +
 tests/e2e/test_client.py      |  49 ++++++++++++++++-
 tests/unit/test_models.py     |   6 ++
 16 files changed, 325 insertions(+), 42 deletions(-)

diff --git a/Makefile b/Makefile
index ad2ede4..1679f9c 100644
--- a/Makefile
+++ b/Makefile
@@ -9,8 +9,7 @@ $(VENV)/bin/activate: requirements/dev.txt
 	$(PIP) install -r requirements/dev.txt
 
-install:
-	make $(VENV)/bin/activate
+install: $(VENV)/bin/activate
 
 clean:
 	rm -rf __pycache__
@@ -24,10 +23,7 @@ lint:
 types:
 	mypy dune_client/ --strict
 
-check:
-	make fmt
-	make lint
-	make types
+check: fmt lint types
 
 test-unit:
 	python -m pytest tests/unit
@@ -35,6 +31,4 @@ test-unit:
 test-e2e:
 	python -m pytest tests/e2e
 
-test-all:
-	make test-unit
-	make test-e2e
+test-all: test-unit test-e2e
diff --git a/dune_client/api/base.py b/dune_client/api/base.py
index c1371b9..9f4d18a 100644
--- a/dune_client/api/base.py
+++ b/dune_client/api/base.py
@@ -3,6 +3,7 @@
 Framework built on Dune's API Documentation
 https://duneanalytics.notion.site/API-Documentation-1b93d16e0fa941398e15047f643e003a
 """
+
 from __future__ import annotations
 
 import logging.config
@@ -15,6 +16,12 @@
 from dune_client.util import get_package_version
 
+# Headers used for pagination in CSV results
+DUNE_CSV_NEXT_URI_HEADER = "x-dune-next-uri"
+DUNE_CSV_NEXT_OFFSET_HEADER = "x-dune-next-offset"
+# Default maximum number of rows to retrieve per batch of results
+MAX_NUM_ROWS_PER_BATCH = 32_000
+
 
 # pylint: disable=too-few-public-methods
 class BaseDuneClient:
@@ -92,20 +99,29 @@ def _handle_response(self, response: Response) -> Any:
             response.raise_for_status()
             raise ValueError("Unreachable since previous line raises") from err
 
-    def _route_url(self, route: str) -> str:
-        return f"{self.base_url}{self.api_version}{route}"
+    def _route_url(self, route: Optional[str] = None, url: Optional[str] = None) -> str:
+        if route is not None:
+            final_url = f"{self.base_url}{self.api_version}{route}"
+        elif url is not None:
+            final_url = url
+        else:
+            raise ValueError("Either route or url must be provided")
+
+        return final_url
 
     def _get(
         self,
-        route: str,
+        route: Optional[str] = None,
         params: Optional[Any] = None,
         raw: bool = False,
+        url: Optional[str] = None,
     ) -> Any:
         """Generic interface for the GET method of a Dune API request"""
-        url = self._route_url(route)
-        self.logger.debug(f"GET received input url={url}")
+        final_url = self._route_url(route=route, url=url)
+        self.logger.debug(f"GET received input url={final_url}")
self.logger.debug(f"GET received input url={final_url}") + response = self.http.get( - url=url, + url=final_url, headers=self.default_headers(), timeout=self.request_timeout, params=params, diff --git a/dune_client/api/execution.py b/dune_client/api/execution.py index e06924e..69706c6 100644 --- a/dune_client/api/execution.py +++ b/dune_client/api/execution.py @@ -5,12 +5,17 @@ execution: https://dune.com/docs/api/api-reference/execute-queries/ get results: https://dune.com/docs/api/api-reference/get-results/ """ + from io import BytesIO -from typing import Optional +from typing import Any, Dict, Optional from deprecated import deprecated -from dune_client.api.base import BaseRouter +from dune_client.api.base import ( + BaseRouter, + DUNE_CSV_NEXT_URI_HEADER, + DUNE_CSV_NEXT_OFFSET_HEADER, +) from dune_client.models import ( ExecutionResponse, ExecutionStatusResponse, @@ -66,15 +71,45 @@ def get_execution_status(self, job_id: str) -> ExecutionStatusResponse: except KeyError as err: raise DuneError(response_json, "ExecutionStatusResponse", err) from err - def get_execution_results(self, job_id: str) -> ResultsResponse: + def get_execution_results( + self, + job_id: str, + limit: Optional[int] = None, + offset: Optional[int] = None, + ) -> ResultsResponse: """GET results from Dune API for `job_id` (aka `execution_id`)""" - response_json = self._get(route=f"/execution/{job_id}/results") + params = {} + if limit is not None: + params["limit"] = limit + if offset is not None: + params["offset"] = offset + + route = f"/execution/{job_id}/results" + url = self._route_url(route) + return self._get_execution_results_by_url(url=url, params=params) + + def _get_execution_results_by_url( + self, + url: str, + params: Optional[Dict[str, Any]] = None, + ) -> ResultsResponse: + """ + GET results from Dune API with a given URL. This is particularly useful for pagination. + """ + assert url.startswith(self.base_url) + + response_json = self._get(url=url, params=params) try: return ResultsResponse.from_dict(response_json) except KeyError as err: raise DuneError(response_json, "ResultsResponse", err) from err - def get_execution_results_csv(self, job_id: str) -> ExecutionResultCSV: + def get_execution_results_csv( + self, + job_id: str, + limit: Optional[int] = None, + offset: Optional[int] = None, + ) -> ExecutionResultCSV: """ GET results in CSV format from Dune API for `job_id` (aka `execution_id`) @@ -82,10 +117,40 @@ def get_execution_results_csv(self, job_id: str) -> ExecutionResultCSV: use this method for large results where you want lower CPU and memory overhead if you need metadata information use get_results() or get_status() """ + params = {} + if limit is not None: + params["limit"] = limit + if offset is not None: + params["offset"] = offset + route = f"/execution/{job_id}/results/csv" - response = self._get(route=route, raw=True) + url = self._route_url(route) + return self._get_execution_results_csv_by_url(url=url, params=params) + + def _get_execution_results_csv_by_url( + self, + url: str, + params: Optional[Dict[str, Any]] = None, + ) -> ExecutionResultCSV: + """ + GET results in CSV format from Dune API with a given URL. 
+        useful for pagination.
+
+        this API only returns the raw data in CSV format, it is faster & lighter weight
+        use this method for large results where you want lower CPU and memory overhead
+        if you need metadata information use get_results() or get_status()
+        """
+        assert url.startswith(self.base_url)
+
+        response = self._get(url=url, params=params, raw=True)
         response.raise_for_status()
-        return ExecutionResultCSV(data=BytesIO(response.content))
+        next_uri = response.headers.get(DUNE_CSV_NEXT_URI_HEADER)
+        next_offset = response.headers.get(DUNE_CSV_NEXT_OFFSET_HEADER)
+        return ExecutionResultCSV(
+            data=BytesIO(response.content),
+            next_uri=next_uri,
+            next_offset=next_offset,
+        )
 
     #######################
     # Deprecated Functions:
diff --git a/dune_client/api/extensions.py b/dune_client/api/extensions.py
index d68637a..0eedbfc 100644
--- a/dune_client/api/extensions.py
+++ b/dune_client/api/extensions.py
@@ -1,6 +1,7 @@
 """
 Extended functionality for the ExecutionAPI
 """
+
 from __future__ import annotations
 
 import logging
@@ -11,6 +12,11 @@
 
 from deprecated import deprecated
 
+from dune_client.api.base import (
+    DUNE_CSV_NEXT_URI_HEADER,
+    DUNE_CSV_NEXT_OFFSET_HEADER,
+    MAX_NUM_ROWS_PER_BATCH,
+)
 from dune_client.api.execution import ExecutionAPI
 from dune_client.api.query import QueryAPI
 from dune_client.models import (
@@ -41,6 +47,7 @@ def run_query(
         query: QueryBase,
         ping_frequency: int = POLL_FREQUENCY_SECONDS,
         performance: Optional[str] = None,
+        batch_size: int = MAX_NUM_ROWS_PER_BATCH,
     ) -> ResultsResponse:
         """
         Executes a Dune `query`, waits until execution completes,
@@ -48,13 +55,16 @@ def run_query(
         Sleeps `ping_frequency` seconds between each status request.
         """
         job_id = self._refresh(query, ping_frequency, performance)
-        return self.get_execution_results(job_id)
+        return self._fetch_entire_result(
+            self.get_execution_results(job_id, limit=batch_size),
+        )
 
     def run_query_csv(
         self,
         query: QueryBase,
         ping_frequency: int = POLL_FREQUENCY_SECONDS,
         performance: Optional[str] = None,
+        batch_size: int = MAX_NUM_ROWS_PER_BATCH,
     ) -> ExecutionResultCSV:
         """
         Executes a Dune query, waits till execution completes,
@@ -62,13 +72,16 @@ def run_query_csv(
         (use it load the data directly in pandas.from_csv() or similar frameworks)
         """
         job_id = self._refresh(query, ping_frequency, performance)
-        return self.get_execution_results_csv(job_id)
+        return self._fetch_entire_result_csv(
+            self.get_execution_results_csv(job_id, limit=batch_size),
+        )
 
     def run_query_dataframe(
         self,
         query: QueryBase,
         ping_frequency: int = POLL_FREQUENCY_SECONDS,
         performance: Optional[str] = None,
+        batch_size: int = MAX_NUM_ROWS_PER_BATCH,
     ) -> Any:
         """
         Execute a Dune Query, waits till execution completes,
@@ -82,13 +95,16 @@ def run_query_dataframe(
             raise ImportError(
                 "dependency failure, pandas is required but missing"
             ) from exc
-        data = self.run_query_csv(query, ping_frequency, performance).data
+        data = self.run_query_csv(
+            query, ping_frequency, performance, batch_size=batch_size
+        ).data
         return pandas.read_csv(data)
 
     def get_latest_result(
         self,
         query: Union[QueryBase, str, int],
         max_age_hours: int = THREE_MONTHS_IN_HOURS,
+        batch_size: int = MAX_NUM_ROWS_PER_BATCH,
     ) -> ResultsResponse:
         """
         GET the latest results for a query_id without re-executing the query
@@ -99,21 +115,33 @@ def get_latest_result(
         https://dune.com/docs/api/api-reference/get-results/latest-results
         """
         params, query_id = parse_query_object_or_id(query)
+
+        # Only fetch the metadata first to determine if the result is fresh enough
+        if params is None:
+            params = {}
+        params["limit"] = 0
+
         response_json = self._get(
             route=f"/query/{query_id}/results",
             params=params,
         )
         try:
-            results = ResultsResponse.from_dict(response_json)
-            last_run = results.times.execution_ended_at
+            metadata = ResultsResponse.from_dict(response_json)
+            last_run = metadata.times.execution_ended_at
+
             if last_run and age_in_hours(last_run) > max_age_hours:
-                # Query older than specified max age
+                # Query older than specified max age; we need to refresh the results
                 logging.info(
                     f"results (from {last_run}) older than {max_age_hours} hours, re-running query"
                 )
                 results = self.run_query(
                     query if isinstance(query, QueryBase) else QueryBase(query_id)
                 )
+            else:
+                # The results are fresh enough; retrieve the entire result
+                results = self._fetch_entire_result(
+                    self.get_execution_results(metadata.execution_id, limit=batch_size),
+                )
             return results
         except KeyError as err:
             raise DuneError(response_json, "ResultsResponse", err) from err
 
@@ -121,6 +149,7 @@ def get_latest_result_dataframe(
     def get_latest_result_dataframe(
         self,
         query: Union[QueryBase, str, int],
+        batch_size: int = MAX_NUM_ROWS_PER_BATCH,
     ) -> Any:
         """
         GET the latest results for a query_id without re-executing the query
@@ -136,20 +165,38 @@ def get_latest_result_dataframe(
                 "dependency failure, pandas is required but missing"
             ) from exc
 
-        data = self.download_csv(query).data
+        data = self.download_csv(query, batch_size=batch_size).data
         return pandas.read_csv(data)
 
-    def download_csv(self, query: Union[QueryBase, str, int]) -> ExecutionResultCSV:
+    def download_csv(
+        self,
+        query: Union[QueryBase, str, int],
+        batch_size: int = MAX_NUM_ROWS_PER_BATCH,
+    ) -> ExecutionResultCSV:
         """
         Almost like an alias for `get_latest_result` but for the csv endpoint.
         https://dune.com/docs/api/api-reference/get-results/latest-results
         """
         params, query_id = parse_query_object_or_id(query)
+
+        if params is None:
+            params = {}
+        params["limit"] = batch_size
+
         response = self._get(
             route=f"/query/{query_id}/results/csv", params=params, raw=True
         )
         response.raise_for_status()
-        return ExecutionResultCSV(data=BytesIO(response.content))
+
+        next_uri = response.headers.get(DUNE_CSV_NEXT_URI_HEADER)
+        next_offset = response.headers.get(DUNE_CSV_NEXT_OFFSET_HEADER)
+        return self._fetch_entire_result_csv(
+            ExecutionResultCSV(
+                data=BytesIO(response.content),
+                next_uri=next_uri,
+                next_offset=next_offset,
+            ),
+        )
 
     ############################
     # Plus Subscription Features
@@ -167,7 +214,7 @@ def upload_csv(
         - File has to be < 200 MB
         - Column names in the table can't start with a special character or digits.
-        - Private uploads require a Premium subscription.
+        - Private uploads require a Plus subscription.
 
         Below are the specifics of how to work with the API.
         """
@@ -186,7 +233,7 @@ def upload_csv(
             raise DuneError(response_json, "UploadCsvResponse", err) from err
 
     ##############################################################################################
-    # Premium Features: these features use APIs that are only available on paid subscription plans
+    # Plus Features: these features use APIs that are only available on paid subscription plans
     ##############################################################################################
 
     def run_sql(
@@ -203,7 +250,7 @@ def run_sql(
         Allows user to provide execute raw_sql via the CRUD interface
         - create, run, get results with optional archive/delete.
         - Query is by default made private and archived after execution.
-        Requires premium subscription!
+        Requires Plus subscription!
         """
         query = self.create_query(name, query_sql, params, is_private)
         results = self.run_query(
@@ -286,3 +333,33 @@ def _refresh(
             raise QueryFailed(f"Error data: {status.error}")
 
         return job_id
+
+    def _fetch_entire_result(
+        self,
+        results: ResultsResponse,
+    ) -> ResultsResponse:
+        """
+        Retrieve the entire result set using the paginated API
+        """
+        next_uri = results.next_uri
+        while next_uri is not None:
+            batch = self._get_execution_results_by_url(url=next_uri)
+            results += batch
+            next_uri = batch.next_uri
+
+        return results
+
+    def _fetch_entire_result_csv(
+        self,
+        results: ExecutionResultCSV,
+    ) -> ExecutionResultCSV:
+        """
+        Retrieve the entire result set in CSV format using the paginated API
+        """
+        next_uri = results.next_uri
+        while next_uri is not None:
+            batch = self._get_execution_results_csv_by_url(url=next_uri)
+            results += batch
+            next_uri = batch.next_uri
+
+        return results
diff --git a/dune_client/api/query.py b/dune_client/api/query.py
index c70c01f..14bdf61 100644
--- a/dune_client/api/query.py
+++ b/dune_client/api/query.py
@@ -4,6 +4,7 @@
 Enables more flexible integration of Dune API into your workflow
 and freeing you from UI-exclusive query editing.
 """
+
 from __future__ import annotations
 
 from typing import Optional, Any
@@ -15,7 +16,7 @@
 
 class QueryAPI(BaseRouter):
     """
-    Implementation of Query API (aka CRUD) Operations - premium subscription only
+    Implementation of Query API (aka CRUD) Operations - Plus subscription only
     https://dune.com/docs/api/api-reference/edit-queries/
     """
diff --git a/dune_client/client.py b/dune_client/client.py
index 3e10665..c9d7874 100644
--- a/dune_client/client.py
+++ b/dune_client/client.py
@@ -3,6 +3,7 @@
 Framework built on Dune's API Documentation
 https://duneanalytics.notion.site/API-Documentation-1b93d16e0fa941398e15047f643e003a
 """
+
 from dune_client.api.extensions import ExtendedAPI
diff --git a/dune_client/client_async.py b/dune_client/client_async.py
index 076a6b5..b428b19 100644
--- a/dune_client/client_async.py
+++ b/dune_client/client_async.py
@@ -3,6 +3,7 @@
 Framework built on Dune's API Documentation
 https://duneanalytics.notion.site/API-Documentation-1b93d16e0fa941398e15047f643e003a
 """
+
 from __future__ import annotations
 
 import asyncio
diff --git a/dune_client/file/base.py b/dune_client/file/base.py
index d767d9d..dcc012a 100644
--- a/dune_client/file/base.py
+++ b/dune_client/file/base.py
@@ -1,4 +1,5 @@
 """File Reader and Writer for DuneRecords"""
+
 from __future__ import annotations
 
 import csv
diff --git a/dune_client/file/interface.py b/dune_client/file/interface.py
index 1d65896..4607572 100644
--- a/dune_client/file/interface.py
+++ b/dune_client/file/interface.py
@@ -1,4 +1,5 @@
 """File Reader and Writer for DuneRecords"""
+
 from __future__ import annotations
 
 import logging
diff --git a/dune_client/interface.py b/dune_client/interface.py
index 0fd5190..2540fdf 100644
--- a/dune_client/interface.py
+++ b/dune_client/interface.py
@@ -1,6 +1,7 @@
 """
 Abstract class for a basic Dune Interface with refresh method used by Query Runner.
 """
+
 import abc
 from typing import Any
diff --git a/dune_client/models.py b/dune_client/models.py
index bcab5a6..c19a174 100644
--- a/dune_client/models.py
+++ b/dune_client/models.py
@@ -1,6 +1,7 @@
 """
 Dataclasses encoding response data from Dune API.
""" + from __future__ import annotations import logging.config @@ -8,6 +9,7 @@ from datetime import datetime from enum import Enum from io import BytesIO +from os import SEEK_END from typing import Optional, Any, Union, List, Dict from dateutil.parser import parse @@ -48,13 +50,14 @@ class ExecutionState(Enum): PENDING = "QUERY_STATE_PENDING" CANCELLED = "QUERY_STATE_CANCELLED" FAILED = "QUERY_STATE_FAILED" + EXPIRED = "QUERY_STATE_EXPIRED" @classmethod def terminal_states(cls) -> set[ExecutionState]: """ Returns the terminal states (i.e. when a query execution is no longer executing """ - return {cls.COMPLETED, cls.CANCELLED, cls.FAILED} + return {cls.COMPLETED, cls.CANCELLED, cls.FAILED, cls.EXPIRED} def is_complete(self) -> bool: """Returns True is state is completed, otherwise False.""" @@ -181,9 +184,13 @@ class ResultMetadata: Representation of Dune's Result Metadata from [Get] Query Results endpoint """ + # pylint: disable=too-many-instance-attributes + column_names: list[str] + row_count: int result_set_bytes: int total_row_count: int + total_result_set_bytes: int datapoint_count: int pending_time_millis: Optional[int] execution_time_millis: int @@ -195,15 +202,29 @@ def from_dict(cls, data: dict[str, Any]) -> ResultMetadata: pending_time = data.get("pending_time_millis", None) return cls( column_names=data["column_names"], + row_count=int(data["total_row_count"]), result_set_bytes=int(data["result_set_bytes"]), total_row_count=int(data["total_row_count"]), + total_result_set_bytes=int(data["result_set_bytes"]), datapoint_count=int(data["datapoint_count"]), pending_time_millis=int(pending_time) if pending_time else None, execution_time_millis=int(data["execution_time_millis"]), ) + def __add__(self, other: ResultMetadata) -> ResultMetadata: + """ + Enables combining results by updating the metadata associated to + an execution by using the `+` operator. + """ + assert other is not None + + self.row_count += other.row_count + self.result_set_bytes += other.result_set_bytes + self.datapoint_count += other.datapoint_count + return self -RowData = List[Dict[str, str]] + +RowData = List[Dict[str, Any]] MetaData = Dict[str, Union[int, List[str]]] @@ -217,6 +238,29 @@ class ExecutionResultCSV: """ data: BytesIO # includes all CSV rows, including the header row. + next_uri: Optional[str] = None + next_offset: Optional[int] = None + + def __add__(self, other: ExecutionResultCSV) -> ExecutionResultCSV: + assert other is not None + assert other.data is not None + + self.next_uri = other.next_uri + self.next_offset = other.next_offset + + # Get to the end of the current CSV + self.data.seek(0, SEEK_END) + + # Skip the first line of the new CSV, which contains the header + other.data.readline() + + # Append the rest of the content from `other` into current one + self.data.write(other.data.read()) + + # Move the cursor back to the start of the CSV + self.data.seek(0) + + return self @dataclass @@ -236,6 +280,15 @@ def from_dict(cls, data: dict[str, RowData | MetaData]) -> ExecutionResult: metadata=ResultMetadata.from_dict(data["metadata"]), ) + def __add__(self, other: ExecutionResult) -> ExecutionResult: + """ + Enables combining results using the `+` operator. 
+ """ + self.rows.extend(other.rows) + self.metadata += other.metadata + + return self + ResultData = Dict[str, Union[RowData, MetaData]] @@ -252,6 +305,8 @@ class ResultsResponse: times: TimeData # optional because it will only be present when the query execution completes result: Optional[ExecutionResult] + next_uri: Optional[str] + next_offset: Optional[int] @classmethod def from_dict(cls, data: dict[str, str | int | ResultData]) -> ResultsResponse: @@ -261,12 +316,18 @@ def from_dict(cls, data: dict[str, str | int | ResultData]) -> ResultsResponse: assert isinstance(data["state"], str) result = data.get("result", {}) assert isinstance(result, dict) + next_uri = data.get("next_uri") + assert isinstance(next_uri, str) or next_uri is None + next_offset = data.get("next_offset") + assert isinstance(next_offset, int) or next_offset is None return cls( execution_id=data["execution_id"], query_id=int(data["query_id"]), state=ExecutionState(data["state"]), times=TimeData.from_dict(data), result=ExecutionResult.from_dict(result) if result else None, + next_uri=next_uri, + next_offset=next_offset, ) def get_rows(self) -> list[DuneRecord]: @@ -281,3 +342,15 @@ def get_rows(self) -> list[DuneRecord]: log.info(f"execution {self.state} returning empty list") return [] + + def __add__(self, other: ResultsResponse) -> ResultsResponse: + """ + Enables combining results using the `+` operator. + """ + assert self.execution_id == other.execution_id + assert self.result is not None + assert other.result is not None + self.result += other.result + self.next_uri = other.next_uri + self.next_offset = other.next_offset + return self diff --git a/dune_client/query.py b/dune_client/query.py index c961cc1..cc11226 100644 --- a/dune_client/query.py +++ b/dune_client/query.py @@ -1,6 +1,7 @@ """ Data Classes Representing a Dune Query """ + from __future__ import annotations import urllib.parse from dataclasses import dataclass @@ -11,7 +12,7 @@ def parse_query_object_or_id( query: Union[QueryBase, str, int], -) -> tuple[dict[str, str] | None, int]: +) -> tuple[dict[str, Any] | None, int]: """ Users are allowed to pass QueryBase or ID into some functions. 
     This method handles both scenarios, returning a pair of the form (params, query_id)
diff --git a/dune_client/types.py b/dune_client/types.py
index b94ff2d..86811d7 100644
--- a/dune_client/types.py
+++ b/dune_client/types.py
@@ -3,6 +3,7 @@
 https://github.com/bh2smith/duneapi/blob/v4.0.0/duneapi/types.py
 with small adjustments (removing Options from QueryParameter)
 """
+
 from __future__ import annotations
 
 import re
@@ -12,7 +13,7 @@
 
 from dune_client.util import postgres_date
 
-DuneRecord = Dict[str, str]
+DuneRecord = Dict[str, Any]
 
 
 # pylint: disable=too-few-public-methods
diff --git a/dune_client/util.py b/dune_client/util.py
index 50ac8cf..686bbf3 100644
--- a/dune_client/util.py
+++ b/dune_client/util.py
@@ -1,4 +1,5 @@
 """Utility methods for package."""
+
 from datetime import datetime, timezone
 from typing import Optional
diff --git a/tests/e2e/test_client.py b/tests/e2e/test_client.py
index d8af755..39037f6 100644
--- a/tests/e2e/test_client.py
+++ b/tests/e2e/test_client.py
@@ -32,6 +32,10 @@ def setUp(self) -> None:
                 QueryParameter.enum_type(name="ListField", value="Option 1"),
             ],
         )
+        self.multi_rows_query = QueryBase(
+            name="Query that returns multiple rows",
+            query_id=3435763,
+        )
         self.valid_api_key = os.environ["DUNE_API_KEY"]
 
     def copy_query_and_change_parameters(self) -> QueryBase:
@@ -66,6 +70,25 @@ def test_run_query(self):
         results = dune.run_query(self.query).get_rows()
         self.assertGreater(len(results), 0)
 
+    def test_run_query_paginated(self):
+        # Arrange
+        dune = DuneClient(self.valid_api_key)
+
+        # Act
+        results = dune.run_query(self.multi_rows_query, batch_size=1).get_rows()
+
+        # Assert
+        self.assertEqual(
+            results,
+            [
+                {"number": 1},
+                {"number": 2},
+                {"number": 3},
+                {"number": 4},
+                {"number": 5},
+            ],
+        )
+
     def test_run_query_performance_large(self):
         dune = DuneClient(self.valid_api_key)
         results = dune.run_query(self.query, performance="large").get_rows()
@@ -86,7 +109,7 @@ def test_parameters_recognized(self):
                 {
                     "text_field": "different word",
                     "number_field": 22,
-                    "date_field": "1991-01-01 00:00:00.000",
+                    "date_field": "1991-01-01T00:00:00Z",
                     "list_field": "Option 2",
                 }
             ],
@@ -197,6 +220,26 @@ def test_upload_csv_success(self):
             True,
         )
 
+    def test_download_csv_with_pagination(self):
+        # Arrange
+        client = DuneClient(self.valid_api_key)
+        client.run_query(self.multi_rows_query)
+
+        # Act
+        result_csv = client.download_csv(self.multi_rows_query.query_id, batch_size=1)
+
+        # Assert
+        self.assertEqual(
+            pandas.read_csv(result_csv.data).to_dict(orient="records"),
+            [
+                {"number": 1},
+                {"number": 2},
+                {"number": 3},
+                {"number": 4},
+                {"number": 5},
+            ],
+        )
+
     def test_download_csv_success_by_id(self):
         client = DuneClient(self.valid_api_key)
         new_query = self.copy_query_and_change_parameters()
@@ -211,7 +254,7 @@ def test_download_csv_success_by_id(self):
                 {
                     "text_field": "different word",
                     "number_field": 22,
-                    "date_field": "1991-01-01 00:00:00.000",
+                    "date_field": "1991-01-01T00:00:00Z",
                     "list_field": "Option 2",
                 }
             ],
@@ -233,7 +276,7 @@ def test_download_csv_success_with_params(self):
             pandas.read_csv(result_csv.data).to_dict(orient="records"),
             [
                 {
-                    "date_field": "2022-05-04 00:00:00.000",
+                    "date_field": "2022-05-04T00:00:00Z",
                     "list_field": "Option 1",
                     "number_field": 3.1415926535,
                     "text_field": "Plain Text",
diff --git a/tests/unit/test_models.py b/tests/unit/test_models.py
index 0b19f25..856571d 100644
--- a/tests/unit/test_models.py
+++ b/tests/unit/test_models.py
@@ -44,7 +44,9 @@ def setUp(self) -> None:
         }
         self.result_metadata_data = {
             "column_names": ["ct", "TableName"],
["ct", "TableName"], + "row_count": 8, "result_set_bytes": 194, + "total_result_set_bytes": 194, "total_row_count": 8, "datapoint_count": 2, "pending_time_millis": 54, @@ -169,8 +171,10 @@ def test_parse_status_response_completed(self): def test_parse_result_metadata(self): expected = ResultMetadata( column_names=["ct", "TableName"], + row_count=8, result_set_bytes=194, total_row_count=8, + total_result_set_bytes=194, datapoint_count=2, pending_time_millis=54, execution_time_millis=900, @@ -212,6 +216,8 @@ def test_parse_result_response(self): times=time_data, # Execution result parsing tested above in test_execution_result result=ExecutionResult.from_dict(self.results_response_data["result"]), + next_uri=None, + next_offset=None, ) self.assertEqual( expected, ResultsResponse.from_dict(self.results_response_data)