diff --git a/airbyte_cdk/sources/file_based/file_based_source.py b/airbyte_cdk/sources/file_based/file_based_source.py index df469e834..62ad1abb5 100644 --- a/airbyte_cdk/sources/file_based/file_based_source.py +++ b/airbyte_cdk/sources/file_based/file_based_source.py @@ -124,9 +124,16 @@ def __init__( self.logger = init_logger(f"airbyte.{self.name}") self.errors_collector: FileBasedErrorsCollector = FileBasedErrorsCollector() self._message_repository: Optional[MessageRepository] = None + configured_concurrency: int | None = self._concurrency_level + concurrency = ( + min(configured_concurrency, MAX_CONCURRENCY) + if configured_concurrency is not None + else MAX_CONCURRENCY + ) + initial_n_partitions = max(concurrency // 2, 1) concurrent_source = ConcurrentSource.create( - MAX_CONCURRENCY, - INITIAL_N_PARTITIONS, + concurrency, + initial_n_partitions, self.logger, self._slice_logger, self.message_repository, diff --git a/unit_tests/sources/file_based/test_file_based_source_concurrency.py b/unit_tests/sources/file_based/test_file_based_source_concurrency.py new file mode 100644 index 000000000..bb6d4a7fe --- /dev/null +++ b/unit_tests/sources/file_based/test_file_based_source_concurrency.py @@ -0,0 +1,66 @@ +# +# Copyright (c) 2026 Airbyte, Inc., all rights reserved. +# + +from unittest.mock import MagicMock, patch + +import pytest + +from airbyte_cdk.sources.file_based.file_based_source import ( + MAX_CONCURRENCY, + FileBasedSource, +) + + +class _ConcreteFBSource(FileBasedSource): + """Minimal concrete subclass so we can instantiate FileBasedSource.""" + + _concurrency_level = None + + @property + def name(self) -> str: + return "test-source" + + def check_connection(self, logger, config): + return True, None + + def streams(self, config): + return [] + + +@pytest.mark.parametrize( + "concurrency_level, expected_num_workers, expected_initial_partitions", + [ + pytest.param(None, MAX_CONCURRENCY, MAX_CONCURRENCY // 2, id="none_uses_max"), + pytest.param(100, 100, 50, id="default_concurrency"), + pytest.param(20, 20, 10, id="reduced_concurrency"), + pytest.param(2, 2, 1, id="minimal_concurrency"), + pytest.param(200, MAX_CONCURRENCY, MAX_CONCURRENCY // 2, id="capped_at_max"), + ], +) +def test_concurrency_level_controls_thread_pool_size( + concurrency_level, expected_num_workers, expected_initial_partitions +): + _ConcreteFBSource._concurrency_level = concurrency_level + + with patch( + "airbyte_cdk.sources.file_based.file_based_source.ConcurrentSource.create" + ) as mock_create: + mock_create.return_value = MagicMock() + try: + _ConcreteFBSource( + stream_reader=MagicMock(), + spec_class=MagicMock(), + catalog=None, + config=None, + state=None, + ) + except Exception: + pass # Other init errors are fine; we only care about the ConcurrentSource.create call + + mock_create.assert_called_once() + call_args = mock_create.call_args + actual_num_workers = call_args[0][0] + actual_initial_partitions = call_args[0][1] + assert actual_num_workers == expected_num_workers + assert actual_initial_partitions == expected_initial_partitions