feat(wateruse): add water-use module for the NWDC API#328
Draft
thodson-usgs wants to merge 10 commits into
Add `dataretrieval.wateruse.get_wateruse`, a getter for USGS National Water Availability Assessment Data Companion (NWDC) water-use estimates, served on a HUC12 grid at https://api.water.usgs.gov/nwaa-data/data. This is the modern replacement for the defunct legacy NWIS water-use service; `nwis.get_water_use` now points callers here.

The NWDC is a plain CSV REST service, not an OGC API Features collection (it has no /collections, and its error envelope is `{detail}`, not the OGC engine's `{code, description}`), so the getter talks to the service directly rather than through the shared OGC engine. It still follows the package conventions: shared request headers via `ogc.engine._default_headers` (so `API_USGS_PAT` raises the rate limit), the typed `DataRetrievalError` taxonomy via `error_for_status` (with the NWDC `detail` surfaced), and a `(DataFrame, BaseMetadata)` return.

Large areas (e.g. `huc2:04`, populous states) paginate via an RFC 8288 `rel="next"` Link header; the getter follows the pages and concatenates them. `huc12_id` is kept as a string so leading zeros survive.

Includes offline tests (pytest-httpx), an API reference page, a README usage example + service-index entry, and a demo notebook (USGS_WaterUse_Examples.ipynb) answering a motivating question: the source (groundwater vs. surface water) and seasonal cycle of Wisconsin public-supply withdrawals.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01Sjb14HkwuCydKSKMsaXsgd
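For readers unfamiliar with RFC 8288 pagination, the following is a minimal sketch of how a `rel="next"` target can be pulled out of a `Link` header. The function name `next_page_url` and the regular expression are illustrative assumptions, not the module's actual code, and the parser is deliberately simplified (it does not handle commas inside quoted parameter values).

```python
import re
from typing import Optional

# One RFC 8288 link-value: <target> followed by zero or more ";param" parts.
# Simplification (assumption): parameter values contain no "," or ";".
_LINK_RE = re.compile(r"<([^>]*)>\s*((?:;\s*[^,;]+)*)")


def next_page_url(link_header: Optional[str]) -> Optional[str]:
    """Return the target of the rel="next" link, or None if there is none."""
    if not link_header:
        return None
    for target, params in _LINK_RE.findall(link_header):
        for param in params.split(";"):
            name, _, value = param.strip().partition("=")
            # rel may carry several space-separated relation types.
            if name.lower() == "rel" and "next" in value.strip('"').split():
                return target
    return None
```

A caller would keep requesting the returned URL until the function yields `None`, collecting one parsed CSV page per response.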
8841d52 to
2d55d48
Compare
`_default_headers` (the token-aware USGS-API request headers) lived in `ogc/engine.py`, but nothing about it is OGC-specific: it's consumed by the Water Data getters, the stats client, and `wateruse`, and `waterdata/utils` already re-exported it. The new `wateruse` module (a non-OGC CSV API) having to reach into the OGC engine for it was an inverted dependency.

Move the definition to `dataretrieval/utils.py` (the package's neutral HTTP foundation, alongside `_get`, `HTTPX_DEFAULTS`, `query`, and `BaseMetadata`) and repoint the importers. `ogc.engine` and `waterdata.utils` re-import it, so `ogc.engine._default_headers` and `waterdata.utils._default_headers` still resolve to the same object; no behavior change. Also drops the now-unused `os` and `__version__` imports from the engine.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01Sjb14HkwuCydKSKMsaXsgd
Quality pass on the new module (no behavior change):
- Reuse `utils._raise_for_status` instead of a near-duplicate. It gains an
optional `detail_from` callback, invoked only on an error response, that
lets a caller surface its API's error wording without re-implementing the
status-to-type mapping or the message format. `wateruse` keeps just the tiny
NWDC-specific `_nwdc_error_detail` extractor (`{detail}` envelope) and passes
it in — so it now also inherits the shared 413/414 handling.
- Hoist the first page out of `_fetch_all_pages` into a small `_fetch_page`
helper: drops the `next_url`/`request_params` two-variable priming, the
`Optional` first_response, the in-loop `is None` guard, and the `assert`;
the final concat is now unconditional.
- Read CSV from `io.BytesIO(response.content)` rather than
`StringIO(response.text)`, skipping a full-body str decode/copy per page.
- Flatten the `_next_page_url` guard to a single check.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01Sjb14HkwuCydKSKMsaXsgd
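The `detail_from` idea in the commit above can be sketched as follows: the shared helper owns the status-to-type mapping and the message format, and the caller only supplies a small extractor for its API's error envelope. Everything here is a hypothetical stand-in (the exception subclasses, the `raise_for_status` signature, the `"HTTP <status>"` message format); only the `{detail}` envelope and the callback-on-error behavior come from the description.

```python
import json
from typing import Callable, Optional


class DataRetrievalError(Exception):
    """Stand-in base class for this sketch."""


class BadRequestError(DataRetrievalError):
    """Hypothetical subtype for HTTP 400."""


class RateLimitedError(DataRetrievalError):
    """Hypothetical subtype for HTTP 429."""


_STATUS_TO_ERROR = {400: BadRequestError, 429: RateLimitedError}


def nwdc_error_detail(body: bytes) -> Optional[str]:
    """Pull the message out of a {"detail": ...} envelope, if present."""
    try:
        payload = json.loads(body)
    except ValueError:  # not JSON (or not decodable): no detail to surface
        return None
    if isinstance(payload, dict) and "detail" in payload:
        return str(payload["detail"])
    return None


def raise_for_status(
    status: int,
    body: bytes,
    detail_from: Optional[Callable[[bytes], Optional[str]]] = None,
) -> None:
    """Raise a typed error for 4xx/5xx; call detail_from only on errors."""
    if status < 400:
        return
    detail = detail_from(body) if detail_from is not None else None
    message = f"HTTP {status}"
    if detail:
        message += f": {detail}"
    raise _STATUS_TO_ERROR.get(status, DataRetrievalError)(message)
```

With this shape, a new API with a different envelope needs only its own extractor; the typed-error mapping stays in one place.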
Two hardening fixes from an xhigh code review:
- Guard the pagination loop against a non-advancing or cyclic `next` cursor.
A server bug that echoed the same `skip` URL (or a cycle) would otherwise
spin `_fetch_all_pages` forever, accumulating frames until OOM. Track the
fetched next-URLs and stop on a repeat — matching the bounded posture the
OGC `_paginate` already has.
- Convert pandas `EmptyDataError` to a typed `DataRetrievalError`. NWDC signals
no-data with a 400 (`{detail}`, already surfaced) or rows of zeros, never an
empty body — but if it ever returns one, honor the documented `Raises`
contract instead of leaking a bare pandas exception.
Adds offline tests for both (cyclic `next` terminates; empty body raises typed).
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01Sjb14HkwuCydKSKMsaXsgd
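The cursor guard described above amounts to remembering which URLs have already been fetched and stopping on a repeat. A minimal sketch, assuming a hypothetical `fetch_page` callable that returns a page's rows together with its next URL (plain lists stand in for DataFrames):

```python
from typing import Callable, List, Optional, Set, Tuple

# (rows on this page, URL of the next page or None)
Page = Tuple[List[str], Optional[str]]


def fetch_all_pages(first_url: str, fetch_page: Callable[[str], Page]) -> List[str]:
    """Follow next-links, stopping at the end or at any already-seen URL."""
    rows: List[str] = []
    seen: Set[str] = set()
    url: Optional[str] = first_url
    while url is not None and url not in seen:
        seen.add(url)
        page_rows, url = fetch_page(url)
        rows.extend(page_rows)
    return rows
```

A server that echoes the same cursor, or cycles between two, therefore costs at most one request per distinct URL instead of looping until memory runs out.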
…ectors

`get_wateruse(location="stateCd:RI")` leaked NWDC's `<type>:<id>` wire format. Replace the single `location` string with idiomatic selectors that mirror the waterdata / ngwmn getters:
- `state` accepts a full name, postal code, or FIPS, resolved to the 2-letter postal code `stateCd` requires via `codes.states.to_state`, exactly like `ngwmn.get_sites`.
- `county` is a 5-digit state+county FIPS code (`countyCd`).
- `huc`'s length selects the level (`huc2`..`huc12`): "04" -> huc2, "07070005" -> huc8, "010900020502" -> huc12.

Exactly one selector must be given; `_resolve_location` builds the wire `location` value and raises a clear ValueError on none / multiple / malformed selectors (verified live: stateCd needs postal not FIPS, countyCd needs 5 digits, hucN matches the code length).

Updates the README, demo notebook, and docstrings; adds resolver unit + integration tests.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01Sjb14HkwuCydKSKMsaXsgd
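The selector rules above can be illustrated with a small resolver. This is a sketch under stated assumptions, not the module's `_resolve_location`: the state lookup table is a tiny hypothetical stand-in for `codes.states.to_state`, the builder-table layout is one possible arrangement, and it assumes every even code length from 2 to 12 is a valid HUC level.

```python
from typing import Optional

# Hypothetical stand-in for the package's state lookup (name, postal, FIPS).
_STATE_TO_POSTAL = {
    "ri": "RI", "rhode island": "RI", "44": "RI",
    "wi": "WI", "wisconsin": "WI", "55": "WI",
}

# Assumption: even lengths 2..12 each name a HUC level (huc2 .. huc12).
_HUC_LENGTHS = (2, 4, 6, 8, 10, 12)


def _state(value: str) -> str:
    try:
        return "stateCd:" + _STATE_TO_POSTAL[value.strip().lower()]
    except KeyError:
        raise ValueError(f"unrecognized state: {value!r}") from None


def _county(value: str) -> str:
    if not (value.isdigit() and len(value) == 5):
        raise ValueError(f"county must be a 5-digit FIPS code, got {value!r}")
    return "countyCd:" + value


def _huc(value: str) -> str:
    if not (value.isdigit() and len(value) in _HUC_LENGTHS):
        raise ValueError(f"huc must be 2 to 12 digits (even length), got {value!r}")
    return f"huc{len(value)}:{value}"


_BUILDERS = {"state": _state, "county": _county, "huc": _huc}


def resolve_location(
    state: Optional[str] = None,
    county: Optional[str] = None,
    huc: Optional[str] = None,
) -> str:
    """Build the wire `location` value from exactly one selector."""
    candidates = (("state", state), ("county", county), ("huc", huc))
    given = {name: value for name, value in candidates if value is not None}
    if len(given) != 1:
        raise ValueError("give exactly one of state, county, or huc")
    ((name, value),) = given.items()
    return _BUILDERS[name](str(value))
```

Keeping HUC codes as strings throughout matters here for the same reason as `huc12_id`: `"04"` parsed as an integer would lose its leading zero and change the level.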
The NWDC queries one location per request (comma-separated values return a 400, and a repeated `location=` param silently keeps only the last), so locations cannot be unioned server-side. But the waterdata/ngwmn getters accept lists, so let `state`, `county`, and `huc` each take a single value or an iterable and fan a list out into one request per location, concatenating the results (verified live: `state=["RI","DE"]` == RI ++ DE).

`_resolve_locations` returns one wire `location` per value; `get_wateruse` loops over them. Exactly one selector type must still be given (you can't mix states and counties), and an empty list is rejected.

Docstrings, README, demo notebook, and tests updated.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01Sjb14HkwuCydKSKMsaXsgd
- The `state` branch of `_resolve_locations` now normalizes `to_state`'s scalar/list result through `_as_list`, matching the `county`/`huc` branches instead of hand-rolling the str-vs-list split.
- `_as_list` collapses to the explicit "non-string iterable -> list, else wrap" two-branch form.
- `_fetch_all_pages` returns `frames[0]` for a single page instead of an unconditional `pd.concat`, so the common single-location/single-page path no longer copies the whole result twice (it concats once per location and once per page); this also matches the per-location concat in `get_wateruse`.

No behavior change. 27 offline tests pass; live single-page, paginated, and fan-out paths verified unchanged.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01Sjb14HkwuCydKSKMsaXsgd
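As an illustration of the two small points in this commit, here is a sketch of a two-branch `as_list` and of a combine step that skips the copy when only one frame exists. Plain lists stand in for DataFrames, and both function names are illustrative; treating `bytes` like `str` is an added assumption, since the commit only says "non-string".

```python
from collections.abc import Iterable
from typing import Any, List


def as_list(value: Any) -> List[Any]:
    """Non-string iterable -> list; anything else is wrapped in a list."""
    if isinstance(value, Iterable) and not isinstance(value, (str, bytes)):
        return list(value)
    return [value]


def combine(frames: List[List[Any]]) -> List[Any]:
    """Return a lone frame as-is; otherwise build one concatenated copy."""
    if len(frames) == 1:
        return frames[0]  # no copy on the common single-page path
    return [row for frame in frames for row in frame]
```

The identity return in `combine` is the point: on the single-location, single-page path the result object is handed back untouched rather than rebuilt at each level.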
…eeded)

A multi-value `state`/`county`/`huc` selector now fans out over a `ThreadPoolExecutor` instead of a serial loop. Concurrency is capped by a module-level `MAX_CONCURRENT_REQUESTS` (default 4; set to 1 for serial), kept in this module rather than honoring the OGC engine's `API_USGS_CONCURRENT`, so wateruse stays decoupled from the engine. The locations are independent single requests over the synchronous `_get`, so the thread pool needs no shared state; `pool.map` preserves input order and re-raises the first failure.

Stress-tested against the live NWDC at concurrency 1/2/4/8/16 over 16 distinct locations: all 200s, zero rate-limit/connection errors, and the rate budget depletes one token per request regardless of concurrency, so no request backoff/retry is required. End-to-end results are concurrency-invariant (byte-identical at conc 1/4/8) with a ~3.6x speedup at the default of 4. The single-location common path skips the pool entirely.

Tests route each location to its own mocked response so the fan-out assertions are deterministic under thread races, and cover both the concurrent and serial (cap=1) paths.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01Sjb14HkwuCydKSKMsaXsgd
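The thread-pool fan-out described above can be sketched in a few lines. `fetch_locations` and its `fetch_one` parameter are hypothetical names; the cap constant mirrors the `MAX_CONCURRENT_REQUESTS` default mentioned in the commit. The sketch relies on `Executor.map` yielding results in input order regardless of completion order.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence, TypeVar

T = TypeVar("T")

MAX_CONCURRENT_REQUESTS = 4  # set to 1 for a serial loop


def fetch_locations(
    locations: Sequence[str],
    fetch_one: Callable[[str], T],
    max_workers: int = MAX_CONCURRENT_REQUESTS,
) -> List[T]:
    """Fetch each location, at most max_workers at a time, in input order."""
    # A single location (or a cap of 1) never needs a pool.
    if len(locations) <= 1 or max_workers <= 1:
        return [fetch_one(location) for location in locations]
    workers = min(max_workers, len(locations))
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # map() preserves input order; consuming the results re-raises
        # the first exception encountered in that order.
        return list(pool.map(fetch_one, locations))
```

Because the order of the returned list matches the order of `locations`, a later concatenation is deterministic even when requests finish out of order.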
…e-limit
Replace the ThreadPoolExecutor fan-out with an asyncio implementation: one
shared `httpx.AsyncClient` paginates each location, `asyncio.gather` (bounded by
a semaphore at `MAX_CONCURRENT_REQUESTS`) fans the locations out, and input
order is preserved for a deterministic concat. The single client keeps
connections alive across pages and locations (the old per-call `httpx.get`
opened a fresh connection every page). The event loop runs in a worker thread,
so it is safe even when called inside an already-running loop (Jupyter) — a
bare `asyncio.run` would raise there.
`md.header` now surfaces the *final* rate-limit headers — the response with the
lowest `x-ratelimit-remaining` (the quota left after the whole fan-out) — plus
cumulative elapsed, instead of the first request's values. (The OGC engine
already aggregates this way via `_aggregate_paginated_response` /
`_combine_chunk_responses`, so only wateruse needed the fix.)
Reuse: the genuinely-shared, low-coupling primitives only (`_default_headers`,
`_raise_for_status(detail_from=...)` keeping NWDC's `{detail}` errors,
`_network_error`, `BaseMetadata`, `HTTPX_DEFAULTS`). Deliberately NOT the OGC
`_paginate` — it hardcodes `_raise_for_non_200` (the `{code, description}`
envelope, wrong for NWDC's `{detail}`) and is entangled with the engine's
context vars; the CSV/Link pager is ~20 lines locally. The sync→async bridge is
stdlib (`asyncio.run` in a worker thread), not anyio, which isn't a declared
dependency.
Verified live: single/paginated/fan-out results unchanged and order-stable,
the final (lowest) rate-limit header surfaces, `{detail}` errors preserved, and
calls succeed inside a running event loop. Offline tests cover the rate-limit
aggregation; 31 pass.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01Sjb14HkwuCydKSKMsaXsgd
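A stdlib-only sketch of the pattern this commit describes: a semaphore-bounded `asyncio.gather` for the fan-out, and a sync-to-async bridge that runs the event loop in a worker thread so the call also works when a loop is already running. The names `gather_bounded` and `run_sync` are illustrative, and the sketch omits the shared `httpx.AsyncClient`; `fetch_one` stands in for "paginate one location".

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Awaitable, Callable, Coroutine, List, Sequence, TypeVar

T = TypeVar("T")


async def gather_bounded(
    locations: Sequence[str],
    fetch_one: Callable[[str], Awaitable[T]],
    limit: int = 4,
) -> List[T]:
    """Run fetch_one for every location, at most `limit` at a time."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(location: str) -> T:
        async with semaphore:
            return await fetch_one(location)

    # gather() returns results in the order the awaitables were passed in.
    return list(await asyncio.gather(*(guarded(loc) for loc in locations)))


def run_sync(make_coro: Callable[[], Coroutine[Any, Any, T]]) -> T:
    """Run a coroutine on a fresh event loop in a worker thread.

    The worker thread has no running loop, so asyncio.run() is safe there
    even if the caller's own thread is already inside one (e.g. Jupyter).
    """
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(lambda: asyncio.run(make_coro())).result()
```

Passing a factory (`make_coro`) rather than a coroutine object means the coroutine is created inside the worker thread that will run it.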
…ient state
Reuse and unify the OGC engine's HTTP pagination / aggregation / error-recovery /
ambient-state plumbing instead of carrying parallel implementations. Net source
reduction of ~66 LOC, behavior-preserving, plus one rate-limit correctness fix.
wateruse reuse:
- wateruse drives the engine's generic `_paginate` (with an injected
`raise_for_status` for the NWDC `{detail}` envelope), `_run_sync` (anyio portal,
Jupyter-safe), and `_combine_chunk_frames` / `_combine_chunk_responses`
aggregators — replacing a hand-rolled pager, thread bridge, and bespoke
aggregation. `_resolve_locations` becomes a `_LOCATION_BUILDERS` table dispatch,
dropping a 3-way if/elif and a duplicated selector enumeration.
Engine unification:
- `planning._merge_response`: one low-level "fold N responses into one" behind both
pagination (`_paginate`) and chunked/fan-out aggregation
(`_combine_chunk_responses`), replacing two near-duplicate implementations; deletes
`engine._aggregate_paginated_response`.
- `utils.Ambient[T]`: a generic ContextVar-with-scope class collapsing each per-call
ambient (`_row_cap`, `_ogc_base_url`, `_dialect`, the chunker's `_chunked_client`)
from a var + hand-written `@contextmanager` setter pair into one declaration.
`with _x(value):` call sites unchanged; readers shorten to `_x.get()`.
- `_paginate`'s verbatim per-page progress block deduped into a `report_page` closure.
- `_combine_chunk_responses`: dropped a dead single-response branch.
- `_QUOTA_HEADER` moved to the base `planning` module — dedups the literal and fixes
a layering inversion (planning had hard-coded it, unable to import from chunking).
- `_cql2_param`: CQL2 filter list built as a comprehension.
- `engine._check_id_format`: inlined into its only caller; dead re-export dropped.
Rate-limit correctness fix:
- `x-ratelimit-remaining` now reports the LOWEST value any concurrent sub-request
saw (the quota actually left after a fan-out), via a shared `_lowest_remaining`,
instead of the last-by-index — fixing a latent inaccuracy in the OGC chunker too.
Behavior-preserving (live-verified); offline OGC/wateruse/utils/progress suites
green; ruff + mypy --strict clean.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01Sjb14HkwuCydKSKMsaXsgd
Summary
Adds a `dataretrieval.wateruse` module for retrieving USGS National Water Availability Assessment Data Companion (NWDC) water-use estimates from https://api.water.usgs.gov/nwaa-data/data. Estimates are modeled on a HUC12 grid and queryable by county, state, or hydrologic unit. This is the modern replacement for the defunct legacy NWIS water-use service, so `nwis.get_water_use` now points callers here.

It covers the same data as the R `dataRetrieval::read_waterdata_use_data` getter, but is written to the Python package's conventions rather than ported from the R structure.
Design notes
The NWDC is a plain CSV REST service, not an OGC API Features collection: it has no `/collections` or `/conformance`, and its error envelope is `{"detail": ...}` rather than the OGC engine's `{code, description}`. So it does not use the high-level OGC path (`get_ogc_data`, the CQL2 byte-chunker, the GeoJSON pager, `_raise_for_non_200`). It does reuse the engine's generic transport plumbing (extracted/cleaned up in the refactor commit below), supplying only NWDC-specific strategies, and stays consistent with the package where the shared pieces fit:
- Returns a `(DataFrame, BaseMetadata)` tuple.
- `utils._default_headers()` for request headers, so the documented `API_USGS_PAT` token raises the NWDC rate limit just as it does for the OGC getters.
- The typed `DataRetrievalError` taxonomy (via `utils._raise_for_status` with an injected detail extractor), surfacing the NWDC `detail` (e.g. `"Invalid model name: ..."`) in the message.
- `state` / `county` / `huc` selectors (mirroring `ngwmn` / `waterdata`), each accepting a single value or a list. Since NWDC takes one location per request, a multi-value selector fans out: one request per location, run concurrently over a shared client.
- `variable` is comma-joined into a single GET (via `utils.to_str`).
- Large results paginate via an RFC 8288 `Link: <...>; rel="next"` header (verified: a `huc2` → 7 pages, a populous state → 4; small queries → a single page). Rather than a bespoke loop, wateruse drives the engine's generic `_paginate` (with NWDC parse / cursor / error strategies) and concatenates the pages.
- `huc12_id` is parsed as a string so leading zeros survive.

Folded-in engine refactor (second commit)
Building wateruse surfaced that it could reuse the OGC engine's transport instead of re-implementing it, and that extracting the reusable seams also de-duplicated the engine itself. The second commit on this branch, `refactor(ogc): reuse + unify the OGC engine — pager, aggregation, ambient state`, does that. Net source ≈ −66 LOC, behavior-preserving, and it reads independently of the wateruse module (feature and refactor are split into two commits on purpose):
- `planning._merge_response`: one low-level "fold N responses into one" behind both pagination (`_paginate`) and the chunked / fan-out aggregation (`_combine_chunk_responses`), replacing two near-duplicate implementations across modules; deletes `engine._aggregate_paginated_response`.
- `utils.Ambient[T]`: a small generic ContextVar-with-scope class that collapses each per-call ambient (`_row_cap`, `_ogc_base_url`, `_dialect`, the chunker's `_chunked_client`) from a var + hand-written `@contextmanager` setter pair into a single declaration. `with _x(value):` call sites are unchanged; readers shorten to `_x.get()`.
- wateruse drives `_paginate` / `_run_sync` (Jupyter-safe anyio portal) / `_combine_chunk_*` rather than a hand-rolled pager + thread bridge; its `_resolve_locations` becomes a `_LOCATION_BUILDERS` dispatch table.
- `_paginate`'s verbatim per-page progress block → a `report_page` closure; a dead single-response branch dropped; `_QUOTA_HEADER` moved to the base `planning` module (dedups the literal and fixes a layering inversion); CQL2 filters built as a comprehension; the single-use `_check_id_format` helper inlined and its dead re-export removed.
- `x-ratelimit-remaining` now reports the lowest value any concurrent sub-request saw (the quota actually left after a fan-out) via a shared `_lowest_remaining`, instead of the last-by-index, fixing a latent inaccuracy in the OGC chunker too.
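The rate-limit fix is easy to state in code: across the responses of a fan-out, report the minimum `x-ratelimit-remaining` rather than whichever response happens to be last. A minimal sketch, assuming each response's headers are available as a mapping with lowercase keys; the function name echoes `_lowest_remaining` from the description, but the signature and the handling of missing or malformed values are assumptions.

```python
from typing import Iterable, Mapping, Optional

QUOTA_HEADER = "x-ratelimit-remaining"


def lowest_remaining(all_headers: Iterable[Mapping[str, str]]) -> Optional[int]:
    """Return the smallest remaining-quota value seen, or None if absent."""
    values = []
    for headers in all_headers:
        try:
            values.append(int(headers.get(QUOTA_HEADER)))
        except (TypeError, ValueError):
            continue  # header missing or not an integer: ignore it
    return min(values) if values else None
```

With concurrent requests, completion order and index order can differ, so the last response by index may have been answered before others consumed quota; the minimum is the value that reflects the quota left once every request has been counted.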
What's included
- `dataretrieval/wateruse.py`: the module, wired into `dataretrieval/__init__.py`.
- `ogc/{engine,planning,chunking}.py`, `utils.py`, and `waterdata/utils.py` (second commit).
- `tests/wateruse_test.py`: offline `pytest-httpx` coverage: single-page parse, string `huc12_id`, comma-joined variables, dropped-None params, Link-header pagination + concatenation, bare-host normalization, shared-header reuse, state/county/huc selectors + fan-out, and typed-error / `detail` handling; plus updates to `tests/waterdata_{chunking,utils,}_test.py` for the engine changes.
- `docs/source/reference/wateruse.rst` + toctree entry.
- `README.md`: usage example and an "Available Data Services" entry.
- `demos/USGS_WaterUse_Examples.ipynb`: a motivating walkthrough of where Wisconsin's public water supply comes from (groundwater vs. surface water) and its summer demand peak.
Verification
- Offline tests (`tests/wateruse_test.py`) plus the OGC engine / chunking / progress / utils suites the refactor touches; `ruff check` / `ruff format` / `mypy --strict` clean.
- Live: monthly and annual resolutions, paginated results byte-identical to the unpaginated equivalent, concurrent fan-out over multiple states, and the lowest-remaining rate-limit header confirmed. The demo notebook was executed end-to-end (outputs stripped on commit per the repo's `nbstripout` hook).

🤖 Generated with Claude Code