diff --git a/airbyte-integrations/connectors/source-shopify/metadata.yaml b/airbyte-integrations/connectors/source-shopify/metadata.yaml index 09e723e82e44..c3103cc3d8af 100644 --- a/airbyte-integrations/connectors/source-shopify/metadata.yaml +++ b/airbyte-integrations/connectors/source-shopify/metadata.yaml @@ -11,7 +11,7 @@ data: connectorSubtype: api connectorType: source definitionId: 9da77001-af33-4bcd-be46-6252bf9342b9 - dockerImageTag: 3.5.1 + dockerImageTag: 3.5.2 dockerRepository: airbyte/source-shopify documentationUrl: https://docs.airbyte.com/integrations/sources/shopify erdUrl: https://dbdocs.io/airbyteio/source-shopify?view=relationships diff --git a/airbyte-integrations/connectors/source-shopify/pyproject.toml b/airbyte-integrations/connectors/source-shopify/pyproject.toml index 11b6c7dc89c8..3a1a3780e9d0 100644 --- a/airbyte-integrations/connectors/source-shopify/pyproject.toml +++ b/airbyte-integrations/connectors/source-shopify/pyproject.toml @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",] build-backend = "poetry.core.masonry.api" [tool.poetry] -version = "3.5.1" +version = "3.5.2" name = "source-shopify" description = "Source CDK implementation for Shopify." authors = [ "Airbyte ",] diff --git a/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/exceptions.py b/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/exceptions.py index ed00906a9374..aec7b81af4a8 100644 --- a/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/exceptions.py +++ b/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/exceptions.py @@ -13,8 +13,8 @@ class BaseBulkException(AirbyteTracedException): failure_type: FailureType = FailureType.config_error - def __init__(self, message: str, **kwargs) -> None: - super().__init__(internal_message=message, failure_type=self.failure_type, **kwargs) + def __init__(self, internal_message: str, message: str | None = None, **kwargs) -> None: + super().__init__(message=message, internal_message=internal_message, failure_type=self.failure_type, **kwargs) class BulkJobError(BaseBulkException): """Raised when there are BULK Job Errors in response""" @@ -66,3 +66,8 @@ class BulkJobConcurrentError(BaseBulkException): """Raised when failing the job after hitting too many BulkJobCreationFailedConcurrentError.""" failure_type: FailureType = FailureType.transient_error + + class BulkJobAuthFailedError(BaseBulkException): + """Raised when bulk job creation fails due to an invalid or expired access token.""" + + failure_type: FailureType = FailureType.config_error diff --git a/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/job.py b/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/job.py index 158a693cd1f4..266b5fb85419 100644 --- a/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/job.py +++ b/airbyte-integrations/connectors/source-shopify/source_shopify/shopify_graphql/bulk/job.py @@ -6,7 +6,7 @@ from datetime import datetime from enum import Enum from time import sleep, time -from typing import Any, Final, Iterable, List, Mapping, Optional +from typing import Any, Final, Iterable, List, Mapping, Optional, Tuple import pendulum as pdm import requests @@ -46,6 +46,9 @@ class ShopifyBulkManager: parent_stream_name: Optional[str] = None parent_stream_cursor: Optional[str] = None + # keywords (lowercased) that indicate an authentication/credential error from Shopify + _AUTH_ERROR_KEYWORDS: Final[Tuple[str, ...]] = ("invalid api key or access token",) + # 10Mb chunk size to save the file _retrieve_chunk_size: Final[int] = 1024 * 1024 * 10 _job_max_retries: Final[int] = 6 @@ -386,7 +389,24 @@ def _on_access_denied_job(self, **kwagrs) -> AirbyteTracedException: def _on_job_with_errors(self, errors: List[Mapping[str, Any]]) -> AirbyteTracedException: raise ShopifyBulkExceptions.BulkJobError(f"Could not validate the status of the BULK Job `{self._job_id}`. Errors: {errors}.") + def _is_auth_error(self, errors: List[Mapping[str, Any]]) -> bool: + for error in errors: + if isinstance(error, str): + error_text = error + elif isinstance(error, dict): + error_text = error.get("message", "") + else: + continue + if any(keyword in error_text.lower() for keyword in self._AUTH_ERROR_KEYWORDS): + return True + return False + def _on_non_handable_job_error(self, errors: List[Mapping[str, Any]]) -> AirbyteTracedException: + if self._is_auth_error(errors): + raise ShopifyBulkExceptions.BulkJobAuthFailedError( + internal_message=f"The Stream: `{self.http_client.name}`, auth error: {errors}", + message="Shopify access token is invalid or has expired.", + ) raise ShopifyBulkExceptions.BulkJobNonHandableError(f"The Stream: `{self.http_client.name}`, Non-handable error occured: {errors}") def _get_server_errors(self, response: requests.Response) -> List[Optional[Mapping[str, Any]]]: diff --git a/airbyte-integrations/connectors/source-shopify/unit_tests/conftest.py b/airbyte-integrations/connectors/source-shopify/unit_tests/conftest.py index 3800ea82d356..d23fea5adfea 100644 --- a/airbyte-integrations/connectors/source-shopify/unit_tests/conftest.py +++ b/airbyte-integrations/connectors/source-shopify/unit_tests/conftest.py @@ -227,6 +227,56 @@ def bulk_successful_response_with_errors(): } +@pytest.fixture +def bulk_response_with_auth_error(): + return { + "data": { + "bulkOperationRunQuery": { + "userErrors": [], + }, + }, + "errors": "[API] Invalid API key or access token (unrecognized login or wrong password)", + "extensions": { + "cost": { + "requestedQueryCost": 10, + "actualQueryCost": 10, + "throttleStatus": { + "maximumAvailable": 1000.0, + "currentlyAvailable": 990, + "restoreRate": 50.0, + }, + } + }, + } + + +@pytest.fixture +def bulk_response_with_auth_error_in_user_errors(): + return { + "data": { + "bulkOperationRunQuery": { + "userErrors": [ + { + "message": "Invalid API key or access token", + "code": "INVALID", + }, + ], + }, + }, + "extensions": { + "cost": { + "requestedQueryCost": 10, + "actualQueryCost": 10, + "throttleStatus": { + "maximumAvailable": 1000.0, + "currentlyAvailable": 990, + "restoreRate": 50.0, + }, + } + }, + } + + @pytest.fixture def bulk_successful_response_with_no_id(): return { diff --git a/airbyte-integrations/connectors/source-shopify/unit_tests/graphql_bulk/test_job.py b/airbyte-integrations/connectors/source-shopify/unit_tests/graphql_bulk/test_job.py index 73093badb693..ff5483f1f12e 100644 --- a/airbyte-integrations/connectors/source-shopify/unit_tests/graphql_bulk/test_job.py +++ b/airbyte-integrations/connectors/source-shopify/unit_tests/graphql_bulk/test_job.py @@ -27,7 +27,7 @@ TransactionsGraphql, ) -from airbyte_cdk.models import SyncMode +from airbyte_cdk.models import FailureType, SyncMode _ANY_SLICE = {} @@ -289,6 +289,48 @@ def test_retry_on_job_creation_exception( assert expected_msg in repr(error.value) and requests_mock.call_count == call_count_expected +@pytest.mark.parametrize( + "job_response, expected_error_type, expected_failure_type, expected_msg", + [ + pytest.param( + "bulk_response_with_auth_error", + ShopifyBulkExceptions.BulkJobAuthFailedError, + FailureType.config_error, + "Shopify access token is invalid or has expired.", + id="auth_error_from_server_errors_string", + ), + pytest.param( + "bulk_response_with_auth_error_in_user_errors", + ShopifyBulkExceptions.BulkJobAuthFailedError, + FailureType.config_error, + "Shopify access token is invalid or has expired.", + id="auth_error_from_user_errors_dict", + ), + pytest.param( + "bulk_successful_response_with_errors", + ShopifyBulkExceptions.BulkJobNonHandableError, + FailureType.system_error, + "Non-handable error occured", + id="non_auth_error_stays_system_error", + ), + ], +) +def test_auth_error_classified_as_config_error( + request, requests_mock, auth_config, job_response, expected_error_type, expected_failure_type, expected_msg +) -> None: + stream = MetafieldOrders(auth_config) + stream.job_manager._job_backoff_time = 0 + stream.job_manager._job_max_retries = 0 + + requests_mock.post(stream.job_manager.base_url, json=request.getfixturevalue(job_response)) + + with pytest.raises(expected_error_type) as error: + stream.job_manager.create_job(_ANY_SLICE, _ANY_FILTER_FIELD) + + assert expected_msg in str(error.value) + assert error.value.failure_type == expected_failure_type + + @pytest.mark.parametrize( "job_response, expected", [ diff --git a/docs/integrations/sources/shopify.md b/docs/integrations/sources/shopify.md index 6395db4408d4..38043ce80a6a 100644 --- a/docs/integrations/sources/shopify.md +++ b/docs/integrations/sources/shopify.md @@ -277,6 +277,7 @@ If you synced one of these streams on an earlier connector version and suspect m | Version | Date | Pull Request | Subject | |:-----------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.5.2 | 2026-07-01 | [81363](https://github.com/airbytehq/airbyte/pull/81363) | Classify Shopify authentication errors as config errors instead of system errors during bulk job creation | | 3.5.1 | 2026-06-24 | [80787](https://github.com/airbytehq/airbyte/pull/80787) | Validate and normalize the `shop` config value: accept a bare subdomain or full myshopify URL, and reject malformed input with a clear config error. The normalized handle is also written to the `shop_url` record field, so previously-malformed configs now emit a clean value. | | 3.5.0 | 2026-06-11 | [79624](https://github.com/airbytehq/airbyte/pull/79624) | Add new synchronous cursor-paginated `discount_codes_sync` stream with support for >100 redeem codes per discount | | 3.4.1 | 2026-06-10 | [78513](https://github.com/airbytehq/airbyte/pull/78513) | Enable `groupObjects: true` on the `discount_codes` bulk query to preserve grouped output for stores with many redeem codes per discount. |