feat(source-mixpanel): add optional stream filtering to speed up schema discovery #81343
Ryan Waskewich (rwask) wants to merge 3 commits into
…ma discovery Co-Authored-By: Ryan Waskewich <ryan.waskewich@airbyte.io>
Co-Authored-By: Ryan Waskewich <ryan.waskewich@airbyte.io>
    if selected_streams:
        all_streams = [s for s in all_streams if s.name in selected_streams]
🚩 Check stream interaction with the new streams filter is CDK-dependent
The manifest defines check.stream_names: [cohorts] at source_mixpanel/manifest.yaml:800-801. If a user sets the streams config to a subset that excludes cohorts, the behavior of check_connection depends on whether the CDK resolves check streams from the manifest definitions directly or from the output of streams(). If the CDK uses streams(), the check would fail because cohorts would be filtered out. In practice, declarative source check resolution typically resolves streams independently from streams(), so this is likely fine, but worth verifying with CDK documentation if this feature is used widely.
🚫 Not fixing. The CDK's CheckStream resolves check streams from the manifest definitions directly, not from the output of streams(). The check_connection call in test_source.py hits the cohorts endpoint (/api/query/cohorts/list) regardless of what streams() returns. So filtering via the streams config won't affect the check — cohorts will always be checked via the manifest-defined check stream.
Worth noting in docs if this feature sees wide adoption, but not a code-level concern.
      selected_streams = config.get("streams")

      all_streams = super().streams(config=config)

      config_transformed = copy.deepcopy(config)
      config_transformed = self._validate_and_transform(config_transformed)
      auth = self.get_authenticator(config)

    - streams.append(Export(authenticator=auth, **config_transformed))
    + all_streams.append(Export(authenticator=auth, **config_transformed))

      if selected_streams:
          all_streams = [s for s in all_streams if s.name in selected_streams]
🚩 No tests added for the new streams filtering feature
The PR adds a new config parameter and filtering logic but does not include any unit tests for the behavior. The existing test_streams at unit_tests/test_source.py:49 asserts len(streams) == 6 without exercising the filter. Tests for (1) filtering with a subset, (2) empty list returning all streams, and (3) None returning all streams would increase confidence in this feature.
☑️ Resolved in 0e25744. Added a parametrized test_streams_filtering test covering: no filter (None), empty list, subset, single stream, and all streams explicit.
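For readers without the commit at hand, the five cases listed above might look roughly like the table-driven check below. This is a sketch under assumptions, not the test from 0e25744: `Named`, `apply_filter`, `CASES`, and `run_cases` are hypothetical stand-ins that only mirror the filter expression from the diff, and the real test presumably builds actual connector streams and uses pytest parametrization.

```python
from typing import Any, List, Mapping, NamedTuple, Tuple

# Stream names as listed in the PR description.
ALL = ["cohorts", "engage", "annotations", "cohort_members", "funnels", "export"]


class Named(NamedTuple):
    """Hypothetical stand-in for a stream; only the name matters here."""
    name: str


def apply_filter(config: Mapping[str, Any]) -> List[str]:
    """Mirror the filter from the diff and return the surviving stream names."""
    all_streams = [Named(n) for n in ALL]
    selected_streams = config.get("streams")
    if selected_streams:  # None and [] are both falsy, so both mean "no filter"
        all_streams = [s for s in all_streams if s.name in selected_streams]
    return [s.name for s in all_streams]


# (label, config, expected stream names)
CASES: List[Tuple[str, Mapping[str, Any], List[str]]] = [
    ("no filter (None)", {"streams": None}, ALL),
    ("empty list", {"streams": []}, ALL),
    ("subset", {"streams": ["cohorts", "export"]}, ["cohorts", "export"]),
    ("single stream", {"streams": ["funnels"]}, ["funnels"]),
    ("all streams explicit", {"streams": list(ALL)}, ALL),
]


def run_cases() -> List[str]:
    """Return the labels of failing cases; an empty list means all passed."""
    return [label for label, config, expected in CASES
            if apply_filter(config) != expected]
```

Because the filter is a list comprehension over `all_streams`, the result keeps the connector's stream order rather than the order given in the config.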
Co-Authored-By: Ryan Waskewich <ryan.waskewich@airbyte.io>
What
When a Mixpanel service account has broad access, schema discovery can time out because the connector discovers schemas for all 6 streams, including `engage` and `export`, which use dynamic schema loaders that make live API calls (`engage/properties` and `events/properties/top`). These calls are rate-limited and can return very large responses for broad-access accounts.

This adds an optional `streams` config parameter so users can restrict which streams are discovered and synced, avoiding the expensive dynamic schema discovery for streams they don't need.

Tracked in https://github.com/airbytehq/oncall/issues/13020. Requested by Ryan Waskewich (@rwask).
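The motivation above can be illustrated with a small sketch. It is not the connector's code: `FakeStream`, `select_streams`, and `discover` are hypothetical, and it assumes that a stream's schema is only loaded when that stream is in the list returned by `streams()`.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Mapping, Tuple

STREAM_NAMES = ["cohorts", "engage", "annotations", "cohort_members", "funnels", "export"]
# Per the PR description, these two streams load their schema via live API calls.
DYNAMIC_SCHEMA_STREAMS = {"engage", "export"}


@dataclass
class FakeStream:
    """Hypothetical stand-in for a connector stream."""
    name: str
    api_calls: List[str] = field(default_factory=list)  # log of simulated API calls

    def get_json_schema(self) -> Dict[str, Any]:
        # Simulate the expensive, rate-limited call for dynamic-schema streams.
        if self.name in DYNAMIC_SCHEMA_STREAMS:
            self.api_calls.append(self.name)
        return {"type": "object", "properties": {}}


def select_streams(all_streams: List[FakeStream], config: Mapping[str, Any]) -> List[FakeStream]:
    """Same shape as the filter in the diff: a falsy `streams` value keeps everything."""
    selected = config.get("streams")
    if selected:
        return [s for s in all_streams if s.name in selected]
    return list(all_streams)


def discover(config: Mapping[str, Any]) -> Tuple[Dict[str, Dict[str, Any]], List[str]]:
    """Return the discovered schemas and the names of streams that hit the API."""
    api_calls: List[str] = []
    streams = [FakeStream(name, api_calls) for name in STREAM_NAMES]
    schemas = {s.name: s.get_json_schema() for s in select_streams(streams, config)}
    return schemas, api_calls
```

With `{"streams": ["cohorts", "funnels"]}` the simulated call log stays empty, while an empty config discovers all six streams and records calls for `engage` and `export`.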
How
- `spec.json`: Added optional `streams` field (array of enum stream names: `cohorts`, `engage`, `annotations`, `cohort_members`, `funnels`, `export`)
- `source.py`: Filter the returned streams in the `streams()` method when the `streams` config is populated
- Tests: `test_streams_filtering` covering no filter, empty list, subset, single stream, and all-streams-explicit cases

Review guide
- `source_mixpanel/spec.json`: new optional `streams` config field
- `source_mixpanel/source.py`: stream filtering logic in `streams()` method
- `unit_tests/test_source.py`: parametrized tests for stream filtering
- `metadata.yaml` / `pyproject.toml`: version bump
- `docs/integrations/sources/mixpanel.md`: setup instructions and changelog

User Impact
Users with Mixpanel service accounts that have broad access can now optionally select specific streams to discover, preventing schema discovery timeouts. When the field is left empty (default), all streams are discovered as before, so the change is fully backward compatible.
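For reference, an "array of enum stream names" field could be shaped roughly like the JSON Schema property below, written as a Python dict so it can be checked in code. This is an assumption about the shape, not a copy of `spec.json`; `STREAMS_PROPERTY` and the `unknown_stream_names` helper are hypothetical.

```python
from typing import Any, Dict, List, Mapping

# Hypothetical shape of the optional `streams` property (the real spec.json may differ).
STREAMS_PROPERTY: Dict[str, Any] = {
    "type": "array",
    "items": {
        "type": "string",
        "enum": ["cohorts", "engage", "annotations", "cohort_members", "funnels", "export"],
    },
}


def unknown_stream_names(config: Mapping[str, Any]) -> List[str]:
    """Return configured names that are not in the enum (empty list if all are valid)."""
    allowed = STREAMS_PROPERTY["items"]["enum"]
    return [name for name in (config.get("streams") or []) if name not in allowed]
```

The enum matters because the filter compares names by membership only: a misspelled name such as `exports` would match no stream rather than raise an error, so catching it at config validation time is the safer place.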
Can this PR be safely reverted and rolled back?
Link to Devin session: https://app.devin.ai/sessions/c078325c99ee46068dd1d015435c66c7