Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ data:
connectorSubtype: file
connectorType: source
definitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
dockerImageTag: 4.15.4
dockerImageTag: 4.15.5
dockerRepository: airbyte/source-s3
documentationUrl: https://docs.airbyte.com/integrations/sources/s3
externalDocumentationUrls:
Expand Down
35 changes: 14 additions & 21 deletions airbyte-integrations/connectors/source-s3/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-s3/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
version = "4.15.4"
version = "4.15.5"
name = "source-s3"
description = "Source implementation for S3."
authors = [ "Airbyte <contact@airbyte.io>",]
Expand All @@ -22,7 +22,7 @@ wcmatch = "^10.0"
dill = "^0.3.4"
transformers = "^4.38.2"
urllib3 = "<2"
airbyte-cdk = {extras = ["file-based"], version = "^7.0.0"}
airbyte-cdk = {version = "7.19.2.post4.dev26418171295", extras = ["file-based"], allow-prereleases = true}
pendulum = "^3.0.0"
psutil = "^7.0.0"
numpy = ">=2.2.6,<3"
Expand Down
28 changes: 27 additions & 1 deletion airbyte-integrations/connectors/source-s3/source_s3/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,34 @@
#
from __future__ import annotations


def _cap_malloc_arenas() -> None:
"""
Cap glibc malloc arenas to keep peak RSS low under the concurrent file-based
read path. With the default `8 x N_CPUs` arenas, the source pod can pin 1+ GB
of allocator overhead that is never returned to the OS, which combined with
the in-flight working set pushes the pod over its 2 Gi memory limit on
streams with many or large files (oncall #12663).

glibc-only. No-op on macOS or musl-libc images; falls back to default
behavior, which is correct for those platforms.
"""
try:
import ctypes

libc = ctypes.CDLL("libc.so.6", use_errno=True)
# M_ARENA_MAX = -8 from glibc's malloc.h. Equivalent to setting the
# MALLOC_ARENA_MAX environment variable, but applied at process start
# so it is scoped to this connector only.
libc.mallopt(-8, 2)
except (OSError, AttributeError):
pass


_cap_malloc_arenas()

from source_s3.v4 import SourceS3


def run() -> None:
SourceS3.launch()
SourceS3.launch()
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
#

from .config import Config
from .cursor import Cursor
from .legacy_config_transformer import LegacyConfigTransformer
from .source import SourceS3
from .stream_reader import SourceS3StreamReader

__all__ = ["Config", "Cursor", "LegacyConfigTransformer", "SourceS3", "SourceS3StreamReader"]
__all__ = ["Config", "LegacyConfigTransformer", "SourceS3", "SourceS3StreamReader"]
14 changes: 14 additions & 0 deletions airbyte-integrations/connectors/source-s3/source_s3/v4/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,20 @@ def documentation_url(cls) -> AnyUrl:
default="use_records_transfer",
)

concurrency_level: Optional[int] = Field(
title="Concurrency Level",
description=(
"Maximum number of partition reader threads source-s3 uses to read files in parallel. "
"Higher values increase throughput but also increase peak memory of the source pod. "
"Leave empty to use the CDK default (100). Lower this (e.g. 20-50) if the source pod hits "
"its memory limit on streams with many or large files."
),
default=None,
gt=0,
order=7,
group="advanced",
)

@root_validator
def validate_optional_args(cls, values):
aws_access_key_id = values.get("aws_access_key_id")
Expand Down
163 changes: 0 additions & 163 deletions airbyte-integrations/connectors/source-s3/source_s3/v4/cursor.py

This file was deleted.

Loading
Loading