Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions docs/SDK_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,28 @@ for row in result:
total = len(result)
```

### Large Result Sets

The SDK does not impose a fixed row cap. A query can return as many rows as the
instance allows, up to its configured `query.maxLimit` (discovered automatically
from `/api/v1/health`; e.g. 100,000 on Lightdash Cloud). If the limits cannot be
read from the health endpoint, the SDK skips its own check and leaves enforcement
to the server. Otherwise, requesting more than `query.maxLimit` raises a
`ValueError` instead of letting the server silently return a truncated result:

```python
# Fetch a large extract — pages are streamed transparently
result = model.query().metrics(model.metrics.revenue).limit(100_000).execute()
df = result.to_df()

# Asking for more than the instance allows fails loudly
model.query().limit(10_000_000).execute()
# ValueError: Limit 10000000 exceeds this instance's maximum query limit of 100000...
```

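Results are fetched in pages of the instance's `maxPageSize` (or of the requested
limit, when that is smaller) to minimise round-trips; the page size falls back to
500 when the instance does not report one. Every page of a query uses the same
size, so no rows are skipped or duplicated. To pull result sets larger than
`query.maxLimit`, use a CSV/Excel export instead.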

### Pagination

For large result sets, results are paginated automatically:
Expand Down
18 changes: 18 additions & 0 deletions lightdash/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,24 @@ def _make_request(

return data["results"]

def get_query_limits(self) -> Dict[str, Any]:
    """
    Return this instance's query limit configuration, cached after the
    first call.

    Reads ``query`` from ``/api/v1/health`` — notably ``maxLimit`` (the
    maximum number of rows a query may return) and ``maxPageSize`` (the
    largest page the results API will serve). These are instance/org
    configurable, so the SDK discovers them rather than hard-coding a cap.

    A health payload that is not a dict, or whose ``query`` entry is
    missing, null, or not a dict, is treated as "no limits advertised" and
    yields an empty dict, so callers can always use ``.get(...)`` on the
    result. Errors raised by the request itself are not caught here and
    propagate to the caller.

    Returns:
        The ``query`` config dict (empty dict if unavailable).
    """
    cached = getattr(self, "_query_limits", None)
    if cached is None:
        health = self._make_request("GET", "/api/v1/health")
        # Only trust a mapping: anything else (None, list, string) would
        # break callers that expect dict semantics.
        query_config = health.get("query") if isinstance(health, dict) else None
        cached = query_config if isinstance(query_config, dict) else {}
        # An empty result is cached too, so the endpoint is hit once per
        # client, not once per query.
        self._query_limits = cached
    return cached

def _fetch_models(self) -> List[Model]:
"""Internal method to fetch models from API."""
path = f"/api/v1/projects/{self.project_uuid}/explores"
Expand Down
86 changes: 65 additions & 21 deletions lightdash/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ def execute(
self,
query_payload: Dict[str, Any],
timeout_seconds: float = 300,
invalidate_cache: bool = False
invalidate_cache: bool = False,
page_size: int = 500
) -> "QueryResult":
"""Submit query via V2 API and poll until complete."""
# Step 1: Submit query
Expand All @@ -40,14 +41,16 @@ def execute(
query_uuid = submit_response["queryUuid"]
fields = submit_response.get("fields", {})

# Step 2: Poll for first page
first_page = self._poll_until_ready(query_uuid, timeout_seconds)
# Step 2: Poll for first page. The same page_size is reused for every
# page so totalPageCount and page numbering stay consistent.
first_page = self._poll_until_ready(query_uuid, timeout_seconds, page_size=page_size)

return QueryResult(
query_uuid=query_uuid,
fields=fields,
first_page=first_page,
executor=self
executor=self,
page_size=page_size
)

def _poll_until_ready(
Expand Down Expand Up @@ -127,12 +130,14 @@ def __init__(
query_uuid: str,
fields: Dict[str, Any],
first_page: Dict[str, Any],
executor: _QueryExecutor
executor: _QueryExecutor,
page_size: int = 500
):
self._query_uuid = query_uuid
self._fields = fields
self._first_page = first_page
self._executor = executor
self._page_size = page_size
self._all_rows: Optional[List[Dict[str, Any]]] = None
self._field_labels = self._build_field_labels()

Expand Down Expand Up @@ -179,39 +184,51 @@ def fields(self) -> Dict[str, Any]:
"""Field metadata from the query."""
return self._fields

def page(self, page_num: int, page_size: int = 500) -> List[Dict[str, Any]]:
def page(self, page_num: int, page_size: Optional[int] = None) -> List[Dict[str, Any]]:
"""
Get a specific page of results.

Args:
page_num: Page number (1-indexed)
page_size: Number of rows per page (max 5000)
page_size: Rows per page. Defaults to the size the query was
fetched with (bounded by the instance's ``maxPageSize``).

Returns:
List of row dictionaries for the requested page
"""
if page_num == 1 and page_size == self._first_page.get("pageSize", 500):
ps = page_size or self._page_size
if page_num == 1 and ps == self._first_page.get("pageSize", self._page_size):
return self._transform_rows(self._first_page.get("rows", []))

page_data = self._executor.get_page(self._query_uuid, page_num, page_size)
page_data = self._executor.get_page(self._query_uuid, page_num, ps)
return self._transform_rows(page_data.get("rows", []))

def iter_pages(self, page_size: int = 500) -> Iterator[List[Dict[str, Any]]]:
def iter_pages(self, page_size: Optional[int] = None) -> Iterator[List[Dict[str, Any]]]:
"""
Iterate through all pages of results.

Args:
page_size: Number of rows per page
page_size: Rows per page. Defaults to the size the query was
fetched with. The page count is derived from this size and
``total_results`` so every row is yielded exactly once.

Yields:
List of row dictionaries for each page
"""
# Yield first page
yield self._transform_rows(self._first_page.get("rows", []))
ps = page_size or self._page_size

# Reuse the already-fetched first page only when its size matches the
# requested page size; otherwise re-fetch from page 1 at the new size.
if ps == self._first_page.get("pageSize", self._page_size):
yield self._transform_rows(self._first_page.get("rows", []))
start_page = 2
else:
start_page = 1

# Fetch and yield remaining pages
for page_num in range(2, self.total_pages + 1):
page_data = self._executor.get_page(self._query_uuid, page_num, page_size)
total = self.total_results
num_pages = (total + ps - 1) // ps if total else 1
for page_num in range(start_page, num_pages + 1):
page_data = self._executor.get_page(self._query_uuid, page_num, ps)
yield self._transform_rows(page_data.get("rows", []))

def to_records(self) -> List[Dict[str, Any]]:
Expand Down Expand Up @@ -514,7 +531,10 @@ def limit(self, n: int) -> "Query":
Returns a new Query with the specified limit.

Args:
n: Maximum number of rows to return (1-50000)
n: Maximum number of rows to return. The upper bound is the
instance's configured ``query.maxLimit`` (discovered at execute
time), not a fixed SDK cap. Requesting more raises a ValueError
rather than silently returning a truncated result.

Returns:
A new Query with the limit set
Expand Down Expand Up @@ -606,19 +626,43 @@ def execute(
if self._result is not None and not invalidate_cache:
return self._result

if not 1 <= self._limit <= 50000:
raise ValueError("Limit must be between 1 and 50000")
if self._limit < 1:
raise ValueError("Limit must be at least 1")

if self._model._client is None:
raise RuntimeError("Model not properly initialized with client reference")

executor = _QueryExecutor(self._model._client)
client = self._model._client

# Discover the instance's real limits rather than hard-coding a cap.
# Fail open if /health is unreachable - the server still enforces them.
try:
limits = client.get_query_limits()
except Exception:
limits = {}

max_limit = limits.get("maxLimit")
if max_limit and self._limit > max_limit:
# Raise rather than let the server silently clamp and return a
# truncated result that looks complete.
raise ValueError(
f"Limit {self._limit} exceeds this instance's maximum query limit "
f"of {max_limit}. Lower the limit, or export larger result sets via CSV."
)

# Page through results at the largest size the instance allows (bounded
# by the requested limit) to minimise round-trips on large extracts.
max_page_size = limits.get("maxPageSize") or 500
page_size = max(1, min(max_page_size, self._limit))

executor = _QueryExecutor(client)
payload = self._build_payload()

self._result = executor.execute(
payload,
timeout_seconds=timeout_seconds,
invalidate_cache=invalidate_cache
invalidate_cache=invalidate_cache,
page_size=page_size
)
return self._result

Expand Down
2 changes: 2 additions & 0 deletions lightdash/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ def _make_request(
json: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]: ...

def get_query_limits(self) -> Dict[str, Any]: ...


class Model(Protocol):
"""Type protocol for a Lightdash model."""
Expand Down
55 changes: 35 additions & 20 deletions tests/test_acceptance.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,28 +305,43 @@ def test_query_with_field_ids(first_model):
assert metric_label in row


def test_query_limit_validation(first_model):
"""Test that query limits are properly validated."""
dimensions = first_model.list_dimensions()
metrics = first_model.list_metrics()

if not dimensions or not metrics:
pytest.skip("No dimensions or metrics available for testing")

# Test invalid limits (V2 API supports up to 50000)
with pytest.raises(ValueError, match="Limit must be between 1 and 50000"):
first_model.query(
dimensions=[dimensions[0].field_id],
metrics=[metrics[0].field_id],
limit=0,
def test_query_limit_validation(client):
"""Test that query limits are properly validated (issue #19)."""
# Find a model with at least one dimension and metric so the query paths
# below actually execute (models[0] may be a fieldless staging model).
model = dim = metric = None
for m in client.list_models():
dims = m.list_dimensions()
mets = m.list_metrics()
if dims and mets:
model, dim, metric = m, dims[0], mets[0]
break
if model is None:
pytest.skip("No model with a dimension and metric available")

# A limit below 1 is rejected locally
with pytest.raises(ValueError, match="Limit must be at least 1"):
model.query(
dimensions=[dim.field_id], metrics=[metric.field_id], limit=0,
).to_records()

with pytest.raises(ValueError, match="Limit must be between 1 and 50000"):
first_model.query(
dimensions=[dimensions[0].field_id],
metrics=[metrics[0].field_id],
limit=50001,
).to_records()
# A limit above the instance's configured maxLimit is rejected with a clear
# error rather than silently truncated.
max_limit = client.get_query_limits().get("maxLimit")
if max_limit:
with pytest.raises(ValueError, match="exceeds this instance's maximum"):
model.query(
dimensions=[dim.field_id], metrics=[metric.field_id],
limit=max_limit + 1,
).to_records()

# A limit above the old hard-coded 50k cap (but within maxLimit) is now
# accepted. execute() only fetches the first page, so this stays cheap.
if max_limit and max_limit > 50000:
result = model.query(
dimensions=[dim.field_id], metrics=[metric.field_id], limit=50001,
).execute()
assert result is not None


def test_query_requires_client(client_params):
Expand Down
Loading