feat(wateruse): add water-use module for the NWDC API#328

Draft
thodson-usgs wants to merge 10 commits into
DOI-USGS:mainfrom
thodson-usgs:feat/wateruse
@thodson-usgs thodson-usgs commented Jun 22, 2026

Collaborator

Summary

Adds a dataretrieval.wateruse module for retrieving USGS National Water
Availability Assessment Data Companion (NWDC) water-use estimates from
https://api.water.usgs.gov/nwaa-data/data. Estimates are modeled on a HUC12
grid and queryable by county, state, or hydrologic unit. This is the modern
replacement for the defunct legacy NWIS water-use service, so
nwis.get_water_use now points callers here.

It covers the same data as the R dataRetrieval::read_waterdata_use_data getter,
but is written to the Python package's conventions rather than ported from the
R structure.

from dataretrieval import wateruse

df, md = wateruse.get_wateruse(
    model="wu-public-supply-wd",
    variable=["pswdtot", "pswdgw", "pswdsw"],
    state="RI",
    startdate="2020-01",
    timeres="monthly",
)

Design notes

The NWDC is a plain CSV REST service, not an OGC API Features collection: it
has no /collections or /conformance, and its error envelope is
{"detail": ...} rather than the OGC engine's {code, description}. So it does
not use the high-level OGC path (get_ogc_data, the CQL2 byte-chunker, the
GeoJSON pager, _raise_for_non_200). It does reuse the engine's generic
transport plumbing (extracted and cleaned up in the refactor commit below),
supplying only NWDC-specific strategies, and stays consistent with the package
where the shared pieces fit:

  • Returns the conventional (DataFrame, BaseMetadata) tuple.
  • Reuses utils._default_headers() for request headers, so the documented
    API_USGS_PAT token raises the NWDC rate limit just as it does for the OGC
    getters.
  • Raises through the shared typed DataRetrievalError taxonomy (via
    utils._raise_for_status with an injected detail extractor), surfacing the
    NWDC detail (e.g. "Invalid model name: ...") in the message.
  • Locations are idiomatic state / county / huc selectors (mirroring
    ngwmn / waterdata), each accepting a single value or a list. Since NWDC
    takes one location per request, a multi-value selector fans out — one
    request per location, run concurrently over a shared client.
  • Multi-valued variable is comma-joined into a single GET (via utils.to_str).
  • Pagination is real and handled transparently. Large areas paginate with
    an RFC 8288 Link: <...>; rel="next" header (verified: a huc2 → 7 pages, a
    populous state → 4; small queries → a single page). Rather than a bespoke
    loop, wateruse drives the engine's generic _paginate (with NWDC parse /
    cursor / error strategies) and concatenates the pages.
  • huc12_id is parsed as a string so leading zeros survive.
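
To make the pagination bullet concrete, here is a minimal, stdlib-only sketch of pulling the `rel="next"` target out of an RFC 8288 `Link` header. It is not the module's code: the helper name `next_page_url` and the example URLs are assumptions for illustration, and the regex deliberately ignores edge cases such as commas inside quoted parameter values.

```python
import re
from typing import Optional

# One link-value looks like: <https://host/path?skip=100>; rel="next"
_LINK_VALUE = re.compile(r'<([^>]*)>\s*((?:;\s*[^,;]+)*)')
_REL_PARAM = re.compile(r';\s*rel\s*=\s*"?([^";]*)"?', flags=re.IGNORECASE)


def next_page_url(link_header: Optional[str]) -> Optional[str]:
    """Return the URL of the rel="next" link, or None when there is none."""
    if not link_header:
        return None
    for match in _LINK_VALUE.finditer(link_header):
        url, params = match.group(1), match.group(2)
        rel = _REL_PARAM.search(params)
        # A rel value may list several space-separated relation types.
        if rel and "next" in rel.group(1).lower().split():
            return url
    return None
```

A pager built on this keeps requesting the returned URL until the helper yields `None`, which is the single-page case for small queries.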

Folded-in engine refactor (second commit)

Building wateruse surfaced that it could reuse the OGC engine's transport
instead of re-implementing it — and that extracting the reusable seams also
de-duplicated the engine itself. The second commit on this branch,
refactor(ogc): reuse + unify the OGC engine — pager, aggregation, ambient state, does that. Net source ≈ −66 LOC, behavior-preserving, and it reads
independently of the wateruse module (feature and refactor are split into two
commits on purpose):

  • planning._merge_response — one low-level "fold N responses into one"
    behind both pagination (_paginate) and the chunked / fan-out aggregation
    (_combine_chunk_responses), replacing two near-duplicate implementations
    across modules; deletes engine._aggregate_paginated_response.
  • utils.Ambient[T] — a small generic ContextVar-with-scope class that
    collapses each per-call ambient (_row_cap, _ogc_base_url, _dialect, the
    chunker's _chunked_client) from a var + hand-written @contextmanager
    setter pair into a single declaration. with _x(value): call sites are
    unchanged; readers shorten to _x.get().
  • wateruse reuses _paginate / _run_sync (Jupyter-safe anyio portal) /
    _combine_chunk_* rather than a hand-rolled pager + thread bridge; its
    _resolve_locations becomes a _LOCATION_BUILDERS dispatch table.
  • Smaller dedups: _paginate's verbatim per-page progress block → a
    report_page closure; a dead single-response branch dropped; _QUOTA_HEADER
    moved to the base planning module (dedups the literal and fixes a layering
    inversion); CQL2 filters built as a comprehension; the single-use
    _check_id_format helper inlined and its dead re-export removed.
  • Rate-limit correctness fix: x-ratelimit-remaining now reports the
    lowest value any concurrent sub-request saw (the quota actually left after
    a fan-out) via a shared _lowest_remaining, instead of the last-by-index —
    fixing a latent inaccuracy in the OGC chunker too.

What's included

  • dataretrieval/wateruse.py — the module, wired into dataretrieval/__init__.py.
  • The engine refactor across ogc/{engine,planning,chunking}.py, utils.py,
    and waterdata/utils.py (second commit).
  • tests/wateruse_test.py — offline pytest-httpx coverage: single-page parse,
    string huc12_id, comma-joined variables, dropped-None params, Link-header
    pagination + concatenation, bare-host normalization, shared-header reuse,
    state/county/huc selectors + fan-out, and typed-error/detail handling; plus
    updates to tests/waterdata_{chunking,utils,}_test.py for the engine changes.
  • docs/source/reference/wateruse.rst + toctree entry.
  • README.md — usage example and an "Available Data Services" entry.
  • demos/USGS_WaterUse_Examples.ipynb — a motivating walkthrough: where
    Wisconsin's public water supply comes from (groundwater vs. surface water)
    and its summer demand peak.

Verification

  • Offline suites pass — wateruse (tests/wateruse_test.py) plus the OGC
    engine / chunking / progress / utils suites the refactor touches;
    ruff check / ruff format / mypy --strict clean.
  • Smoke-tested against the live API: single-page and multi-page queries,
    monthly and annual resolutions, paginated results byte-identical to the
    unpaginated equivalent, concurrent fan-out over multiple states, and the
    lowest-remaining rate-limit header confirmed. The demo notebook was executed
    end-to-end (outputs stripped on commit per the repo's nbstripout hook).

🤖 Generated with Claude Code

Add `dataretrieval.wateruse.get_wateruse`, a getter for USGS National Water
Availability Assessment Data Companion (NWDC) water-use estimates, served on a
HUC12 grid at https://api.water.usgs.gov/nwaa-data/data. This is the modern
replacement for the defunct legacy NWIS water-use service; `nwis.get_water_use`
now points callers here.

The NWDC is a plain CSV REST service, not an OGC API Features collection (it has
no /collections, and its error envelope is `{detail}`, not the OGC engine's
`{code, description}`), so it talks to the service directly rather than through
the shared OGC engine. It still follows the package conventions: shared request
headers via `ogc.engine._default_headers` (so `API_USGS_PAT` raises the rate
limit), the typed `DataRetrievalError` taxonomy via `error_for_status` (with the
NWDC `detail` surfaced), and a `(DataFrame, BaseMetadata)` return. Large areas
(e.g. `huc2:04`, populous states) paginate via an RFC 8288 `rel="next"` Link
header; the getter follows the pages and concatenates them. `huc12_id` is kept
as a string so leading zeros survive.

Includes offline tests (pytest-httpx), an API reference page, a README usage
example + service-index entry, and a demo notebook (USGS_WaterUse_Examples.ipynb)
answering a motivating question — the source (groundwater vs. surface water) and
seasonal cycle of Wisconsin public-supply withdrawals.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01Sjb14HkwuCydKSKMsaXsgd
@thodson-usgs thodson-usgs changed the title feat(wateruse): add water-use module wrapping the NWDC API feat(wateruse): add water-use module for the NWDC API Jun 22, 2026
thodson-usgs and others added 8 commits June 22, 2026 16:28
`_default_headers` (the token-aware USGS-API request headers) lived in
`ogc/engine.py`, but nothing about it is OGC-specific: it's consumed by the
Water Data getters, the stats client, and `wateruse`, and `waterdata/utils`
already re-exported it. The new `wateruse` module (a non-OGC CSV API) having
to reach into the OGC engine for it was an inverted dependency.

Move the definition to `dataretrieval/utils.py` — the package's neutral HTTP
foundation, alongside `_get`, `HTTPX_DEFAULTS`, `query`, and `BaseMetadata` —
and repoint the importers. `ogc.engine` and `waterdata.utils` re-import it, so
`ogc.engine._default_headers` and `waterdata.utils._default_headers` still
resolve to the same object; no behavior change. Also drops the now-unused `os`
and `__version__` imports from the engine.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01Sjb14HkwuCydKSKMsaXsgd
Quality pass on the new module (no behavior change):

- Reuse `utils._raise_for_status` instead of a near-duplicate. It gains an
  optional `detail_from` callback, invoked only on an error response, that
  lets a caller surface its API's error wording without re-implementing the
  status-to-type mapping or the message format. `wateruse` keeps just the tiny
  NWDC-specific `_nwdc_error_detail` extractor (`{detail}` envelope) and passes
  it in — so it now also inherits the shared 413/414 handling.
- Hoist the first page out of `_fetch_all_pages` into a small `_fetch_page`
  helper: drops the `next_url`/`request_params` two-variable priming, the
  `Optional` first_response, the in-loop `is None` guard, and the `assert`;
  the final concat is now unconditional.
- Read CSV from `io.BytesIO(response.content)` rather than
  `StringIO(response.text)`, skipping a full-body str decode/copy per page.
- Flatten the `_next_page_url` guard to a single check.
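
The `detail_from` hook can be pictured roughly as below. This is a simplified stand-in, not `utils._raise_for_status` itself: `FakeResponse`, the single `DataRetrievalError` class, and the message format are assumptions, and the real helper maps status codes onto a richer error taxonomy.

```python
import json
from dataclasses import dataclass
from typing import Callable, Optional


class DataRetrievalError(Exception):
    """Stand-in for the package's typed error base (simplified here)."""


@dataclass
class FakeResponse:
    """Minimal response double with just the fields this sketch reads."""
    status_code: int
    text: str


def nwdc_error_detail(response: FakeResponse) -> Optional[str]:
    """Extract the message from a {"detail": ...} error body, if present."""
    try:
        body = json.loads(response.text)
    except ValueError:  # not JSON at all
        return None
    detail = body.get("detail") if isinstance(body, dict) else None
    return None if detail is None else str(detail)


def raise_for_status(
    response: FakeResponse,
    detail_from: Optional[Callable[[FakeResponse], Optional[str]]] = None,
) -> None:
    """Raise on an error status; call detail_from only for error responses."""
    if response.status_code < 400:
        return
    detail = detail_from(response) if detail_from else None
    message = f"HTTP {response.status_code}"
    if detail:
        message += f": {detail}"
    raise DataRetrievalError(message)
```

The point of the callback is that the status handling lives in one place while each API contributes only its own error wording.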

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01Sjb14HkwuCydKSKMsaXsgd
Two hardening fixes from an xhigh code review:

- Guard the pagination loop against a non-advancing or cyclic `next` cursor.
  A server bug that echoed the same `skip` URL (or a cycle) would otherwise
  spin `_fetch_all_pages` forever, accumulating frames until OOM. Track the
  fetched next-URLs and stop on a repeat — matching the bounded posture the
  OGC `_paginate` already has.
- Convert pandas `EmptyDataError` to a typed `DataRetrievalError`. NWDC signals
  no-data with a 400 (`{detail}`, already surfaced) or rows of zeros, never an
  empty body — but if it ever returns one, honor the documented `Raises`
  contract instead of leaking a bare pandas exception.

Adds offline tests for both (cyclic `next` terminates; empty body raises typed).
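
A minimal sketch of the repeat-URL guard, assuming a `fetch` callable that returns a page body together with the next URL (or `None`). The names and the callable's shape are illustrative and not taken from the module.

```python
from typing import Callable, List, Optional, Set, Tuple

Page = Tuple[str, Optional[str]]  # (page body, next URL or None)


def fetch_all_pages(first_url: str, fetch: Callable[[str], Page]) -> List[str]:
    """Follow next-URLs until there is none, or until one repeats."""
    bodies: List[str] = []
    seen: Set[str] = set()
    url: Optional[str] = first_url
    while url is not None and url not in seen:
        seen.add(url)  # remember every URL we have requested
        body, url = fetch(url)
        bodies.append(body)
    return bodies
```

Because every requested URL is recorded, both a server that echoes the same cursor and a longer cycle end the loop after a bounded number of requests.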

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01Sjb14HkwuCydKSKMsaXsgd
…ectors

`get_wateruse(location="stateCd:RI")` leaked NWDC's `<type>:<id>` wire format.
Replace the single `location` string with idiomatic selectors that mirror the
waterdata / ngwmn getters:

- `state` accepts a full name, postal code, or FIPS — resolved to the 2-letter
  postal code `stateCd` requires via `codes.states.to_state`, exactly like
  `ngwmn.get_sites`.
- `county` is a 5-digit state+county FIPS code (`countyCd`).
- `huc`'s length selects the level (`huc2`..`huc12`): "04" -> huc2,
  "07070005" -> huc8, "010900020502" -> huc12.

Exactly one selector must be given; `_resolve_location` builds the wire
`location` value and raises a clear ValueError on none / multiple / malformed
selectors (verified live: stateCd needs postal not FIPS, countyCd needs 5
digits, hucN matches the code length). Updates the README, demo notebook, and
docstrings; adds resolver unit + integration tests.
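
The selector rules above could be sketched as follows. This is a simplified illustration rather than `_resolve_location` itself: it assumes `state` is already a 2-letter postal code (the real resolver also accepts names and FIPS via `codes.states.to_state`), and the set of accepted HUC lengths is an assumption.

```python
from typing import Optional

_HUC_LENGTHS = (2, 4, 6, 8, 10, 12)  # assumed set of supported HUC levels


def resolve_location(
    state: Optional[str] = None,
    county: Optional[str] = None,
    huc: Optional[str] = None,
) -> str:
    """Build the `<type>:<id>` wire value from exactly one selector."""
    given = [v for v in (state, county, huc) if v is not None]
    if len(given) != 1:
        raise ValueError("give exactly one of state, county, or huc")
    if state is not None:
        if not (len(state) == 2 and state.isascii() and state.isalpha()):
            raise ValueError(f"state must be a 2-letter postal code: {state!r}")
        return f"stateCd:{state.upper()}"
    if county is not None:
        if not (len(county) == 5 and county.isascii() and county.isdigit()):
            raise ValueError(f"county must be a 5-digit FIPS code: {county!r}")
        return f"countyCd:{county}"
    assert huc is not None  # the only selector left
    if not (huc.isascii() and huc.isdigit() and len(huc) in _HUC_LENGTHS):
        raise ValueError(f"huc must be 2 to 12 digits, even length: {huc!r}")
    # The code length picks the level, e.g. "04" becomes "huc2:04".
    return f"huc{len(huc)}:{huc}"
```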

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01Sjb14HkwuCydKSKMsaXsgd
The NWDC queries one location per request — comma-separated values 400, and a
repeated `location=` param silently keeps only the last — so locations cannot
be unioned server-side. But the waterdata/ngwmn getters accept lists, so let
`state`, `county`, and `huc` each take a single value or an iterable and fan a
list out into one request per location, concatenating the results (verified
live: `state=["RI","DE"]` == RI ++ DE).

`_resolve_locations` returns one wire `location` per value; `get_wateruse`
loops over them. Exactly one selector type must still be given (you can't mix
states and counties), and an empty list is rejected. Docstrings, README, demo
notebook, and tests updated.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01Sjb14HkwuCydKSKMsaXsgd
- The `state` branch of `_resolve_locations` now normalizes `to_state`'s
  scalar/list result through `_as_list`, matching the `county`/`huc` branches
  instead of hand-rolling the str-vs-list split.
- `_as_list` collapses to the explicit "non-string iterable -> list, else wrap"
  two-branch form.
- `_fetch_all_pages` returns `frames[0]` for a single page instead of an
  unconditional `pd.concat`, so the common single-location/single-page path
  no longer copies the whole result twice (it concats once per location and
  once per page); this also matches the per-location concat in `get_wateruse`.
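
The two-branch `_as_list` form mentioned above might look like this sketch. The exact treatment of `bytes` is an assumption; the commit only says strings are wrapped rather than iterated.

```python
from collections.abc import Iterable
from typing import Any, List


def as_list(value: Any) -> List[Any]:
    """Non-string iterable -> list of its items; anything else -> [value]."""
    if isinstance(value, Iterable) and not isinstance(value, (str, bytes)):
        return list(value)
    return [value]
```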

No behavior change. 27 offline tests pass; live single-page, paginated, and
fan-out paths verified unchanged.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01Sjb14HkwuCydKSKMsaXsgd
…eeded)

A multi-value `state`/`county`/`huc` selector now fans out over a
`ThreadPoolExecutor` instead of a serial loop. Concurrency is capped by a
module-level `MAX_CONCURRENT_REQUESTS` (default 4; set to 1 for serial) — kept
in this module rather than honoring the OGC engine's `API_USGS_CONCURRENT`, so
wateruse stays decoupled from the engine. The locations are independent single
requests over the synchronous `_get`, so the thread pool needs no shared state;
`pool.map` preserves input order and re-raises the first failure.

Stress-tested against the live NWDC at concurrency 1/2/4/8/16 over 16 distinct
locations: all 200s, zero rate-limit/connection errors, and the rate budget
depletes one token per request regardless of concurrency — so no request
backoff/retry is required. End-to-end results are concurrency-invariant
(byte-identical at conc 1/4/8) with a ~3.6x speedup at the default of 4.

The single-location common path skips the pool entirely. Tests route each
location to its own mocked response so the fan-out assertions are deterministic
under thread races, and cover both the concurrent and serial (cap=1) paths.
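
The thread-pool fan-out described in this commit can be sketched as below. `fan_out` and `fetch_one` are hypothetical names; only `MAX_CONCURRENT_REQUESTS` and its default of 4 come from the commit message.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence, TypeVar

R = TypeVar("R")

MAX_CONCURRENT_REQUESTS = 4  # set to 1 for serial execution


def fan_out(
    locations: Sequence[str],
    fetch_one: Callable[[str], R],
    max_workers: int = MAX_CONCURRENT_REQUESTS,
) -> List[R]:
    """Fetch each location, returning results in input order."""
    # A single location (or a cap of 1) skips the pool entirely.
    if len(locations) <= 1 or max_workers <= 1:
        return [fetch_one(loc) for loc in locations]
    workers = min(max_workers, len(locations))
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Executor.map yields results in input order and re-raises the
        # first exception encountered while iterating.
        return list(pool.map(fetch_one, locations))
```

Because `pool.map` is order-preserving, the concatenated result does not depend on which request happens to finish first.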

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01Sjb14HkwuCydKSKMsaXsgd
…e-limit

Replace the ThreadPoolExecutor fan-out with an asyncio implementation: one
shared `httpx.AsyncClient` paginates each location, `asyncio.gather` (bounded by
a semaphore at `MAX_CONCURRENT_REQUESTS`) fans the locations out, and input
order is preserved for a deterministic concat. The single client keeps
connections alive across pages and locations (the old per-call `httpx.get`
opened a fresh connection every page). The event loop runs in a worker thread,
so it is safe even when called inside an already-running loop (Jupyter) — a
bare `asyncio.run` would raise there.
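
As a rough illustration of the pattern in this commit (semaphore-bounded `asyncio.gather`, driven from a worker thread so it also works under a running loop), consider the stdlib-only sketch below. The function names are hypothetical and the HTTP client is replaced by an arbitrary async `fetch_one` callable.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Awaitable, Callable, List, Sequence, TypeVar

R = TypeVar("R")


async def gather_bounded(
    locations: Sequence[str],
    fetch_one: Callable[[str], Awaitable[R]],
    limit: int = 4,
) -> List[R]:
    """Run fetch_one for every location, at most `limit` at a time."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(loc: str) -> R:
        async with semaphore:
            return await fetch_one(loc)

    # gather returns results in the order the awaitables were passed in.
    return list(await asyncio.gather(*(guarded(loc) for loc in locations)))


def run_sync(make_coro: Callable[[], Awaitable[R]]) -> R:
    """Run a coroutine on a fresh event loop in a worker thread.

    Calling asyncio.run directly would fail if the caller is already
    inside a running loop (for example in a Jupyter notebook).
    """
    with ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(lambda: asyncio.run(make_coro())).result()
```

The coroutine is created inside the worker thread (via the factory) so that it is bound to the loop that actually runs it.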

`md.header` now surfaces the *final* rate-limit headers — the response with the
lowest `x-ratelimit-remaining` (the quota left after the whole fan-out) — plus
cumulative elapsed, instead of the first request's values. (The OGC engine
already aggregates this way via `_aggregate_paginated_response` /
`_combine_chunk_responses`, so only wateruse needed the fix.)

Reuse: the genuinely-shared, low-coupling primitives only (`_default_headers`,
`_raise_for_status(detail_from=...)` keeping NWDC's `{detail}` errors,
`_network_error`, `BaseMetadata`, `HTTPX_DEFAULTS`). Deliberately NOT the OGC
`_paginate` — it hardcodes `_raise_for_non_200` (the `{code, description}`
envelope, wrong for NWDC's `{detail}`) and is entangled with the engine's
context vars; the CSV/Link pager is ~20 lines locally. The sync→async bridge is
stdlib (`asyncio.run` in a worker thread), not anyio, which isn't a declared
dependency.

Verified live: single/paginated/fan-out results unchanged and order-stable,
the final (lowest) rate-limit header surfaces, `{detail}` errors preserved, and
calls succeed inside a running event loop. Offline tests cover the rate-limit
aggregation; 31 pass.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01Sjb14HkwuCydKSKMsaXsgd
…ient state

Reuse and unify the OGC engine's HTTP pagination / aggregation / error-recovery /
ambient-state plumbing instead of carrying parallel implementations. Net source
reduction of ~66 LOC, behavior-preserving, plus one rate-limit correctness fix.

wateruse reuse:
- wateruse drives the engine's generic `_paginate` (with an injected
  `raise_for_status` for the NWDC `{detail}` envelope), `_run_sync` (anyio portal,
  Jupyter-safe), and `_combine_chunk_frames` / `_combine_chunk_responses`
  aggregators — replacing a hand-rolled pager, thread bridge, and bespoke
  aggregation. `_resolve_locations` becomes a `_LOCATION_BUILDERS` table dispatch,
  dropping a 3-way if/elif and a duplicated selector enumeration.

Engine unification:
- `planning._merge_response`: one low-level "fold N responses into one" behind both
  pagination (`_paginate`) and chunked/fan-out aggregation
  (`_combine_chunk_responses`), replacing two near-duplicate implementations; deletes
  `engine._aggregate_paginated_response`.
- `utils.Ambient[T]`: a generic ContextVar-with-scope class collapsing each per-call
  ambient (`_row_cap`, `_ogc_base_url`, `_dialect`, the chunker's `_chunked_client`)
  from a var + hand-written `@contextmanager` setter pair into one declaration.
  `with _x(value):` call sites unchanged; readers shorten to `_x.get()`.
- `_paginate`'s verbatim per-page progress block deduped into a `report_page` closure.
- `_combine_chunk_responses`: dropped a dead single-response branch.
- `_QUOTA_HEADER` moved to the base `planning` module — dedups the literal and fixes
  a layering inversion (planning had hard-coded it, unable to import from chunking).
- `_cql2_param`: CQL2 filter list built as a comprehension.
- `engine._check_id_format`: inlined into its only caller; dead re-export dropped.

Rate-limit correctness fix:
- `x-ratelimit-remaining` now reports the LOWEST value any concurrent sub-request
  saw (the quota actually left after a fan-out), via a shared `_lowest_remaining`,
  instead of the last-by-index — fixing a latent inaccuracy in the OGC chunker too.
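
The difference between "last-by-index" and "lowest seen" is easy to show in a small sketch. The helper below is illustrative only (the real `_lowest_remaining` may differ), and it assumes the header value is an integer string.

```python
from typing import Iterable, Mapping, Optional

QUOTA_HEADER = "x-ratelimit-remaining"


def lowest_remaining(
    header_sets: Iterable[Mapping[str, str]],
    key: str = QUOTA_HEADER,
) -> Optional[int]:
    """Return the smallest remaining-quota value across all responses."""
    values = []
    for headers in header_sets:
        try:
            values.append(int(headers.get(key)))  # type: ignore[arg-type]
        except (TypeError, ValueError):
            continue  # header missing or not numeric: ignore this response
    return min(values) if values else None
```

With concurrent sub-requests, the response that is last by index is not necessarily the one the server answered last, so its header can overstate the quota that is left.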

Behavior-preserving (live-verified); offline OGC/wateruse/utils/progress suites
green; ruff + mypy --strict clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01Sjb14HkwuCydKSKMsaXsgd