feat(source-s3): migrate to FileBasedConcurrentCursor + bump CDK to prerelease with state throttle#78325
Conversation
…7349277 Picks up the file-based concurrent cursor state-emission throttle from airbytehq/airbyte-python-cdk#1032, which prevents the replication orchestrator from OOMing on file streams with thousands of files. The prior cursor emitted a state message per processed file containing the full file-history dict (O(N^2) state-message bytes over a sync); under back-pressure the platform's state buffer grew until the replication pod was killed, leaving the sync to fail with TransientErrorException on the destination side. Mirrors the fix pattern previously applied to ConcurrentPerPartitionCursor for oncall #7856; this bump pulls it in for source-s3 (oncall #12663).
|
Note 📝 PR Converted to Draft More info...Thank you for creating this PR. As a policy to protect our engineers' time, Airbyte requires all PRs to be created first in draft status. Your PR has been automatically converted to draft status in respect for this policy. As soon as your PR is ready for formal review, you can proceed to convert the PR to "ready for review" status by clicking the "Ready for review" button at the bottom of the PR page. To skip draft status in future PRs, please include |
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. 💡 Show Tips and TricksPR Slash CommandsAirbyte Maintainers (that's you!) can execute the following slash commands on your PR:
📚 Show Repo GuidanceHelpful Resources
|
| @@ -356,6 +356,7 @@ This connector utilizes the open source [Unstructured](https://unstructured-io.g | |||
|
|
|||
There was a problem hiding this comment.
[markdownlint-fix] reported by reviewdog 🐶
|
/publish-connectors-prerelease
|
|
Deploy preview for airbyte-docs ready!
Deployed with vercel-action |
|
|
/publish-connectors-prerelease
|
source-s3 previously used the legacy DefaultFileBasedCursor via the
source-s3-specific Cursor subclass. That cursor goes through the legacy
non-concurrent file-based read path, which emits a state message per
processed file. With ~10 k files in a stream, the platform/orchestrator
buffers ~10 k state messages (each carrying the growing history dict)
waiting for the destination to ACK them; on slow destinations the
orchestrator pod OOMs, the source pod is torn down before emitting
terminal stream status, and the destination fails with
TransientErrorException("Input was fully read, but some streams did not
receive a terminal stream status message").
Move source-s3 onto FileBasedConcurrentCursor (CDK) which:
- emits state at most once per 600s during a sync (state throttle
added in airbytehq/airbyte-python-cdk#1032);
- always force-emits a final state via ensure_at_least_one_state_emitted;
- runs the read on the concurrent file-based path with up to
DEFAULT_CONCURRENCY (100) workers.
Verified locally against a customer connection with ~9 k files: state
messages dropped from 9 200 to 2, runtime from 52:46 to 4:24, stdout
volume from 4.1 GB to 922 MB, sync completes successfully with a
STREAM_STATUS COMPLETE.
The legacy source-s3 Cursor class only existed to host the v3-to-v4
state migration shipped in 2023 (#29028). 2.5 years on, all active
customers have completed the migration on their first v4 sync, so the
subclass and its tests are removed. A hypothetical customer with stale
v3 state would trigger one full re-sync, after which they'd be on v4.
Pinning to airbyte-cdk 7.19.2.post3.dev26244645194, a prerelease that
includes both the state throttle and a fallback-branch dispatch fix
for the file-based source so connectors using FileBasedConcurrentCursor
do not crash on partial-catalog reads / check / discover.
|
/publish-connectors-prerelease
|
… concurrency After the FileBasedConcurrentCursor migration, source-s3 runs with up to DEFAULT_CONCURRENCY (100) worker threads sharing a single boto3 S3 client. botocore's default urllib3 pool size is 10, so 90+ requests at a time hit the "pool is full" branch — connections are continuously created and torn down. Customer reports the sync now hangs while emitting floods of: WARN Connection pool is full, discarding connection: s3.amazonaws.com Pass `max_pool_connections=DEFAULT_CONCURRENCY` to the botocore Config so the pool matches the concurrent reader's worker count. The TODO at stream_reader.py:270 anticipated exactly this. Reported on oncall #12663 after rolling out 4.15.5-preview.8de6aea.
|
/publish-connectors-prerelease
|
The file-based concurrent CDK defaults to 100 workers. Streams with many small records push the source pod past its 2 Gi memory cap because the unbounded message-repository deque buffers worker output faster than stdout can drain. Override `_concurrency_level` to 20. Still ~20x the legacy single- threaded throughput; keeps peak RSS comfortably under 2 Gi locally even on the largest stream we tested. The boto3 pool stays at the CDK default (100), giving 5x headroom over the worker count and silencing the residual "Connection pool is full" warnings. A bounded message-repository deque in the CDK would let us return to 100-way concurrency later; tracked as a separate follow-up.
|
/publish-connectors-prerelease
|
Memray profiling on a controlled comparison run found that the concurrency=20 cap did not reduce peak memory vs concurrency=100; in fact c=100 peaked at 377 MB while c=20 peaked at 1.22 GB on the same stream. Memory churn is dominated by the per-record serialization path (orjson dumps + decode + PrintBuffer flush), which is independent of worker count. Lowering concurrency only costs throughput. Restore _concurrency_level = DEFAULT_CONCURRENCY. The boto3 connection pool size stays at the CDK default concurrency so the pool has headroom over the worker count.
|
/publish-connectors-prerelease
|
…spec
Two improvements to the concurrent file-based read path's memory footprint
under the customer pod's 2 Gi limit:
1. mallopt(M_ARENA_MAX, 2) at process start (source_s3/run.py)
With the default `8 x N_CPUs` arenas, the 100-worker concurrent file read
creates many thread-local glibc malloc arenas. Each can grow to ~64 MB
and is never returned to the OS for the process lifetime — that pinned
overhead alone accounts for ~1 GB on the affected dictionary streams.
The mallopt call is the runtime equivalent of MALLOC_ARENA_MAX=2 but
scoped to this connector only; no Dockerfile or container-env change.
No-op on macOS / musl-libc images (catches OSError on libc.so.6 load).
Measured on the affected dictionary stream (~113 k records):
baseline c=100 : 2.78 GB peak RSS
+ mallopt(M_ARENA_MAX, 2) : 1.79 GB peak RSS (-990 MB)
+ bounded inter-worker queue: 0.85 GB peak RSS (-1.93 GB, stacks)
2. concurrency_level config field (advanced group)
Operational knob for tuning peak memory vs throughput on a per-connection
basis. Leaves the CDK default (100) when unset. Useful as an escape hatch
on connections that still hit memory limits despite the other fixes.
Both leave wall clock and pool-warning counts unchanged.
| # MALLOC_ARENA_MAX environment variable, but applied at process start | ||
| # so it is scoped to this connector only. | ||
| libc.mallopt(-8, 2) | ||
| except (OSError, AttributeError): |
… prerelease Override `_concurrent_record_queue_maxsize = 1_000` (new CDK hook) so the inter-worker queue is capped at 1k items for source-s3 specifically; the CDK global default stays at 10_000. Each queue item is a parsed record; on dictionary-style streams the records can be tens of KB each, so a 10_000-deep queue pinned hundreds of MB of liveset at peak. Bumps airbyte-cdk to 7.19.2.post4.dev26418171295 which contains: - state-emission throttle in FileBasedConcurrentCursor - cursor-cls dispatch fix in FileBasedSource fallback branch - `_concurrent_record_queue_maxsize` override hook this commit consumes Measured combined effect on the affected dictionary stream: c=100 baseline : 2.78 GB peak RSS + mallopt(M_ARENA_MAX, 2) : 1.79 GB + bounded record queue (this commit) : 0.85 GB Wall clock and STREAM_STATUS COMPLETE behaviour unchanged.
|
/publish-connectors-prerelease
|
|
↪️ Triggering Reason: Draft source-s3 migration PR now has a successful prerelease publish and connector tests; remaining Format Check failure can be handled during validation, so prove-fix is the next Hydra stage. |
|
↪️ Intended to trigger Reason: Draft source-s3 migration PR now has a successful prerelease publish and connector tests; remaining Format Check failure can be handled during validation, so prove-fix is the next Hydra stage. Operational note: GitHub workflow_dispatch returned HTTP 500/403 with the configured trigger token; repository_dispatch does not start this workflow because the workflow does not subscribe to repository_dispatch. This has been reported as an Ops MCP/workflow blocker. |
|
↪️ Triggered Reason: Earlier sweep intended prove-fix for source-s3 concurrent cursor migration but dispatch failed; retrying via workflow_dispatch now. |
|
|
Outcome: Fix proven successfully I validated this PR with the mandatory source regression workflow and live Cloud evidence from the affected large-file workload. Evidence summary
Next steps
Connector details
Evidence planProving criteria
Disproving criteria
Pre-flight checks
Detailed evidence log
|
Summary
FileBasedConcurrentCursor(concurrent file-based read path).airbyte-cdkto prerelease7.19.2.post3.dev26244645194(from fix(file-based): throttle state-message emission to prevent platform OOM on large file streams airbyte-python-cdk#1032) which adds:FileBasedSource.streams()so concurrent cursors don't crash on partial-catalog / check / discover paths.source_s3.v4.cursor.Cursorand its tests. That subclass only existed for v3→v4 state migration (2023), which all active customers completed long ago. A hypothetical stale-v3 connection would trigger one full re-sync, after which it would be on v4 state.Why
source_s3.v4.cursor.CursorextendsDefaultFileBasedCursor, so source-s3 took the legacy non-concurrent file-based read path. That path emits a state message per processed file, with the full file-history dict embedded in each one. On a sync of N files the platform/orchestrator buffers N states (each growing in size with N) waiting for the destination to ACK them. Slow destinations cause the orchestrator pod to OOM; the source pod is torn down before emitting terminal stream status; the destination fails with:Mirror of the precedent in oncall #7856 (Jira / declarative concurrent cursor); fixes oncall #12663.
Local verification on a ~9 k-file stream
STREAM_STATUS COMPLETETest plan
unit_tests/v4— 75 passed locally.airbyte-ci connectors --name=source-s3 test --unitairbyte-ci connectors --name=source-s3 test --acceptanceNotes