Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 9da77001-af33-4bcd-be46-6252bf9342b9
dockerImageTag: 3.5.1
dockerImageTag: 3.5.2
dockerRepository: airbyte/source-shopify
documentationUrl: https://docs.airbyte.com/integrations/sources/shopify
erdUrl: https://dbdocs.io/airbyteio/source-shopify?view=relationships
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "3.5.1"
version = "3.5.2"
name = "source-shopify"
description = "Source CDK implementation for Shopify."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ class BaseBulkException(AirbyteTracedException):

failure_type: FailureType = FailureType.config_error

def __init__(self, message: str, **kwargs) -> None:
super().__init__(internal_message=message, failure_type=self.failure_type, **kwargs)
def __init__(self, internal_message: str, message: str | None = None, **kwargs) -> None:
super().__init__(message=message, internal_message=internal_message, failure_type=self.failure_type, **kwargs)

class BulkJobError(BaseBulkException):
"""Raised when there are BULK Job Errors in response"""
Expand Down Expand Up @@ -66,3 +66,8 @@ class BulkJobConcurrentError(BaseBulkException):
"""Raised when failing the job after hitting too many BulkJobCreationFailedConcurrentError."""

failure_type: FailureType = FailureType.transient_error

class BulkJobAuthFailedError(BaseBulkException):
"""Raised when bulk job creation fails due to an invalid or expired access token."""

failure_type: FailureType = FailureType.config_error
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from datetime import datetime
from enum import Enum
from time import sleep, time
from typing import Any, Final, Iterable, List, Mapping, Optional
from typing import Any, Final, Iterable, List, Mapping, Optional, Tuple

import pendulum as pdm
import requests
Expand Down Expand Up @@ -46,6 +46,9 @@ class ShopifyBulkManager:
parent_stream_name: Optional[str] = None
parent_stream_cursor: Optional[str] = None

# keywords (lowercased) that indicate an authentication/credential error from Shopify
_AUTH_ERROR_KEYWORDS: Final[Tuple[str, ...]] = ("invalid api key or access token",)

# 10Mb chunk size to save the file
_retrieve_chunk_size: Final[int] = 1024 * 1024 * 10
_job_max_retries: Final[int] = 6
Expand Down Expand Up @@ -386,7 +389,24 @@ def _on_access_denied_job(self, **kwagrs) -> AirbyteTracedException:
def _on_job_with_errors(self, errors: List[Mapping[str, Any]]) -> AirbyteTracedException:
raise ShopifyBulkExceptions.BulkJobError(f"Could not validate the status of the BULK Job `{self._job_id}`. Errors: {errors}.")

def _is_auth_error(self, errors: List[Mapping[str, Any]]) -> bool:
for error in errors:
if isinstance(error, str):
error_text = error
elif isinstance(error, dict):
error_text = error.get("message", "")
else:
continue
if any(keyword in error_text.lower() for keyword in self._AUTH_ERROR_KEYWORDS):
return True
return False

def _on_non_handable_job_error(self, errors: List[Mapping[str, Any]]) -> AirbyteTracedException:
if self._is_auth_error(errors):
raise ShopifyBulkExceptions.BulkJobAuthFailedError(
internal_message=f"The Stream: `{self.http_client.name}`, auth error: {errors}",
message="Shopify access token is invalid or has expired.",
)
raise ShopifyBulkExceptions.BulkJobNonHandableError(f"The Stream: `{self.http_client.name}`, Non-handable error occured: {errors}")

def _get_server_errors(self, response: requests.Response) -> List[Optional[Mapping[str, Any]]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,56 @@ def bulk_successful_response_with_errors():
}


@pytest.fixture
def bulk_response_with_auth_error():
return {
"data": {
"bulkOperationRunQuery": {
"userErrors": [],
},
},
"errors": "[API] Invalid API key or access token (unrecognized login or wrong password)",
"extensions": {
"cost": {
"requestedQueryCost": 10,
"actualQueryCost": 10,
"throttleStatus": {
"maximumAvailable": 1000.0,
"currentlyAvailable": 990,
"restoreRate": 50.0,
},
}
},
}


@pytest.fixture
def bulk_response_with_auth_error_in_user_errors():
return {
"data": {
"bulkOperationRunQuery": {
"userErrors": [
{
"message": "Invalid API key or access token",
"code": "INVALID",
},
],
},
},
"extensions": {
"cost": {
"requestedQueryCost": 10,
"actualQueryCost": 10,
"throttleStatus": {
"maximumAvailable": 1000.0,
"currentlyAvailable": 990,
"restoreRate": 50.0,
},
}
},
}


@pytest.fixture
def bulk_successful_response_with_no_id():
return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
TransactionsGraphql,
)

from airbyte_cdk.models import SyncMode
from airbyte_cdk.models import FailureType, SyncMode


_ANY_SLICE = {}
Expand Down Expand Up @@ -289,6 +289,48 @@ def test_retry_on_job_creation_exception(
assert expected_msg in repr(error.value) and requests_mock.call_count == call_count_expected


@pytest.mark.parametrize(
"job_response, expected_error_type, expected_failure_type, expected_msg",
[
pytest.param(
"bulk_response_with_auth_error",
ShopifyBulkExceptions.BulkJobAuthFailedError,
FailureType.config_error,
"Shopify access token is invalid or has expired.",
id="auth_error_from_server_errors_string",
),
pytest.param(
"bulk_response_with_auth_error_in_user_errors",
ShopifyBulkExceptions.BulkJobAuthFailedError,
FailureType.config_error,
"Shopify access token is invalid or has expired.",
id="auth_error_from_user_errors_dict",
),
pytest.param(
"bulk_successful_response_with_errors",
ShopifyBulkExceptions.BulkJobNonHandableError,
FailureType.system_error,
"Non-handable error occured",
id="non_auth_error_stays_system_error",
),
],
)
def test_auth_error_classified_as_config_error(
request, requests_mock, auth_config, job_response, expected_error_type, expected_failure_type, expected_msg
) -> None:
stream = MetafieldOrders(auth_config)
stream.job_manager._job_backoff_time = 0
stream.job_manager._job_max_retries = 0

requests_mock.post(stream.job_manager.base_url, json=request.getfixturevalue(job_response))

with pytest.raises(expected_error_type) as error:
stream.job_manager.create_job(_ANY_SLICE, _ANY_FILTER_FIELD)

assert expected_msg in str(error.value)
assert error.value.failure_type == expected_failure_type


@pytest.mark.parametrize(
"job_response, expected",
[
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/shopify.md
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ If you synced one of these streams on an earlier connector version and suspect m

| Version | Date | Pull Request | Subject |
|:-----------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.5.2 | 2026-07-01 | [81363](https://github.com/airbytehq/airbyte/pull/81363) | Classify Shopify authentication errors as config errors instead of system errors during bulk job creation |
| 3.5.1 | 2026-06-24 | [80787](https://github.com/airbytehq/airbyte/pull/80787) | Validate and normalize the `shop` config value: accept a bare subdomain or full myshopify URL, and reject malformed input with a clear config error. The normalized handle is also written to the `shop_url` record field, so previously-malformed configs now emit a clean value. |
| 3.5.0 | 2026-06-11 | [79624](https://github.com/airbytehq/airbyte/pull/79624) | Add new synchronous cursor-paginated `discount_codes_sync` stream with support for >100 redeem codes per discount |
| 3.4.1 | 2026-06-10 | [78513](https://github.com/airbytehq/airbyte/pull/78513) | Enable `groupObjects: true` on the `discount_codes` bulk query to preserve grouped output for stores with many redeem codes per discount. |
Expand Down
Loading