Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ Newer updates can be found here: [GitHub Release Notes](https://github.com/airby

# Changelog

## 6.5.3

Bugfix: File transfer syncs now fail early if the shared staging directory is unavailable.

## 6.5.2

bugfix: Ensure that streams with partition router are not executed concurrently
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def upload(self, record: Record) -> None:
),
)

files_directory = Path(get_files_directory())
files_directory = Path(get_files_directory(is_file_transfer=True))

file_name = (
self.filename_extractor.eval(self.config, record=record)
Expand Down
2 changes: 1 addition & 1 deletion airbyte_cdk/sources/file_based/file_types/file_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

class FileTransfer:
def __init__(self) -> None:
self._local_directory = get_files_directory()
self._local_directory = get_files_directory(is_file_transfer=True)

def upload(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):
airbyte_columns = [ab_last_mod_col, ab_file_name_col]
use_file_transfer = False
preserve_directory_structure = True
_file_transfer = FileTransfer()
_file_transfer: Optional[FileTransfer] = None

def __init__(self, **kwargs: Any):
if self.FILE_TRANSFER_KW in kwargs:
Expand Down Expand Up @@ -164,7 +164,10 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Airbyte

try:
if self.use_file_transfer:
for file_record_data, file_reference in self._file_transfer.upload(
if self._file_transfer is None:
self._file_transfer = FileTransfer()
file_transfer = self._file_transfer
for file_record_data, file_reference in file_transfer.upload(
file=file, stream_reader=self.stream_reader, logger=self.logger
):
yield stream_data_to_airbyte_message(
Expand Down
50 changes: 42 additions & 8 deletions airbyte_cdk/sources/utils/files_directory.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,49 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#
import os
from os import getenv
from pathlib import Path

AIRBYTE_STAGING_DIRECTORY = os.getenv("AIRBYTE_STAGING_DIRECTORY", "/staging/files")
from airbyte_cdk.models import FailureType
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

AIRBYTE_STAGING_DIRECTORY_ENV_VAR = "AIRBYTE_STAGING_DIRECTORY"
DEFAULT_AIRBYTE_STAGING_DIRECTORY = "/staging/files"
DEFAULT_LOCAL_DIRECTORY = "/tmp/airbyte-file-transfer"
UNAVAILABLE_STAGING_DIRECTORY_MESSAGE = "File transfer staging directory is unavailable."


def _validate_staging_directory(staging_directory: str) -> None:
staging_directory_path = Path(staging_directory)
if not staging_directory_path.exists():
raise AirbyteTracedException(
message=UNAVAILABLE_STAGING_DIRECTORY_MESSAGE,
internal_message=(
f"Configured {AIRBYTE_STAGING_DIRECTORY_ENV_VAR} does not exist: "
f"{staging_directory}"
),
failure_type=FailureType.system_error,
)

if not staging_directory_path.is_dir():
raise AirbyteTracedException(
message=UNAVAILABLE_STAGING_DIRECTORY_MESSAGE,
internal_message=(
f"Configured {AIRBYTE_STAGING_DIRECTORY_ENV_VAR} is not a directory: "
f"{staging_directory}"
),
failure_type=FailureType.system_error,
)


def get_files_directory(is_file_transfer: bool = False) -> str:
configured_staging_directory = getenv(AIRBYTE_STAGING_DIRECTORY_ENV_VAR)
if configured_staging_directory:
_validate_staging_directory(configured_staging_directory)
return configured_staging_directory

if not is_file_transfer:
return DEFAULT_LOCAL_DIRECTORY

def get_files_directory() -> str:
return (
AIRBYTE_STAGING_DIRECTORY
if os.path.exists(AIRBYTE_STAGING_DIRECTORY)
else DEFAULT_LOCAL_DIRECTORY
)
_validate_staging_directory(DEFAULT_AIRBYTE_STAGING_DIRECTORY)
return DEFAULT_AIRBYTE_STAGING_DIRECTORY
45 changes: 28 additions & 17 deletions unit_tests/sources/declarative/file/test_file_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ def test_get_articles(self) -> None:

def test_get_article_attachments(self) -> None:
with HttpMocker() as http_mocker:
files_directory = Path(__file__).parent / "staging"
files_directory.mkdir(exist_ok=True)
http_mocker.get(
HttpRequest(url=STREAM_URL),
HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200),
Expand All @@ -145,12 +147,13 @@ def test_get_article_attachments(self) -> None:
),
)

output = read(
self._config(),
CatalogBuilder()
.with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments"))
.build(),
)
with patch.dict("os.environ", {"AIRBYTE_STAGING_DIRECTORY": str(files_directory)}):
output = read(
self._config(),
CatalogBuilder()
.with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments"))
.build(),
)

assert output.records
file_reference = output.records[0].record.file_reference
Expand All @@ -167,6 +170,8 @@ def test_get_article_attachments(self) -> None:

def test_get_article_attachments_with_filename_extractor(self) -> None:
with HttpMocker() as http_mocker:
files_directory = Path(__file__).parent / "staging"
files_directory.mkdir(exist_ok=True)
http_mocker.get(
HttpRequest(url=STREAM_URL),
HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200),
Expand All @@ -184,20 +189,21 @@ def test_get_article_attachments_with_filename_extractor(self) -> None:
),
)

output = read(
self._config(),
CatalogBuilder()
.with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments"))
.build(),
yaml_file="test_file_stream_with_filename_extractor.yaml",
)
with patch.dict("os.environ", {"AIRBYTE_STAGING_DIRECTORY": str(files_directory)}):
output = read(
self._config(),
CatalogBuilder()
.with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments"))
.build(),
yaml_file="test_file_stream_with_filename_extractor.yaml",
)

assert len(output.records) == 1
file_reference = output.records[0].record.file_reference
assert file_reference
assert (
file_reference.staging_file_url
== "/tmp/airbyte-file-transfer/article_attachments/12138758717583/some_image_name.png"
== f"{files_directory}/article_attachments/12138758717583/some_image_name.png"
)
assert file_reference.source_file_relative_path
assert not re.match(
Expand All @@ -207,6 +213,8 @@ def test_get_article_attachments_with_filename_extractor(self) -> None:

def test_get_article_attachments_messages_for_connector_builder(self) -> None:
with HttpMocker() as http_mocker:
files_directory = Path(__file__).parent / "staging"
files_directory.mkdir(exist_ok=True)
http_mocker.get(
HttpRequest(url=STREAM_URL),
HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200),
Expand All @@ -231,9 +239,12 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

# Patch the factory class where ConcurrentDeclarativeSource (parent of YamlDeclarativeSource) imports it
with patch(
"airbyte_cdk.sources.declarative.concurrent_declarative_source.ModelToComponentFactory",
new=MockModelToComponentFactory,
with (
patch(
"airbyte_cdk.sources.declarative.concurrent_declarative_source.ModelToComponentFactory",
new=MockModelToComponentFactory,
),
patch.dict("os.environ", {"AIRBYTE_STAGING_DIRECTORY": str(files_directory)}),
):
output = read(
self._config(),
Expand Down
22 changes: 22 additions & 0 deletions unit_tests/sources/file_based/file_types/test_file_transfer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# Copyright (c) 2026 Airbyte, Inc., all rights reserved.
#

import pytest

from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.file_based.file_types.file_transfer import FileTransfer
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


def test_file_transfer_raises_when_shared_staging_directory_is_missing(monkeypatch) -> None:
monkeypatch.delenv("AIRBYTE_STAGING_DIRECTORY", raising=False)

with pytest.raises(AirbyteTracedException) as exc_info:
FileTransfer()

assert exc_info.value.failure_type == FailureType.system_error
assert exc_info.value.message == "File transfer staging directory is unavailable."
assert exc_info.value.internal_message == (
"Configured AIRBYTE_STAGING_DIRECTORY does not exist: /staging/files"
)
Original file line number Diff line number Diff line change
Expand Up @@ -348,10 +348,17 @@ def setUp(self) -> None:

def test_when_read_records_from_slice_then_return_records(self) -> None:
"""Verify that we have the new file method and data is empty"""
with mock.patch.object(
FileTransfer,
"upload",
return_value=[(self._A_FILE_RECORD_DATA, self._A_FILE_REFERENCE_MESSAGE)],
with (
mock.patch.object(
FileTransfer,
"__init__",
return_value=None,
),
mock.patch.object(
FileTransfer,
"upload",
return_value=[(self._A_FILE_RECORD_DATA, self._A_FILE_REFERENCE_MESSAGE)],
),
):
remote_file = RemoteFile(uri="uri", last_modified=self._NOW)
messages = list(self._stream.read_records_from_slice({"files": [remote_file]}))
Expand Down
94 changes: 94 additions & 0 deletions unit_tests/sources/utils/test_files_directory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#
# Copyright (c) 2026 Airbyte, Inc., all rights reserved.
#

import pytest

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the unused import and pushed the fix.


Devin session


from airbyte_cdk.models import FailureType
from airbyte_cdk.sources.utils.files_directory import get_files_directory
from airbyte_cdk.utils.traced_exception import AirbyteTracedException


@pytest.mark.parametrize(
"is_file_transfer",
[
pytest.param(False, id="local_usage"),
pytest.param(True, id="file_transfer"),
],
)
def test_get_files_directory_uses_configured_staging_directory(
monkeypatch, tmp_path, is_file_transfer
) -> None:
monkeypatch.setenv("AIRBYTE_STAGING_DIRECTORY", str(tmp_path))

assert get_files_directory(is_file_transfer=is_file_transfer) == str(tmp_path)


@pytest.mark.parametrize(
"is_file_transfer",
[
pytest.param(False, id="local_usage"),
pytest.param(True, id="file_transfer"),
],
)
def test_get_files_directory_raises_when_explicit_staging_directory_is_missing(
monkeypatch, tmp_path, is_file_transfer
) -> None:
missing_staging_directory = tmp_path / "missing"
monkeypatch.setenv("AIRBYTE_STAGING_DIRECTORY", str(missing_staging_directory))

with pytest.raises(AirbyteTracedException) as exc_info:
get_files_directory(is_file_transfer=is_file_transfer)

assert exc_info.value.failure_type == FailureType.system_error
assert exc_info.value.message == "File transfer staging directory is unavailable."
assert exc_info.value.internal_message == (
f"Configured AIRBYTE_STAGING_DIRECTORY does not exist: {missing_staging_directory}"
)


def test_get_files_directory_falls_back_for_local_usage_without_configured_staging_directory(
monkeypatch,
) -> None:
monkeypatch.delenv("AIRBYTE_STAGING_DIRECTORY", raising=False)

assert get_files_directory() == "/tmp/airbyte-file-transfer"


def test_get_files_directory_raises_for_file_transfer_without_shared_staging_directory(
monkeypatch,
) -> None:
monkeypatch.delenv("AIRBYTE_STAGING_DIRECTORY", raising=False)

with pytest.raises(AirbyteTracedException) as exc_info:
get_files_directory(is_file_transfer=True)

assert exc_info.value.failure_type == FailureType.system_error
assert exc_info.value.message == "File transfer staging directory is unavailable."
assert exc_info.value.internal_message == (
"Configured AIRBYTE_STAGING_DIRECTORY does not exist: /staging/files"
)


@pytest.mark.parametrize(
"is_file_transfer",
[
pytest.param(False, id="local_usage"),
pytest.param(True, id="file_transfer"),
],
)
def test_get_files_directory_raises_when_explicit_staging_directory_is_a_file(
monkeypatch, tmp_path, is_file_transfer
) -> None:
staging_directory = tmp_path / "stage"
staging_directory.write_text("not a directory")
monkeypatch.setenv("AIRBYTE_STAGING_DIRECTORY", str(staging_directory))

with pytest.raises(AirbyteTracedException) as exc_info:
get_files_directory(is_file_transfer=is_file_transfer)

assert exc_info.value.failure_type == FailureType.system_error
assert exc_info.value.message == "File transfer staging directory is unavailable."
assert exc_info.value.internal_message == (
f"Configured AIRBYTE_STAGING_DIRECTORY is not a directory: {staging_directory}"
)
Loading