Skip to content
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
0afa024
feat(mcp): add opt-in per-stream sync progress tracking to get_cloud_…
devin-ai-integration[bot] Apr 3, 2026
4b1b849
docs: fix docstring inaccuracies in _sync_progress module
devin-ai-integration[bot] Apr 3, 2026
6779c79
docs: fix module-level docstring to match wall-clock time implementation
devin-ai-integration[bot] Apr 3, 2026
4a0913d
fix: address CodeRabbit review - Python 3.10 Z suffix, nested cursor …
devin-ai-integration[bot] Apr 3, 2026
1896c2f
refactor: narrow exception catch to specific types (ValidationError, …
devin-ai-integration[bot] Apr 3, 2026
869c803
fix: correct sync progress formula and enrich output with raw factors
devin-ai-integration[bot] Apr 3, 2026
f3944a8
feat: fetch previous sync state for progress baseline
devin-ai-integration[bot] Apr 3, 2026
c310209
fix: use enum comparison and continue scanning in get_previous_sync_s…
devin-ai-integration[bot] Apr 3, 2026
f913d01
feat: add with_rich_status_updates to wait_for_completion and run_sync
devin-ai-integration[bot] Apr 4, 2026
0843782
docs: add example script for cloud sync with Rich progress tracking
devin-ai-integration[bot] Apr 4, 2026
fe4551d
fix: cache pre-sync state and add tiered fallback for progress baseline
devin-ai-integration[bot] Apr 4, 2026
68ac2d6
fix: handle historical backfill in progress computation
devin-ai-integration[bot] Apr 4, 2026
afd150c
refactor: address PR feedback - CDK parser, remove heuristics, guard …
devin-ai-integration[bot] Apr 4, 2026
bfed2b9
fix: resolve catalog lookup for syncCatalog nesting in progress compu…
devin-ai-integration[bot] Apr 4, 2026
764b2d4
fix: override progress to 100% when job status is SUCCEEDED
devin-ai-integration[bot] Apr 4, 2026
032710a
feat: add start time, end time, and elapsed to Rich table caption
devin-ai-integration[bot] Apr 4, 2026
1d2ade4
feat: use catalog stream count as denominator for progress reporting
devin-ai-integration[bot] Apr 4, 2026
d9bbcdb
fix: filter catalog streams by selected=True for accurate denominator
devin-ai-integration[bot] Apr 4, 2026
4ecc64a
feat: enhance Rich table with records/bytes throughput, remove cursor…
devin-ai-integration[bot] Apr 4, 2026
7171e2e
fix: fix secret resolution and debug prints
aaronsteers Apr 6, 2026
d79ed04
feat: add JSONL progress audit logging to Rich polling loop
devin-ai-integration[bot] Apr 23, 2026
343b2d7
feat: add records/bytes proof-of-life progress from Config API stream…
devin-ai-integration[bot] Apr 23, 2026
7240d6f
feat: observe source/destination STATE messages for local sync progress
devin-ai-integration[bot] Apr 23, 2026
3e3c3df
fix: resolve pyrefly type errors for dict.get on stream_stats
devin-ai-integration[bot] Apr 23, 2026
3efd01e
chore: remove test artifact from repo
devin-ai-integration[bot] Apr 23, 2026
4aab949
feat: force stream progress to 100% on STREAM_COMPLETE and sync success
devin-ai-integration[bot] Apr 23, 2026
b972bc8
feat: extract cursor from nested per-partition state (source-github-s…
devin-ai-integration[bot] Apr 23, 2026
d73ec47
feat(progress): consolidate per-stream status into a single line
devin-ai-integration[bot] Apr 23, 2026
415b8f8
fix(cloud): isolate JSONL log write in a module-level helper
devin-ai-integration[bot] Apr 24, 2026
a1098c9
feat(progress): render cursor dates as concise yyyy-mm-dd
devin-ai-integration[bot] Apr 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 224 additions & 0 deletions airbyte/cloud/_sync_progress.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Sync progress estimation for datetime-cursor-based incremental streams.

This module provides functions to estimate per-stream sync progress using
a wall-clock time heuristic:

```
progress = (cursor_dt - sync_start_time) / (now - sync_start_time)
```

Where:
- `cursor_dt` is the latest committed cursor value parsed as a datetime.
- `sync_start_time` is the wall-clock time when the sync job started.
- `now` is the current UTC time.

This heuristic works well for real-time incremental syncs where cursor
timestamps advance roughly with wall-clock time. It may be inaccurate
for historical backfills where the cursor covers a different time range
than the actual sync duration (progress will clamp to 0% in that case).

Only streams with datetime-based cursors are supported. Non-datetime
cursors (integers, opaque tokens, etc.) are skipped.
"""

from __future__ import annotations

import logging
from datetime import datetime, timezone
from typing import Any

from airbyte.cloud._connection_state import (
ConnectionStateResponse,
StreamState,
_get_stream_list,
)


logger = logging.getLogger(__name__)


def _try_parse_datetime_cursor(value: str) -> datetime | None:

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The CDK already has something that should do this. Something like ab_datetime_parse

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in commit afd150c. Now delegates to the CDK's ab_datetime_try_parse() from airbyte_cdk.utils.datetime_helpers.

Kept the numeric pre-filter because ab_datetime_try_parse('12345') interprets pure numbers as epoch timestamps (returns 1970-01-01T03:25:45+00:00), but cursor values like "12345" are opaque tokens we should skip.


Devin session

"""Attempt to parse a string as a datetime via ISO 8601.

Returns `None` if the value is numeric or cannot be parsed.
"""
# Fast rejection: if it looks like a pure integer or float, skip it
stripped = value.strip()
if not stripped:
return None

try:
float(stripped)
except ValueError:
pass
else:
return None # It's a number, not a datetime

# Try ISO 8601 parsing (handles most Airbyte cursor formats)
try:
dt = datetime.fromisoformat(stripped)
except (ValueError, TypeError):
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
pass
else:
# Ensure timezone-aware (assume UTC if naive)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt

return None


def _extract_cursor_field_from_catalog(
catalog: dict[str, Any],
stream_name: str,
stream_namespace: str | None,
) -> str | None:
"""Extract the cursor field name for a stream from the configured catalog.

Returns the cursor field name, or `None` if the stream is not found
or does not have a cursor field configured.
"""
streams = catalog.get("streams", [])
for stream_entry in streams:
config = stream_entry.get("config", {})
stream_info = stream_entry.get("stream", {})

entry_name = stream_info.get("name") or config.get("aliasName")
entry_namespace = stream_info.get("namespace")

# Normalize namespace comparison (None and "" are equivalent)
if entry_name != stream_name:
continue
if (entry_namespace or None) != (stream_namespace or None):
continue

# cursor_field is a list of field path segments (usually single element)
cursor_field = config.get("cursorField")
if cursor_field and isinstance(cursor_field, list) and len(cursor_field) > 0:
return str(cursor_field[0])
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

return None


def _find_cursor_value_in_state(
stream_state: dict[str, Any] | None,
cursor_field: str | None,
) -> str | None:
"""Find a cursor value in a stream state blob.

If `cursor_field` is provided, looks for it directly. Otherwise,
attempts common cursor field names and heuristics.

Returns the cursor value as a string, or `None` if not found.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This approach appears to be throwing spaghetti at the wall. Unless we know each of these is a valid algorithm "in the wild" for a real source, we should probably remove our simplify.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed — removed all the heuristic fallbacks in commit afd150c. The function now requires an explicit cursor_field from the configured catalog. If cursor_field is None (not in the catalog), it returns None immediately — no more guessing with common_cursor_names or single-datetime-value heuristics.


Devin session

"""
if not stream_state:
return None

# If we know the cursor field, look for it directly
if cursor_field and cursor_field in stream_state:
val = stream_state[cursor_field]
if val is not None:
return str(val)

# Try common cursor field names as fallback
common_cursor_names = ["cursor", "updated_at", "date", "timestamp"]
for name in common_cursor_names:
if name in stream_state:
val = stream_state[name]
if val is not None:
return str(val)

# Last resort: check for a single string value that parses as datetime
string_values = [str(v) for v in stream_state.values() if v is not None and isinstance(v, str)]
datetime_values = [(v, _try_parse_datetime_cursor(v)) for v in string_values]
parsed = [(v, dt) for v, dt in datetime_values if dt is not None]
if len(parsed) == 1:
return parsed[0][0]

return None


def compute_stream_progress(
state_data: dict[str, Any],
catalog_data: dict[str, Any] | None,
sync_start_time: datetime,
now: datetime | None = None,
) -> list[dict[str, Any]]:
"""Compute per-stream sync progress for datetime-cursor-based streams.

Uses a wall-clock time heuristic:
`progress = (cursor_dt - sync_start_time) / (now - sync_start_time)`.
This assumes cursor timestamps advance roughly with wall-clock time,
which works well for real-time incremental syncs but may be inaccurate
for historical backfills where the cursor covers a different time range
than the actual sync duration.

Progress is `None` for streams without datetime cursors or without
state. Values are clamped to [0.0, 1.0].
"""
if now is None:
now = datetime.now(timezone.utc)

# Ensure times are timezone-aware
if sync_start_time.tzinfo is None:
sync_start_time = sync_start_time.replace(tzinfo=timezone.utc)
if now.tzinfo is None:
now = now.replace(tzinfo=timezone.utc)

state = ConnectionStateResponse(**state_data)
streams: list[StreamState] = _get_stream_list(state)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

results: list[dict[str, Any]] = []
for stream in streams:
stream_name = stream.stream_descriptor.name
stream_namespace = stream.stream_descriptor.namespace

# Look up cursor field from catalog
cursor_field: str | None = None
if catalog_data:
cursor_field = _extract_cursor_field_from_catalog(
catalog_data, stream_name, stream_namespace
)

# Find cursor value in state
cursor_value_str = _find_cursor_value_in_state(stream.stream_state, cursor_field)

entry: dict[str, Any] = {
"stream_name": stream_name,
"stream_namespace": stream_namespace,
"cursor_field": cursor_field,
"cursor_value": cursor_value_str,
"progress_pct": None,
"reason": None,
}

if cursor_value_str is None:
entry["reason"] = "No cursor value found in state."
results.append(entry)
continue

cursor_dt = _try_parse_datetime_cursor(cursor_value_str)
if cursor_dt is None:
entry["reason"] = (
f"Cursor value '{cursor_value_str}' is not a recognized datetime format."
)
results.append(entry)
continue

# Calculate progress: (cursor - sync_start) / (now - sync_start)
denominator = (now - sync_start_time).total_seconds()
if denominator <= 0:
entry["reason"] = "Sync start time is not before current time."
results.append(entry)
continue

numerator = (cursor_dt - sync_start_time).total_seconds()

# Clamp to [0.0, 1.0]
progress = max(0.0, min(1.0, numerator / denominator))
entry["progress_pct"] = round(progress, 4)

results.append(entry)

return results
23 changes: 23 additions & 0 deletions airbyte/mcp/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from airbyte import cloud, get_destination, get_source
from airbyte._util import api_util
from airbyte.cloud._sync_progress import compute_stream_progress
from airbyte.cloud.connectors import CustomCloudSourceDefinition
from airbyte.cloud.constants import FAILED_STATUSES
from airbyte.cloud.workspaces import CloudOrganization, CloudWorkspace
Expand Down Expand Up @@ -647,6 +648,19 @@ def get_cloud_sync_status(
default=False,
),
],
with_stream_progress: Annotated[
bool,
Field(
description=(
"Whether to include per-stream sync progress estimates. "
"When enabled, fetches current connection state and catalog to "
"compute an estimated progress percentage for each stream with "
"a datetime-based cursor. This adds latency from additional API "
"calls. Progress is approximate and advances at checkpoint intervals."
),
default=False,
),
],
) -> dict[str, Any]:
"""Get the status of a sync job from the Airbyte Cloud."""
workspace: CloudWorkspace = _get_cloud_workspace(ctx, workspace_id)
Expand Down Expand Up @@ -682,6 +696,15 @@ def get_cloud_sync_status(
for attempt in attempts
]

if with_stream_progress:
state_data = connection.dump_raw_state()
catalog_data = connection.dump_raw_catalog()
result["stream_progress"] = compute_stream_progress(
state_data=state_data,
catalog_data=catalog_data,
sync_start_time=sync_result.start_time,
)

return result


Expand Down
Loading