@@ -11,7 +11,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a
- dockerImageTag: 4.0.0
+ dockerImageTag: 4.1.0
dockerRepository: airbyte/source-mixpanel
documentationUrl: https://docs.airbyte.com/integrations/sources/mixpanel
githubIssueLabel: source-mixpanel
@@ -3,7 +3,7 @@ requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
- version = "4.0.0"
+ version = "4.1.0"
name = "source-mixpanel"
description = "Source implementation for Mixpanel."
authors = ["Airbyte <contact@airbyte.io>"]
@@ -1,4 +1,4 @@
- version: 0.80.0
+ version: 0.81.0
type: DeclarativeSource

definitions:
@@ -145,12 +145,17 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
else:
credentials["option_title"] = "Service Account"

- streams = super().streams(config=config)
+ selected_streams = config.get("streams")
+
+ all_streams = super().streams(config=config)

  config_transformed = copy.deepcopy(config)
  config_transformed = self._validate_and_transform(config_transformed)
  auth = self.get_authenticator(config)

- streams.append(Export(authenticator=auth, **config_transformed))
+ all_streams.append(Export(authenticator=auth, **config_transformed))
+
+ if selected_streams:
+     all_streams = [s for s in all_streams if s.name in selected_streams]
Comment on lines +158 to +159

🚩 Check stream interaction with the new streams filter is CDK-dependent

The manifest defines check.stream_names: [cohorts] at source_mixpanel/manifest.yaml:800-801. If a user sets the streams config to a subset that excludes cohorts, the behavior of check_connection depends on whether the CDK resolves check streams from the manifest definitions directly or from the output of streams(). If the CDK uses streams(), the check would fail because cohorts would be filtered out. In practice, declarative source check resolution typically resolves streams independently from streams(), so this is likely fine, but worth verifying with CDK documentation if this feature is used widely.

🚫 Not fixing. The CDK's CheckStream resolves check streams from the manifest definitions directly, not from the output of streams(). The check_connection call in test_source.py hits the cohorts endpoint (/api/query/cohorts/list) regardless of what streams() returns. So filtering via the streams config won't affect the check — cohorts will always be checked via the manifest-defined check stream.

Worth noting in docs if this feature sees wide adoption, but not a code-level concern.
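
The separation described in this reply can be pictured with a toy model. This is a simplified sketch, not the Airbyte CDK's code: `MANIFEST_STREAMS`, `CHECK_STREAM_NAMES`, `discover`, and `endpoints_to_check` are hypothetical names, and only the endpoint paths come from this PR's tests.

```python
from typing import Any, Dict, List, Mapping

# Hypothetical, simplified model; none of these names come from the Airbyte CDK.
MANIFEST_STREAMS: Dict[str, str] = {
    "cohorts": "/api/query/cohorts/list",
    "annotations": "/api/query/annotations",
    "funnels": "/api/query/funnels",
}
CHECK_STREAM_NAMES: List[str] = ["cohorts"]  # mirrors check.stream_names in the manifest


def discover(config: Mapping[str, Any]) -> List[str]:
    """Stream names exposed to the user, after applying the optional filter."""
    selected = config.get("streams")
    names = list(MANIFEST_STREAMS)
    if selected:
        names = [n for n in names if n in selected]
    return names


def endpoints_to_check(config: Mapping[str, Any]) -> List[str]:
    """Check streams are looked up in the manifest definitions, ignoring the filter."""
    return [MANIFEST_STREAMS[name] for name in CHECK_STREAM_NAMES]


config = {"streams": ["annotations"]}
discovered = discover(config)
checked = endpoints_to_check(config)
```

Under this assumption, a config that excludes `cohorts` still checks the cohorts endpoint, which is the behavior the reply describes.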


Comment on lines +148 to +159

🚩 No tests added for the new streams filtering feature

The PR adds a new config parameter and filtering logic but does not include any unit tests for the behavior. The existing test_streams at unit_tests/test_source.py:49 asserts len(streams) == 6 without exercising the filter. Tests for (1) filtering with a subset, (2) empty list returning all streams, and (3) None returning all streams would increase confidence in this feature.

☑️ Resolved in 0e25744. Added a parametrized test_streams_filtering test covering: no filter (None), empty list, subset, single stream, and all streams explicit.


- return streams
+ return all_streams
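
The filtering semantics in this hunk can be exercised in isolation. The sketch below is a minimal stand-alone version, assuming streams only need a `name` attribute; `FakeStream` and `filter_streams` are hypothetical names used for illustration and are not part of the connector.

```python
from typing import Any, List, Mapping, Optional


class FakeStream:
    """Hypothetical stand-in for a CDK stream; only `name` matters here."""

    def __init__(self, name: str) -> None:
        self.name = name


def filter_streams(all_streams: List[FakeStream], config: Mapping[str, Any]) -> List[FakeStream]:
    selected: Optional[List[str]] = config.get("streams")
    # None and [] are both falsy, so either one means "no filter".
    if selected:
        return [s for s in all_streams if s.name in selected]
    return list(all_streams)


ALL_NAMES = ["cohorts", "engage", "annotations", "cohort_members", "funnels", "export"]
streams = [FakeStream(name) for name in ALL_NAMES]

names_no_filter = [s.name for s in filter_streams(streams, {})]
names_empty = [s.name for s in filter_streams(streams, {"streams": []})]
names_subset = [s.name for s in filter_streams(streams, {"streams": ["export", "cohorts"]})]
```

Two properties follow from the list comprehension: the result keeps the order of `all_streams` rather than the order given in the config, and a configured name that matches no stream is dropped silently.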
@@ -145,6 +145,24 @@
          "default": 3,
          "examples": [1, 2, 3],
          "description": "The number of worker threads to use for the sync. The performance upper boundary is based on the limit of your Mixpanel pricing plan. More info about the rate limit tiers can be found on Mixpanel's API <a href=\"https://developer.mixpanel.com/reference/raw-event-export#api-export-endpoint-rate-limits\">docs</a>."
+       },
+       "streams": {
+         "order": 12,
+         "title": "Streams to Discover",
+         "description": "Select specific streams to discover and sync. If left empty, all streams will be available. Use this to speed up schema discovery when you only need specific data and discovery is timing out.",
+         "type": "array",
+         "items": {
+           "type": "string",
+           "enum": [
+             "cohorts",
+             "engage",
+             "annotations",
+             "cohort_members",
+             "funnels",
+             "export"
+           ]
+         },
+         "uniqueItems": true
        }
      }
    }
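
To make the constraints in the new `streams` property concrete, here is a minimal hand-rolled check that mirrors them (array type, enum membership, unique items). It is an illustrative sketch only: `validate_streams_option` and `ALLOWED_STREAMS` are hypothetical names, and the connector itself relies on the platform's spec validation rather than code like this.

```python
from typing import Any, List

# Mirrors the "enum" list in the spec fragment above.
ALLOWED_STREAMS = ("cohorts", "engage", "annotations", "cohort_members", "funnels", "export")


def validate_streams_option(value: Any) -> List[str]:
    """Return a list of problems; an empty list means the value passes these checks."""
    if not isinstance(value, list):
        return ["streams must be an array"]
    errors: List[str] = []
    for item in value:
        if not isinstance(item, str) or item not in ALLOWED_STREAMS:
            errors.append(f"unsupported stream: {item!r}")
    # Mirrors "uniqueItems": true; only string items are compared here.
    strings = [item for item in value if isinstance(item, str)]
    if len(set(strings)) != len(strings):
        errors.append("streams must not contain duplicates")
    return errors
```

For example, `validate_streams_option(["cohorts", "export"])` reports no problems, while `["revenue"]` is rejected because `revenue` is not in the enum.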
@@ -68,6 +68,47 @@ def test_streams(requests_mock, config_raw):
assert "revenue" not in [stream.name for stream in streams]


+ @pytest.mark.parametrize(
+     "selected_streams,expected_count,expected_names",
+     [
+         pytest.param(None, 6, None, id="no_filter_returns_all"),
+         pytest.param([], 6, None, id="empty_list_returns_all"),
+         pytest.param(["cohorts", "annotations"], 2, {"cohorts", "annotations"}, id="subset_filter"),
+         pytest.param(["export"], 1, {"export"}, id="single_stream_filter"),
+         pytest.param(
+             ["cohorts", "engage", "annotations", "cohort_members", "funnels", "export"],
+             6,
+             {"cohorts", "engage", "annotations", "cohort_members", "funnels", "export"},
+             id="all_streams_explicit",
+         ),
+     ],
+ )
+ def test_streams_filtering(requests_mock, config_raw, selected_streams, expected_count, expected_names):
+     requests_mock.register_uri("POST", "https://mixpanel.com/api/query/engage?page_size=1000", setup_response(200, {}))
+     requests_mock.register_uri("GET", "https://mixpanel.com/api/query/engage/properties", setup_response(200, {}))
+     requests_mock.register_uri("GET", "https://mixpanel.com/api/query/events/properties/top", setup_response(200, {}))
+     requests_mock.register_uri("GET", "https://mixpanel.com/api/query/annotations", setup_response(200, {}))
+     requests_mock.register_uri("GET", "https://mixpanel.com/api/query/cohorts/list", setup_response(200, {"id": 123}))
+     requests_mock.register_uri("GET", "https://mixpanel.com/api/query/funnels", setup_response(200, {}))
+     requests_mock.register_uri(
+         "GET", "https://mixpanel.com/api/query/funnels/list", setup_response(200, {"funnel_id": 123, "name": "name"})
+     )
+     requests_mock.register_uri(
+         "GET",
+         "https://data.mixpanel.com/api/2.0/export",
+         setup_response(200, {"event": "some event", "properties": {"event": 124, "time": 124124}}),
+     )
+
+     config = copy.deepcopy(config_raw)
+     if selected_streams is not None:
+         config["streams"] = selected_streams
+
+     streams = SourceMixpanel(MagicMock(), config, MagicMock()).streams(config)
+     assert len(streams) == expected_count
+     if expected_names:
+         assert {s.name for s in streams} == expected_names


def test_streams_string_date(requests_mock, config_raw):
requests_mock.register_uri("GET", "https://mixpanel.com/api/query/engage/properties", setup_response(200, {}))
requests_mock.register_uri("GET", "https://mixpanel.com/api/query/events/properties/top", setup_response(200, {}))
4 changes: 3 additions & 1 deletion docs/integrations/sources/mixpanel.md
@@ -33,7 +33,8 @@ The connector also supports [Project Secret](https://developer.mixpanel.com/refe
14. For **Page Size**, enter the number of records to fetch per request for the Engage stream. The default is 1000.
15. For **Export Lookback Window**, enter the number of seconds to look back from the last synced timestamp during incremental syncs of the Export stream. This helps avoid missed data due to delays in event recording. The default is 0 seconds.
16. For **Number of concurrent threads**, enter the number of worker threads for the sync. The default is 3. Higher values may improve performance but are constrained by your Mixpanel plan's [rate limits](https://developer.mixpanel.com/reference/rate-limits).
- 17. Click **Set up source**.
+ 17. (Optional) For **Streams to Discover**, select the specific streams you want to discover and sync. If left empty, all streams are available. Use this to speed up schema discovery when your account has access to many endpoints and discovery is timing out. Available streams: `cohorts`, `engage`, `annotations`, `cohort_members`, `funnels`, `export`.
+ 18. Click **Set up source**.

## Supported sync modes

@@ -87,6 +88,7 @@ If you use Airbyte Cloud and your organization restricts access to specific IPs,

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
+ | 4.1.0 | 2026-06-30 | [81343](https://github.com/airbytehq/airbyte/pull/81343) | Add optional `streams` config parameter to filter which streams are discovered and synced, improving schema discovery performance for accounts with broad access |
| 4.0.0 | 2026-05-22 | [78271](https://github.com/airbytehq/airbyte/pull/78271) | Removed the Revenue stream because Mixpanel no longer provides a documented or working revenue Query API endpoint. |
| 3.6.3 | 2026-04-13 | [76276](https://github.com/airbytehq/airbyte/pull/76276) | Rename "concurrent workers" to "concurrent threads" in connector spec |
| 3.6.2 | 2026-04-03 | [76039](https://github.com/airbytehq/airbyte/pull/76039) | Replace deprecated MessageRepresentationAirbyteTracedErrors with AirbyteTracedException in tests |