Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2555,6 +2555,10 @@ definitions:
type: array
items:
"$ref": "#/definitions/TypesMap"
default_type:
title: Default Type
description: The default Airbyte type to use when no type mapping matches the source field type.
type: string
$parameters:
type: object
additionalProperties: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,11 @@ class SchemaTypeIdentifier(BaseModel):
title="Type Path",
)
types_mapping: Optional[List[TypesMap]] = None
default_type: Optional[str] = Field(
None,
description="The default Airbyte type to use when no type mapping matches the source field type.",
title="Default Type",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2600,6 +2600,7 @@ def create_schema_type_identifier(
key_pointer=model_key_pointer,
type_pointer=model_type_pointer,
types_mapping=types_mapping,
default_type=model.default_type,
parameters=model.parameters or {},
)

Expand Down
10 changes: 8 additions & 2 deletions airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class SchemaTypeIdentifier:
type_pointer: Optional[List[Union[InterpolatedString, str]]] = None
types_mapping: Optional[List[TypesMap]] = None
schema_pointer: Optional[List[Union[InterpolatedString, str]]] = None
default_type: Optional[str] = None

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self.schema_pointer = (
Expand Down Expand Up @@ -269,12 +270,17 @@ def _replace_type_if_not_valid(
return types_map.target_type
return field_type

@staticmethod
def _get_airbyte_type(field_type: str) -> MutableMapping[str, Any]:
def _get_airbyte_type(self, field_type: str) -> MutableMapping[str, Any]:
"""
Maps a field type to its corresponding Airbyte type definition.
Falls back to `default_type` when configured and `field_type` is not recognized.
"""
if field_type not in AIRBYTE_DATA_TYPES:
default_type = self.schema_type_identifier.default_type
if default_type is not None:
if default_type not in AIRBYTE_DATA_TYPES:
raise ValueError(f"Invalid default Airbyte data type: {default_type}")
return deepcopy(AIRBYTE_DATA_TYPES[default_type])
raise ValueError(f"Invalid Airbyte data type: {field_type}")
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

return deepcopy(AIRBYTE_DATA_TYPES[field_type])
Expand Down
240 changes: 240 additions & 0 deletions unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,126 @@ def test_dynamic_schema_loader_manifest_flow():
assert actual_catalog.streams[0].json_schema == expected_schema


@pytest.mark.parametrize(
"retriever_data, default_type, expected_schema",
[
pytest.param(
iter(
[
{
"schema": [
{"key": "name", "type": "string"},
{"key": "custom_field", "type": "unknown_source_type"},
]
}
]
),
"string",
{
"$schema": "https://json-schema.org/draft-07/schema#",
"additionalProperties": True,
"type": "object",
"properties": {
"name": {"type": ["null", "string"]},
"custom_field": {"type": ["null", "string"]},
},
},
id="unmapped_type_falls_back_to_default",
),
pytest.param(
iter(
[
{
"schema": [
{"key": "count", "type": "integer"},
{"key": "blob", "type": "exotic_plugin_type"},
]
}
]
),
"string",
{
"$schema": "https://json-schema.org/draft-07/schema#",
"additionalProperties": True,
"type": "object",
"properties": {
"count": {"type": ["null", "integer"]},
"blob": {"type": ["null", "string"]},
},
},
id="valid_type_unchanged_unknown_type_uses_default",
),
],
)
def test_dynamic_schema_loader_default_type(retriever_data, default_type, expected_schema):
schema_type_identifier = SchemaTypeIdentifier(
schema_pointer=["schema"],
key_pointer=["key"],
type_pointer=["type"],
types_mapping=[],
default_type=default_type,
parameters={},
)
loader = DynamicSchemaLoader(
retriever=MagicMock(),
config=MagicMock(),
parameters={},
schema_type_identifier=schema_type_identifier,
)
loader.retriever.read_records = MagicMock(return_value=retriever_data)

schema = loader.get_json_schema()
assert schema == expected_schema


def test_dynamic_schema_loader_no_default_type_raises_on_unknown():
Comment thread
github-code-quality[bot] marked this conversation as resolved.
Fixed
schema_type_identifier = SchemaTypeIdentifier(
schema_pointer=["schema"],
key_pointer=["key"],
type_pointer=["type"],
types_mapping=[],
parameters={},
)
loader = DynamicSchemaLoader(
retriever=MagicMock(),
config=MagicMock(),
parameters={},
schema_type_identifier=schema_type_identifier,
)
loader.retriever.read_records = MagicMock(
return_value=iter([{"schema": [{"key": "field", "type": "totally_unknown"}]}])
)

with pytest.raises(ValueError, match="Invalid Airbyte data type: totally_unknown"):
loader.get_json_schema()


def test_dynamic_schema_loader_invalid_default_type_raises():
Comment thread
github-code-quality[bot] marked this conversation as resolved.
Fixed
schema_type_identifier = SchemaTypeIdentifier(
schema_pointer=["schema"],
key_pointer=["key"],
type_pointer=["type"],
types_mapping=[],
default_type="not_a_real_airbyte_type",
parameters={},
)
loader = DynamicSchemaLoader(
retriever=MagicMock(),
config=MagicMock(),
parameters={},
schema_type_identifier=schema_type_identifier,
)
loader.retriever.read_records = MagicMock(
return_value=iter([{"schema": [{"key": "field", "type": "unknown"}]}])
)

with pytest.raises(
ValueError,
match="Invalid default Airbyte data type: not_a_real_airbyte_type",
):
loader.get_json_schema()


def test_dynamic_schema_loader_with_type_conditions():
_MANIFEST_WITH_TYPE_CONDITIONS = deepcopy(_MANIFEST)
_MANIFEST_WITH_TYPE_CONDITIONS["definitions"]["party_members_stream"]["schema_loader"][
Expand Down Expand Up @@ -414,3 +534,123 @@ def test_dynamic_schema_loader_with_type_conditions():

assert len(actual_catalog.streams) == 1
assert actual_catalog.streams[0].json_schema == expected_schema


@pytest.mark.parametrize(
"retriever_data, default_type, expected_schema",
[
pytest.param(
iter(
[
{
"schema": [
{"key": "name", "type": "string"},
{"key": "custom_field", "type": "unknown_source_type"},
]
}
]
),
"string",
{
"$schema": "https://json-schema.org/draft-07/schema#",
"additionalProperties": True,
"type": "object",
"properties": {
"name": {"type": ["null", "string"]},
"custom_field": {"type": ["null", "string"]},
},
},
id="unmapped_type_falls_back_to_default",
),
pytest.param(
iter(
[
{
"schema": [
{"key": "count", "type": "integer"},
{"key": "blob", "type": "exotic_plugin_type"},
]
}
]
),
"string",
{
"$schema": "https://json-schema.org/draft-07/schema#",
"additionalProperties": True,
"type": "object",
"properties": {
"count": {"type": ["null", "integer"]},
"blob": {"type": ["null", "string"]},
},
},
id="valid_type_unchanged_unknown_type_uses_default",
),
],
)
def test_dynamic_schema_loader_default_type(retriever_data, default_type, expected_schema):
schema_type_identifier = SchemaTypeIdentifier(
schema_pointer=["schema"],
key_pointer=["key"],
type_pointer=["type"],
types_mapping=[],
default_type=default_type,
parameters={},
)
loader = DynamicSchemaLoader(
retriever=MagicMock(),
config=MagicMock(),
parameters={},
schema_type_identifier=schema_type_identifier,
)
loader.retriever.read_records = MagicMock(return_value=retriever_data)

schema = loader.get_json_schema()
assert schema == expected_schema


def test_dynamic_schema_loader_no_default_type_raises_on_unknown():
schema_type_identifier = SchemaTypeIdentifier(
schema_pointer=["schema"],
key_pointer=["key"],
type_pointer=["type"],
types_mapping=[],
parameters={},
)
loader = DynamicSchemaLoader(
retriever=MagicMock(),
config=MagicMock(),
parameters={},
schema_type_identifier=schema_type_identifier,
)
loader.retriever.read_records = MagicMock(
return_value=iter([{"schema": [{"key": "field", "type": "totally_unknown"}]}])
)

with pytest.raises(ValueError, match="Invalid Airbyte data type: totally_unknown"):
loader.get_json_schema()


def test_dynamic_schema_loader_invalid_default_type_raises():
schema_type_identifier = SchemaTypeIdentifier(
schema_pointer=["schema"],
key_pointer=["key"],
type_pointer=["type"],
types_mapping=[],
default_type="not_a_real_airbyte_type",
parameters={},
)
loader = DynamicSchemaLoader(
retriever=MagicMock(),
config=MagicMock(),
parameters={},
schema_type_identifier=schema_type_identifier,
)
loader.retriever.read_records = MagicMock(
return_value=iter([{"schema": [{"key": "field", "type": "unknown"}]}])
)

with pytest.raises(
ValueError,
match="Invalid default Airbyte data type: not_a_real_airbyte_type",
):
loader.get_json_schema()
Loading