Skip to content
Draft
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
**05/14/2026:** Fixed two latent bugs in the paginated `waterdata` request loop (`_walk_pages` and `get_stats_data`). Previously, when `requests.Session.request(...)` itself raised mid-pagination (network error, timeout), the except block called `_error_body()` on the *prior page's* response, so the logged "error" described the wrong request and could itself crash on non-JSON bodies. Separately, no status-code check was performed on subsequent paginated responses, so a 5xx body that didn't include `numberReturned` was silently treated as an empty page — pagination quietly stopped and the user got truncated data with no error logged. The loop now status-checks each page like the initial request and reports the actual exception. The "best-effort" behavior (return whatever pages were collected) is preserved.

**05/07/2026:** Bumped the declared minimum Python version from **3.8** to **3.9** (`pyproject.toml`'s `requires-python` and the ruff target). This brings the manifest in line with what was already being tested — CI's matrix has long covered only 3.9, 3.13, and 3.14, the `waterdata` test module already skipped itself on Python < 3.10, and several modules already use 3.9-only stdlib (e.g. `zoneinfo`). Users on 3.8 will no longer be able to install the package; please upgrade.

**05/07/2026:** `waterdata.get_samples()` and `wqp.get_results()` now append a derived `<prefix>DateTime` UTC column for every Date/Time/TimeZone triplet in the response (e.g. `Activity_StartDate` + `Activity_StartTime` + `Activity_StartTimeZone` → `Activity_StartDateTime`). Both the WQX3 (`<X>Date`/`<X>Time`/`<X>TimeZone`) and legacy WQP (`<X>Date`/`<X>Time/Time`/`<X>Time/TimeZoneCode`) shapes are recognized; abbreviations like EST/EDT/CST/PST resolve to a UTC `Timestamp`, unknown codes resolve to `NaT`, and the original triplet columns are preserved. Returned rows are also now sorted by `Activity_StartDateTime` (or the legacy `ActivityStartDateTime`) — the underlying APIs return rows in an unstable order. Mirrors R's `create_dateTime` and end-of-pipeline sort. Closes #266.
Expand Down
35 changes: 24 additions & 11 deletions dataretrieval/waterdata/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,18 @@ def _error_body(resp: requests.Response):
)


def _raise_for_non_200(resp: requests.Response) -> None:
"""Raise ``RuntimeError(_error_body(resp))`` if ``resp`` is not 200.

Routes through ``_error_body`` (USGS-API-aware: handles 429/403
specially, extracts ``code``/``description`` from JSON error bodies)
rather than ``Response.raise_for_status``, which raises
``HTTPError`` with a generic message.
"""
if resp.status_code != 200:
raise RuntimeError(_error_body(resp))


def _construct_api_requests(
service: str,
properties: list[str] | None = None,
Expand Down Expand Up @@ -645,8 +657,7 @@ def _walk_pages(
client = client or requests.Session()
try:
resp = client.send(req)
if resp.status_code != 200:
raise RuntimeError(_error_body(resp))
_raise_for_non_200(resp)

# Store the initial response for metadata
initial_response = resp
Expand All @@ -668,11 +679,11 @@ def _walk_pages(
headers=headers,
data=content if method == "POST" else None,
)
_raise_for_non_200(resp)
dfs.append(_get_resp_data(resp, geopd=geopd))
curr_url = _next_req_url(resp)
except Exception: # noqa: BLE001
error_text = _error_body(resp)
logger.error("Request incomplete. %s", error_text)
except Exception as e: # noqa: BLE001
logger.exception("Request incomplete: %s", e)
logger.warning(
"Request failed for URL: %s. Data download interrupted.", curr_url
Comment on lines +685 to 688
)
Expand Down Expand Up @@ -1105,8 +1116,7 @@ def get_stats_data(

try:
resp = client.send(req)
if resp.status_code != 200:
raise RuntimeError(_error_body(resp))
_raise_for_non_200(resp)

# Store the initial response for metadata
initial_response = resp
Expand All @@ -1132,14 +1142,17 @@ def get_stats_data(
params=args,
headers=headers,
)
_raise_for_non_200(resp)
body = resp.json()
all_dfs.append(_handle_stats_nesting(body, geopd=GEOPANDAS))
next_token = body["next"]
except Exception: # noqa: BLE001
error_text = _error_body(resp)
logger.error("Request incomplete. %s", error_text)
except Exception as e: # noqa: BLE001
logger.exception("Request incomplete: %s", e)
logger.warning(
"Request failed for URL: %s. Data download interrupted.", resp.url
"Request failed for URL: %s (next_token=%s). "
"Data download interrupted.",
url,
next_token,
)
Comment thread
thodson-usgs marked this conversation as resolved.
next_token = None

Expand Down
169 changes: 169 additions & 0 deletions tests/waterdata_utils_test.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import logging
from unittest import mock

import pandas as pd
import requests

import dataretrieval.waterdata.utils as _utils_module
from dataretrieval.waterdata.utils import (
_arrange_cols,
_error_body,
Expand All @@ -12,6 +14,8 @@
_walk_pages,
)

_LOGGER_NAME = _utils_module.__name__


def test_get_args_basic():
local_vars = {
Expand Down Expand Up @@ -87,6 +91,171 @@ def test_walk_pages_multiple_mocked():
assert mock_client.request.call_args[0][1] == "https://example.com/page2"


def _resp_ok(features):
"""Build a 200-OK mock response carrying the given features list."""
links = [{"rel": "next", "href": "https://example.com/page2"}] if features else []
resp = mock.MagicMock()
resp.json.return_value = {
"numberReturned": len(features),
"features": features,
"links": links,
}
resp.headers = {}
resp.status_code = 200
resp.url = "https://example.com/page1"
return resp


def _error_log_messages(caplog):
"""Pull ERROR-and-above message strings out of caplog. The four
pagination-failure tests below all assert against the same shape."""
Comment on lines +110 to +111
return [r.getMessage() for r in caplog.records if r.levelno >= logging.ERROR]


def _walk_pages_with_failure(failure_resp_or_exc):
"""Run _walk_pages where page 1 succeeds and page 2 fails as given."""
resp1 = _resp_ok([{"id": "1", "properties": {"val": "a"}}])

mock_client = mock.MagicMock(spec=requests.Session)
mock_client.send.return_value = resp1
if isinstance(failure_resp_or_exc, BaseException):
mock_client.request.side_effect = failure_resp_or_exc
else:
mock_client.request.return_value = failure_resp_or_exc

mock_req = mock.MagicMock(spec=requests.PreparedRequest)
mock_req.method = "GET"
mock_req.headers = {}
mock_req.url = "https://example.com/page1"

return _walk_pages(geopd=False, req=mock_req, client=mock_client)


def test_walk_pages_logs_actual_exception_when_request_raises(caplog):
"""Exception from client.request() must be logged with its actual message."""
caplog.set_level(logging.ERROR, logger=_LOGGER_NAME)

df, _ = _walk_pages_with_failure(requests.ConnectionError("boom"))

# First page's data is preserved (best-effort behavior).
assert list(df["val"]) == ["a"]
# Logged error mentions the actual ConnectionError, not a stale page body.
messages = _error_log_messages(caplog)
assert any("boom" in m for m in messages), messages


def test_walk_pages_surfaces_5xx_mid_pagination(caplog):
"""A non-200 mid-pagination response must be logged, not silently swallowed."""
caplog.set_level(logging.ERROR, logger=_LOGGER_NAME)

page2_500 = mock.MagicMock()
page2_500.status_code = 503
page2_500.json.return_value = {
"code": "ServiceUnavailable",
"description": "upstream timeout",
}
page2_500.url = "https://example.com/page2"

df, _ = _walk_pages_with_failure(page2_500)
Comment thread
thodson-usgs marked this conversation as resolved.
Outdated

assert list(df["val"]) == ["a"]
messages = _error_log_messages(caplog)
assert any("503" in m or "ServiceUnavailable" in m for m in messages), messages


def _stats_initial_ok():
"""A 200-OK initial stats response: empty data list, signals one more page."""
resp = mock.MagicMock()
resp.status_code = 200
resp.json.return_value = {
"next": "tok2",
"features": [],
}
resp.headers = {}
resp.url = "https://example.com/stats?service=foo"
return resp


def _run_get_stats_data_with_failure(failure_resp_or_exc, monkeypatch):
"""Exercise get_stats_data where the initial response succeeds and the
paginated follow-up fails as given. Mirrors _walk_pages_with_failure.
`monkeypatch` stubs ``_handle_stats_nesting`` so the synthetic minimal
response body doesn't need to parse — these tests only assert on the
pagination loop's error surfacing."""
from dataretrieval.waterdata.utils import get_stats_data

monkeypatch.setattr(
_utils_module,
"_handle_stats_nesting",
mock.MagicMock(return_value=pd.DataFrame()),
)

mock_client = mock.MagicMock(spec=requests.Session)
mock_client.send.return_value = _stats_initial_ok()
if isinstance(failure_resp_or_exc, BaseException):
mock_client.request.side_effect = failure_resp_or_exc
else:
mock_client.request.return_value = failure_resp_or_exc

return get_stats_data(
args={"monitoring_location_id": "USGS-1"},
service="observationNormals",
expand_percentiles=False,
client=mock_client,
)


def test_get_stats_data_logs_actual_exception_when_request_raises(caplog, monkeypatch):
"""get_stats_data variant of the connection-error scenario."""
caplog.set_level(logging.ERROR, logger=_LOGGER_NAME)

_run_get_stats_data_with_failure(
requests.ConnectionError("stats-boom"),
monkeypatch,
)

messages = _error_log_messages(caplog)
assert any("stats-boom" in m for m in messages), messages


def test_get_stats_data_surfaces_5xx_mid_pagination(caplog, monkeypatch):
"""get_stats_data variant of the mid-pagination 5xx scenario."""
caplog.set_level(logging.ERROR, logger=_LOGGER_NAME)

page2_503 = mock.MagicMock()
page2_503.status_code = 503
page2_503.json.return_value = {
"code": "ServiceUnavailable",
"description": "upstream timeout",
}
page2_503.url = "https://example.com/stats?service=foo&next_token=tok2"

_run_get_stats_data_with_failure(page2_503, monkeypatch)

messages = _error_log_messages(caplog)
assert any("503" in m or "ServiceUnavailable" in m for m in messages), messages


def test_get_stats_data_warning_includes_next_token(caplog, monkeypatch):
"""The pagination-failure warning includes the next_token so operators
can identify which page in the sequence failed. (Addresses Copilot's
PR #273 review note: the base URL alone drops cursor context.)"""
caplog.set_level(logging.WARNING, logger=_LOGGER_NAME)

page2_503 = mock.MagicMock()
page2_503.status_code = 503
page2_503.json.return_value = {
"code": "ServiceUnavailable",
"description": "upstream timeout",
}

_run_get_stats_data_with_failure(page2_503, monkeypatch)

warnings_ = [r.getMessage() for r in caplog.records if r.levelno == logging.WARNING]
# The initial response from _stats_initial_ok carries next=tok2.
assert any("tok2" in m for m in warnings_), warnings_


def test_handle_stats_nesting_tolerates_missing_drop_columns():
"""If the upstream stats response shape ever changes such that one of
the columns we try to drop ("type", "properties.data") is absent, the
Expand Down
Loading