diff --git a/airbyte-integrations/connectors/source-mixpanel/metadata.yaml b/airbyte-integrations/connectors/source-mixpanel/metadata.yaml index c03ff8b69bf4..49bac8760d3f 100644 --- a/airbyte-integrations/connectors/source-mixpanel/metadata.yaml +++ b/airbyte-integrations/connectors/source-mixpanel/metadata.yaml @@ -11,7 +11,7 @@ data: connectorSubtype: api connectorType: source definitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a - dockerImageTag: 4.0.0 + dockerImageTag: 4.1.0 dockerRepository: airbyte/source-mixpanel documentationUrl: https://docs.airbyte.com/integrations/sources/mixpanel githubIssueLabel: source-mixpanel diff --git a/airbyte-integrations/connectors/source-mixpanel/pyproject.toml b/airbyte-integrations/connectors/source-mixpanel/pyproject.toml index e101dd4d6542..f54e3b146264 100644 --- a/airbyte-integrations/connectors/source-mixpanel/pyproject.toml +++ b/airbyte-integrations/connectors/source-mixpanel/pyproject.toml @@ -3,7 +3,7 @@ requires = ["poetry-core>=1.0.0"] build-backend = "poetry.core.masonry.api" [tool.poetry] -version = "4.0.0" +version = "4.1.0" name = "source-mixpanel" description = "Source implementation for Mixpanel." 
authors = ["Airbyte <contact@airbyte.io>"] diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/manifest.yaml b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/manifest.yaml index ae5c560ebbe5..4081ccb01d39 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/manifest.yaml +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/manifest.yaml @@ -1,4 +1,4 @@ -version: 0.80.0 +version: 0.81.0 type: DeclarativeSource definitions: diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py index b3fcfb6291a4..ec0420d0bb85 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/source.py @@ -145,12 +145,17 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: else: credentials["option_title"] = "Service Account" - streams = super().streams(config=config) + selected_streams = config.get("streams") + + all_streams = super().streams(config=config) config_transformed = copy.deepcopy(config) config_transformed = self._validate_and_transform(config_transformed) auth = self.get_authenticator(config) - streams.append(Export(authenticator=auth, **config_transformed)) + all_streams.append(Export(authenticator=auth, **config_transformed)) + + if selected_streams: + all_streams = [s for s in all_streams if s.name in selected_streams] - return streams + return all_streams diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/spec.json b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/spec.json index ee97548a8e7c..aad3e5f1f01b 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/spec.json +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/spec.json @@ -145,6 +145,24 @@ "default": 3, "examples": [1, 2, 3], "description": "The number of
worker threads to use for the sync. The performance upper boundary is based on the limit of your Mixpanel pricing plan. More info about the rate limit tiers can be found on Mixpanel's API docs." + }, + "streams": { + "order": 12, + "title": "Streams to Discover", + "description": "Select specific streams to discover and sync. If left empty, all streams will be available. Use this to speed up schema discovery when you only need specific data and discovery is timing out.", + "type": "array", + "items": { + "type": "string", + "enum": [ + "cohorts", + "engage", + "annotations", + "cohort_members", + "funnels", + "export" + ] + }, + "uniqueItems": true } } } diff --git a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py index 7ee5398ca114..5577d16d204e 100644 --- a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_source.py @@ -68,6 +68,47 @@ def test_streams(requests_mock, config_raw): assert "revenue" not in [stream.name for stream in streams] +@pytest.mark.parametrize( + "selected_streams,expected_count,expected_names", + [ + pytest.param(None, 6, None, id="no_filter_returns_all"), + pytest.param([], 6, None, id="empty_list_returns_all"), + pytest.param(["cohorts", "annotations"], 2, {"cohorts", "annotations"}, id="subset_filter"), + pytest.param(["export"], 1, {"export"}, id="single_stream_filter"), + pytest.param( + ["cohorts", "engage", "annotations", "cohort_members", "funnels", "export"], + 6, + {"cohorts", "engage", "annotations", "cohort_members", "funnels", "export"}, + id="all_streams_explicit", + ), + ], +) +def test_streams_filtering(requests_mock, config_raw, selected_streams, expected_count, expected_names): + requests_mock.register_uri("POST", "https://mixpanel.com/api/query/engage?page_size=1000", setup_response(200, {})) + requests_mock.register_uri("GET", 
"https://mixpanel.com/api/query/engage/properties", setup_response(200, {})) + requests_mock.register_uri("GET", "https://mixpanel.com/api/query/events/properties/top", setup_response(200, {})) + requests_mock.register_uri("GET", "https://mixpanel.com/api/query/annotations", setup_response(200, {})) + requests_mock.register_uri("GET", "https://mixpanel.com/api/query/cohorts/list", setup_response(200, {"id": 123})) + requests_mock.register_uri("GET", "https://mixpanel.com/api/query/funnels", setup_response(200, {})) + requests_mock.register_uri( + "GET", "https://mixpanel.com/api/query/funnels/list", setup_response(200, {"funnel_id": 123, "name": "name"}) + ) + requests_mock.register_uri( + "GET", + "https://data.mixpanel.com/api/2.0/export", + setup_response(200, {"event": "some event", "properties": {"event": 124, "time": 124124}}), + ) + + config = copy.deepcopy(config_raw) + if selected_streams is not None: + config["streams"] = selected_streams + + streams = SourceMixpanel(MagicMock(), config, MagicMock()).streams(config) + assert len(streams) == expected_count + if expected_names: + assert {s.name for s in streams} == expected_names + + def test_streams_string_date(requests_mock, config_raw): requests_mock.register_uri("GET", "https://mixpanel.com/api/query/engage/properties", setup_response(200, {})) requests_mock.register_uri("GET", "https://mixpanel.com/api/query/events/properties/top", setup_response(200, {})) diff --git a/docs/integrations/sources/mixpanel.md b/docs/integrations/sources/mixpanel.md index 6f4eedb48af1..45397a76b445 100644 --- a/docs/integrations/sources/mixpanel.md +++ b/docs/integrations/sources/mixpanel.md @@ -33,7 +33,8 @@ The connector also supports [Project Secret](https://developer.mixpanel.com/refe 14. For **Page Size**, enter the number of records to fetch per request for the Engage stream. The default is 1000. 15. 
For **Export Lookback Window**, enter the number of seconds to look back from the last synced timestamp during incremental syncs of the Export stream. This helps avoid missed data due to delays in event recording. The default is 0 seconds. 16. For **Number of concurrent threads**, enter the number of worker threads for the sync. The default is 3. Higher values may improve performance but are constrained by your Mixpanel plan's [rate limits](https://developer.mixpanel.com/reference/rate-limits). -17. Click **Set up source**. +17. (Optional) For **Streams to Discover**, select the specific streams you want to discover and sync. If left empty, all streams are available. Use this to speed up schema discovery when your account has access to many endpoints and discovery is timing out. Available streams: `cohorts`, `engage`, `annotations`, `cohort_members`, `funnels`, `export`. +18. Click **Set up source**. ## Supported sync modes @@ -87,6 +88,7 @@ If you use Airbyte Cloud and your organization restricts access to specific IPs, | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 4.1.0 | 2026-06-30 | [81343](https://github.com/airbytehq/airbyte/pull/81343) | Add optional `streams` config parameter to filter which streams are discovered and synced, improving schema discovery performance for accounts with broad access | | 4.0.0 | 2026-05-22 | [78271](https://github.com/airbytehq/airbyte/pull/78271) | Removed the Revenue stream because Mixpanel no longer provides a documented or working revenue Query API endpoint. | | 3.6.3 | 2026-04-13 | [76276](https://github.com/airbytehq/airbyte/pull/76276) | Rename "concurrent workers" to "concurrent threads" in connector spec | | 3.6.2 | 2026-04-03 | [76039](https://github.com/airbytehq/airbyte/pull/76039) | Replace deprecated MessageRepresentationAirbyteTracedErrors with AirbyteTracedException in tests |