From ba7429e720ebc584676c0608d64b811f0dadad03 Mon Sep 17 00:00:00 2001 From: Quentin Lhoest Date: Wed, 1 Jul 2026 17:13:36 +0200 Subject: [PATCH 1/2] hf sources and destinations --- .../README.md | 37 ++ .../__init__.py | 3 + .../destination.py | 230 ++++++++++ .../destination_hugging_face_buckets/run.py | 17 + .../spec.json | 38 ++ .../destination-hugging-face-buckets/icon.svg | 1 + .../destination-hugging-face-buckets/main.py | 10 + .../metadata.yaml | 48 +++ .../pyproject.toml | 33 ++ .../unit_tests/test_destination.py | 95 ++++ .../README.md | 47 ++ .../__init__.py | 5 + .../destination.py | 149 +++++++ .../destination_hugging_face_datasets/run.py | 9 + .../spec.json | 28 ++ .../icon.svg | 1 + .../destination-hugging-face-datasets/main.py | 9 + .../metadata.yaml | 36 ++ .../pyproject.toml | 31 ++ .../unit_tests/test_destination.py | 111 +++++ .../source-hugging-face-buckets/README.md | 37 ++ .../source-hugging-face-buckets/icon.svg | 1 + .../source-hugging-face-buckets/main.py | 9 + .../source-hugging-face-buckets/metadata.yaml | 48 +++ .../pyproject.toml | 31 ++ .../source_hugging_face_buckets/__init__.py | 3 + .../source_hugging_face_buckets/run.py | 13 + .../source_hugging_face_buckets/source.py | 405 ++++++++++++++++++ .../source_hugging_face_buckets/spec.json | 46 ++ .../unit_tests/test_source.py | 86 ++++ .../source-hugging-face-datasets/icon.svg | 38 +- .../source-hugging-face-datasets/main.py | 8 + .../metadata.yaml | 16 +- .../pyproject.toml | 16 + .../source_hugging_face_datasets/__init__.py | 3 + .../source_hugging_face_datasets/run.py | 18 + .../source_hugging_face_datasets/source.py | 316 ++++++++++++++ .../source-hugging-face-datasets/spec.json | 53 +++ .../unit_tests/test_source.py | 121 ++++++ 39 files changed, 2161 insertions(+), 45 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-hugging-face-buckets/README.md create mode 100644 
airbyte-integrations/connectors/destination-hugging-face-buckets/destination_hugging_face_buckets/__init__.py create mode 100644 airbyte-integrations/connectors/destination-hugging-face-buckets/destination_hugging_face_buckets/destination.py create mode 100644 airbyte-integrations/connectors/destination-hugging-face-buckets/destination_hugging_face_buckets/run.py create mode 100644 airbyte-integrations/connectors/destination-hugging-face-buckets/destination_hugging_face_buckets/spec.json create mode 100644 airbyte-integrations/connectors/destination-hugging-face-buckets/icon.svg create mode 100644 airbyte-integrations/connectors/destination-hugging-face-buckets/main.py create mode 100644 airbyte-integrations/connectors/destination-hugging-face-buckets/metadata.yaml create mode 100644 airbyte-integrations/connectors/destination-hugging-face-buckets/pyproject.toml create mode 100644 airbyte-integrations/connectors/destination-hugging-face-buckets/unit_tests/test_destination.py create mode 100644 airbyte-integrations/connectors/destination-hugging-face-datasets/README.md create mode 100644 airbyte-integrations/connectors/destination-hugging-face-datasets/destination_hugging_face_datasets/__init__.py create mode 100644 airbyte-integrations/connectors/destination-hugging-face-datasets/destination_hugging_face_datasets/destination.py create mode 100644 airbyte-integrations/connectors/destination-hugging-face-datasets/destination_hugging_face_datasets/run.py create mode 100644 airbyte-integrations/connectors/destination-hugging-face-datasets/destination_hugging_face_datasets/spec.json create mode 100644 airbyte-integrations/connectors/destination-hugging-face-datasets/icon.svg create mode 100644 airbyte-integrations/connectors/destination-hugging-face-datasets/main.py create mode 100644 airbyte-integrations/connectors/destination-hugging-face-datasets/metadata.yaml create mode 100644 airbyte-integrations/connectors/destination-hugging-face-datasets/pyproject.toml create 
mode 100644 airbyte-integrations/connectors/destination-hugging-face-datasets/unit_tests/test_destination.py create mode 100644 airbyte-integrations/connectors/source-hugging-face-buckets/README.md create mode 100644 airbyte-integrations/connectors/source-hugging-face-buckets/icon.svg create mode 100644 airbyte-integrations/connectors/source-hugging-face-buckets/main.py create mode 100644 airbyte-integrations/connectors/source-hugging-face-buckets/metadata.yaml create mode 100644 airbyte-integrations/connectors/source-hugging-face-buckets/pyproject.toml create mode 100644 airbyte-integrations/connectors/source-hugging-face-buckets/source_hugging_face_buckets/__init__.py create mode 100644 airbyte-integrations/connectors/source-hugging-face-buckets/source_hugging_face_buckets/run.py create mode 100644 airbyte-integrations/connectors/source-hugging-face-buckets/source_hugging_face_buckets/source.py create mode 100644 airbyte-integrations/connectors/source-hugging-face-buckets/source_hugging_face_buckets/spec.json create mode 100644 airbyte-integrations/connectors/source-hugging-face-buckets/unit_tests/test_source.py create mode 100644 airbyte-integrations/connectors/source-hugging-face-datasets/main.py create mode 100644 airbyte-integrations/connectors/source-hugging-face-datasets/pyproject.toml create mode 100644 airbyte-integrations/connectors/source-hugging-face-datasets/source_hugging_face_datasets/__init__.py create mode 100644 airbyte-integrations/connectors/source-hugging-face-datasets/source_hugging_face_datasets/run.py create mode 100644 airbyte-integrations/connectors/source-hugging-face-datasets/source_hugging_face_datasets/source.py create mode 100644 airbyte-integrations/connectors/source-hugging-face-datasets/spec.json create mode 100644 airbyte-integrations/connectors/source-hugging-face-datasets/unit_tests/test_source.py diff --git a/airbyte-integrations/connectors/destination-hugging-face-buckets/README.md 
b/airbyte-integrations/connectors/destination-hugging-face-buckets/README.md new file mode 100644 index 000000000000..7433524bb58d --- /dev/null +++ b/airbyte-integrations/connectors/destination-hugging-face-buckets/README.md @@ -0,0 +1,36 @@ +# Destination Hugging Face Buckets + +This is an Airbyte destination connector for [Hugging Face Buckets](https://huggingface.co/docs/hub/en/buckets). + +## Overview + +This connector writes data from Airbyte sources to Hugging Face Buckets in Parquet or JSONL format. Each stream is written to a single file named `{stream_name}.parquet` or `{stream_name}.jsonl` under the destination path. + +## Configuration + +| Field | Required | Description | +|-------|----------|-------------| +| `destination_path` | Yes | The path to the Hugging Face Bucket where data will be written. Format: `hf://buckets/{username}/{bucket}/{path}/` | +| `file_format` | No | The format to use when writing files (default: `parquet`). Options: `parquet`, `jsonl` | +| `overwrite` | No | Not currently a supported option: the connector spec rejects unknown fields, and each sync always replaces the file of every stream it writes | +| `token` | No | Hugging Face API token for authentication. Required for private buckets or write access | + +### Examples + +```json +{ + "destination_path": "hf://buckets/lhoestq/b/", + "file_format": "parquet", + "token": "hf_..." +} +``` + +## Supported Sync Modes + +- `append`: Not supported; the connector spec only declares `overwrite` +- `overwrite`: Overwrite existing files + +## Supported Formats + +- Parquet (.parquet) +- JSON Lines (.jsonl) \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-hugging-face-buckets/destination_hugging_face_buckets/__init__.py b/airbyte-integrations/connectors/destination-hugging-face-buckets/destination_hugging_face_buckets/__init__.py new file mode 100644 index 000000000000..90c0556d88d6 --- /dev/null +++ b/airbyte-integrations/connectors/destination-hugging-face-buckets/destination_hugging_face_buckets/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import json
import logging
import uuid
from typing import Any, Iterable, Mapping

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from huggingface_hub import HfFileSystem

from airbyte_cdk.destinations import Destination
from airbyte_cdk.models import (
    AirbyteConnectionStatus,
    AirbyteMessage,
    ConfiguredAirbyteCatalog,
    Status,
    Type,
)


_LOGGER = logging.getLogger(__name__)

_HF_SCHEME = "hf://"

# Accepted ``file_format`` values; each one is also the file extension used.
_SUPPORTED_FORMATS = ("parquet", "jsonl")


def _normalize_destination_path(raw_path: str) -> str:
    """Return ``raw_path`` without the ``hf://`` scheme and with a trailing slash.

    Args:
        raw_path: The ``destination_path`` value from the connector config.

    Returns:
        The path in the form expected by ``HfFileSystem``, ending with ``/``.

    Raises:
        ValueError: If ``raw_path`` is empty. Without this guard an empty path
            would be normalized to ``/`` and files would be written to the
            filesystem root.
    """
    if not raw_path:
        raise ValueError("destination_path is required")
    path = raw_path.removeprefix(_HF_SCHEME)
    if not path.endswith("/"):
        path += "/"
    return path


class DestinationHuggingFaceBuckets(Destination):
    """A destination that writes data to Hugging Face Buckets using the hf:// protocol.

    Each stream is written to one file, ``{stream_name}.parquet`` or
    ``{stream_name}.jsonl``, under the configured destination path, through the
    huggingface_hub library's fsspec integration (``HfFileSystem``).

    Supported URL formats:
    - hf://buckets/{username}/{bucket}/{path}/
    """

    def write(
        self,
        config: Mapping[str, Any],
        configured_catalog: ConfiguredAirbyteCatalog,
        input_messages: Iterable[AirbyteMessage],
    ) -> Iterable[AirbyteMessage]:
        """Write data to Hugging Face Buckets.

        Records are buffered in memory per stream and written when the input is
        exhausted. STATE messages are therefore held back and only emitted once
        every file has been written successfully, so a state is never
        acknowledged for records that have not been persisted.

        Args:
            config: Configuration dictionary for the destination.
            configured_catalog: The catalog describing how to write data.
            input_messages: Stream of Airbyte messages containing records.

        Returns:
            Iterable of Airbyte messages: TRACE messages as they arrive, then
            the STATE messages once the data has been written.

        Raises:
            ValueError: If ``destination_path`` is empty or ``file_format`` is
                not supported.
            RuntimeError: If one or more files could not be written.
        """
        destination_path = _normalize_destination_path(config.get("destination_path", ""))
        file_format = config.get("file_format", "parquet")
        if file_format not in _SUPPORTED_FORMATS:
            raise ValueError(f"Unsupported file format: {file_format}")

        # Get the filesystem to write the files
        fs = HfFileSystem(token=config.get("token"))

        # One writer per output file, created lazily on the first record
        active_writers: dict = {}
        # STATE messages waiting for the data they cover to be persisted
        pending_states: list = []

        try:
            for message in input_messages:
                if message.type == Type.RECORD:
                    if message.record is None:
                        continue
                    stream_name = message.record.stream or "default"
                    file_path = f"{destination_path}{stream_name}.{file_format}"

                    writer = active_writers.get(file_path)
                    if writer is None:
                        writer = self._open_writer(file_path, file_format, fs)
                        active_writers[file_path] = writer
                    writer.write(message.record.data or {})
                elif message.type == Type.STATE:
                    pending_states.append(message)
                elif message.type == Type.TRACE:
                    # Forward trace messages
                    yield message
                # Other message types are ignored
        finally:
            # Always try to flush what was received, even if the input failed
            failures = self._close_writers(active_writers)

        # Only reached when reading the input succeeded: a failed flush must
        # fail the sync instead of being reported as a success.
        if failures:
            failed_paths = ", ".join(path for path, _ in failures)
            raise RuntimeError(f"Failed to write {len(failures)} file(s): {failed_paths}") from failures[0][1]

        yield from pending_states

    @staticmethod
    def _close_writers(writers: Mapping[str, Any]) -> list:
        """Close every writer and return the ``(file_path, exception)`` pairs that failed.

        All writers are attempted so that one failing file does not prevent the
        others from being written.
        """
        failures = []
        for file_path, writer in writers.items():
            try:
                writer.close()
            except Exception as e:
                _LOGGER.error(f"{type(e).__name__} ({file_path}): {e}")
                failures.append((file_path, e))
        return failures

    def _open_writer(self, file_path: str, file_format: str, fs: HfFileSystem):
        """Open a file writer for the given path and format.

        Args:
            file_path: The full path to write to.
            file_format: The format ('parquet' or 'jsonl').
            fs: The filesystem to use.

        Returns:
            A writer object appropriate for the format.

        Raises:
            ValueError: If ``file_format`` is not supported.
        """
        if file_format == "parquet":
            return _ParquetWriter(file_path, fs)
        if file_format == "jsonl":
            return _JsonlWriter(file_path, fs)
        raise ValueError(f"Unsupported file format: {file_format}")

    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        """Test the connection by writing and deleting a small file in the bucket.

        Args:
            logger: Logger for the connector.
            config: The configuration dictionary.

        Returns:
            AirbyteConnectionStatus indicating success or failure. Any error,
            including a missing ``destination_path``, is reported as FAILED
            rather than raised.
        """
        try:
            destination_path = _normalize_destination_path(config.get("destination_path", ""))

            # Get the filesystem to test connectivity
            fs = HfFileSystem(token=config.get("token"))

            # Test write by creating a temporary file with a unique name
            temp_file = f"{destination_path}_airbyte_check_{uuid.uuid4().hex}.parquet"
            test_table = pa.table({"check": [True]})

            with fs.open(temp_file, "wb") as f:
                pq.write_table(
                    test_table,
                    f,
                    write_page_index=True,
                    use_content_defined_chunking=True,
                )

            # Clean up
            fs.rm(temp_file)

            return AirbyteConnectionStatus(status=Status.SUCCEEDED)

        except Exception as e:
            return AirbyteConnectionStatus(
                status=Status.FAILED,
                message=f"Connection check failed: {e}",
            )


class _BufferedWriter:
    """Collect records in memory and write them to one file when closed.

    NOTE: the whole stream is held in memory until ``close()``.
    """

    def __init__(self, file_path: str, fs: HfFileSystem):
        self.file_path = file_path
        self.fs = fs
        self._buffers: list = []

    def write(self, record: dict):
        """Append a record to the file.

        Args:
            record: The record to append.
        """
        self._buffers.append(record)

    def close(self):
        """Flush the buffered records and reset the buffer.

        Nothing is written when no record was received. If the flush raises,
        the buffer is left untouched.
        """
        if self._buffers:
            self._flush(self._buffers)
            self._buffers = []

    def _flush(self, records: list):
        """Write ``records`` to ``self.file_path``; implemented per format."""
        raise NotImplementedError


class _ParquetWriter(_BufferedWriter):
    """Writer for Parquet files in HF Buckets."""

    def _flush(self, records: list):
        # Going through pandas gives the union of the keys of all records
        df = pd.DataFrame(records)
        table = pa.Table.from_pandas(df)

        # Write to HF bucket
        with self.fs.open(self.file_path, "wb") as f:
            pq.write_table(
                table,
                f,
                write_page_index=True,
                use_content_defined_chunking=True,
            )


class _JsonlWriter(_BufferedWriter):
    """Writer for JSONL files in HF Buckets."""

    def _flush(self, records: list):
        # One JSON document per line, UTF-8 encoded
        with self.fs.open(self.file_path, "wb") as f:
            for record in records:
                f.write(json.dumps(record).encode("utf-8") + b"\n")
#

import sys

from destination_hugging_face_buckets.destination import DestinationHuggingFaceBuckets


def run():
    """Instantiate the destination and run it on the command-line arguments."""
    cli_args = sys.argv[1:]  # drop the program name
    DestinationHuggingFaceBuckets().run(cli_args)


if __name__ == "__main__":
    run()
Required for private buckets or write access.", + "airbyte_secret": true, + "order": 2 + } + } + } +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-hugging-face-buckets/icon.svg b/airbyte-integrations/connectors/destination-hugging-face-buckets/icon.svg new file mode 100644 index 000000000000..dc1cf3ffb77f --- /dev/null +++ b/airbyte-integrations/connectors/destination-hugging-face-buckets/icon.svg @@ -0,0 +1 @@ +HuggingFace \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-hugging-face-buckets/main.py b/airbyte-integrations/connectors/destination-hugging-face-buckets/main.py new file mode 100644 index 000000000000..4b0813142fa6 --- /dev/null +++ b/airbyte-integrations/connectors/destination-hugging-face-buckets/main.py @@ -0,0 +1,10 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + + +from destination_hugging_face_buckets.run import run + + +if __name__ == "__main__": + run() \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-hugging-face-buckets/metadata.yaml b/airbyte-integrations/connectors/destination-hugging-face-buckets/metadata.yaml new file mode 100644 index 000000000000..5818ff6ba4f6 --- /dev/null +++ b/airbyte-integrations/connectors/destination-hugging-face-buckets/metadata.yaml @@ -0,0 +1,48 @@ +{ + "documentationUrl": "https://docs.airbyte.com/integrations/destinations/hugging-face-buckets", + "connectorBuildOptions": { + "baseImage": "docker.io/airbyte/python-connector-base:4.0.2@sha256:9fdb1888c4264cf6fee473649ecb593f56f58e5d0096a87ee0b231777e2e3e73" + }, + "connectorSubtype": "file", + "connectorType": "destination", + "definitionId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890", + "dockerImageTag": "0.1.0", + "dockerRepository": "airbyte/destination-hugging-face-buckets", + "documentationUrl": "https://docs.airbyte.com/integrations/destinations/hugging-face-buckets", + "githubIssueLabel": "destination-hugging-face-buckets", + 
"icon": "icon.svg", + "license": "ELv2", + "name": "Hugging Face Buckets", + "registryOverrides": { + "cloud": { + "enabled": true + }, + "oss": { + "enabled": true + } + }, + "releaseStage": "alpha", + "supportLevel": "community", + "tags": [ + "language:python", + "cdk:python" + ], + "connectorTestSuitesOptions": [ + { + "suite": "unitTests" + }, + { + "suite": "integrationTests", + "testSecrets": [ + { + "name": "SECRET_DESTINATION-HF-BUCKETS__CREDS", + "fileName": "config.json", + "secretStore": { + "type": "GSM", + "alias": "airbyte-connector-testing-secret-store" + } + } + ] + } + ] +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-hugging-face-buckets/pyproject.toml b/airbyte-integrations/connectors/destination-hugging-face-buckets/pyproject.toml new file mode 100644 index 000000000000..dae64cc52c94 --- /dev/null +++ b/airbyte-integrations/connectors/destination-hugging-face-buckets/pyproject.toml @@ -0,0 +1,33 @@ +[build-system] +requires = ["poetry-core>=1.0.0"] +build-backend = "poetry.core.masonry.api" + +[tool.poetry] +version = "0.1.0" +name = "destination_hugging_face_buckets" +description = "Destination implementation for Hugging Face Buckets" +authors = ["Airbyte "] +license = "ELv2" +readme = "README.md" +documentation = "https://docs.airbyte.com/integrations/destinations/hugging-face-buckets" +homepage = "https://airbyte.com" +repository = "https://github.com/airbytehq/airbyte" +[[tool.poetry.packages]] +include = "destination_hugging_face_buckets" + +[tool.poetry.dependencies] +python = "^3.10,<3.15" +airbyte-cdk = "^7" +huggingface_hub = ">=0.20.0" +pyarrow = ">=21.0.0" +pandas = ">=2.0.0" +pyyaml = ">=6.0.0" +tqdm = ">=4.65.0" + +[tool.poetry.scripts] +destination-hugging-face-buckets = "destination_hugging_face_buckets.run:run" + +[tool.poe] +include = [ + "${POE_GIT_DIR}/poe-tasks/poetry-connector-tasks.toml", +] \ No newline at end of file diff --git 
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import json
import unittest
from unittest.mock import MagicMock, patch

import pyarrow.parquet as pq
from destination_hugging_face_buckets.destination import (
    DestinationHuggingFaceBuckets,
)
from fsspec.implementations.memory import MemoryFileSystem

from airbyte_cdk.models import (
    AirbyteMessage,
    AirbyteRecordMessage,
    AirbyteStateMessage,
    Status,
    Type,
)


def _new_memory_fs() -> MemoryFileSystem:
    """Return an empty in-memory filesystem.

    MemoryFileSystem keeps its files in a class-level store shared by every
    instance, so it is cleared to keep the tests independent of each other.
    """
    MemoryFileSystem.store.clear()
    return MemoryFileSystem(skip_instance_cache=True)


def _record(stream: str, data: dict) -> AirbyteMessage:
    """Build a RECORD message for ``stream`` carrying ``data``."""
    return AirbyteMessage(
        type=Type.RECORD,
        record=AirbyteRecordMessage(data=data, stream=stream, emitted_at=0),
    )


def _state() -> AirbyteMessage:
    """Build a STATE message with a properly typed state payload."""
    return AirbyteMessage(
        type=Type.STATE,
        state=AirbyteStateMessage(data={"checkpoint": 1}),
    )


class TestDestinationHuggingFaceBucketsCheck(unittest.TestCase):
    """Tests for the DestinationHuggingFaceBuckets.check method."""

    def setUp(self):
        self.destination = DestinationHuggingFaceBuckets()

    @patch("destination_hugging_face_buckets.destination.HfFileSystem")
    def test_check_connection_success(self, mock_fs):
        """Test successful connection check."""
        fs = _new_memory_fs()
        mock_fs.return_value = fs

        config = {
            "destination_path": "hf://buckets/test/bucket/",
            "file_format": "parquet",
        }

        result = self.destination.check(MagicMock(), config)
        self.assertEqual(result.status, Status.SUCCEEDED)

        # The temporary file used for the check must have been removed
        self.assertEqual(fs.glob("**/*.parquet"), [])

    @patch("destination_hugging_face_buckets.destination.HfFileSystem.open")
    def test_check_connection_failure(self, mock_open):
        """Test failed connection check."""
        mock_open.side_effect = Exception("Access denied")

        config = {
            "destination_path": "hf://buckets/test/bucket/",
            "file_format": "parquet",
        }

        result = self.destination.check(MagicMock(), config)
        self.assertEqual(result.status, Status.FAILED)


class TestDestinationHuggingFaceBucketsWrite(unittest.TestCase):
    """Tests for the DestinationHuggingFaceBuckets.write method."""

    def setUp(self):
        self.destination = DestinationHuggingFaceBuckets()

    @patch("destination_hugging_face_buckets.destination.HfFileSystem")
    def test_write(self, mock_fs):
        """Test that records are written to one Parquet file per stream."""
        fs = _new_memory_fs()
        mock_fs.return_value = fs

        messages = [
            _record("test_stream", {"col1": "value1", "col2": 123}),
            _state(),
        ]

        config = {
            "destination_path": "hf://buckets/test/bucket/",
            "file_format": "parquet",
        }

        result = list(self.destination.write(config, MagicMock(), iter(messages)))

        output_files = fs.glob("**/*.parquet")
        self.assertEqual(len(output_files), 1)
        self.assertTrue(output_files[0].endswith("test_stream.parquet"))

        # Verify the content of the file, not only its existence
        with fs.open(output_files[0], "rb") as f:
            rows = pq.read_table(f).to_pylist()
        self.assertEqual(rows, [{"col1": "value1", "col2": 123}])

        # Verify state message was yielded
        state_messages = [m for m in result if m.type == Type.STATE]
        self.assertEqual(len(state_messages), 1)

    @patch("destination_hugging_face_buckets.destination.HfFileSystem")
    def test_write_jsonl(self, mock_fs):
        """Test that records are written as one JSON document per line."""
        fs = _new_memory_fs()
        mock_fs.return_value = fs

        records = [{"col1": "value1", "col2": 123}, {"col1": "value2", "col2": 456}]
        messages = [_record("test_stream", data) for data in records]
        messages.append(_state())

        config = {
            "destination_path": "hf://buckets/test/bucket/",
            "file_format": "jsonl",
        }

        result = list(self.destination.write(config, MagicMock(), iter(messages)))

        output_files = fs.glob("**/*.jsonl")
        self.assertEqual(len(output_files), 1)

        with fs.open(output_files[0], "rb") as f:
            lines = f.read().decode("utf-8").splitlines()
        self.assertEqual([json.loads(line) for line in lines], records)

        state_messages = [m for m in result if m.type == Type.STATE]
        self.assertEqual(len(state_messages), 1)


if __name__ == "__main__":
    unittest.main()
**Write to HF Buckets**: Stores data in Hugging Face Buckets (file storage) + +## Configuration + +| Field | Required | Description | +|-------|----------|-------------| +| `dataset_name` | Yes | The name of the Hugging Face Dataset to write to. Format: `{username}/{dataset_name}` | +| `push_to_hub` | No | Whether to push data to Hugging Face Hub (default: `false`). If `false`, data will be written to HF Buckets. | +| `overwrite` | No | Whether to overwrite existing datasets in Hub (default: `false`) | +| `token` | No | Hugging Face API token for authentication | + +### Examples + +```json +{ + "dataset_name": "lhoestq/b", + "push_to_hub": true, + "overwrite": false, + "token": "hf_..." +} +``` + +## Supported Sync Modes + +- `append`: Append data to existing datasets +- `overwrite`: Replace existing datasets + +## How it works + +This destination connector receives records from Airbyte sources and writes them to Hugging Face Datasets. Depending on the configuration, it either: + +1. Pushes data directly to HF Hub using the `datasets` library +2. Writes data to HF Buckets as Parquet files + +## Limitations + +- Only supports Parquet format for bucket writes +- Hub pushes require proper dataset structure and authentication \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-hugging-face-datasets/destination_hugging_face_datasets/__init__.py b/airbyte-integrations/connectors/destination-hugging-face-datasets/destination_hugging_face_datasets/__init__.py new file mode 100644 index 000000000000..70506029a246 --- /dev/null +++ b/airbyte-integrations/connectors/destination-hugging-face-datasets/destination_hugging_face_datasets/__init__.py @@ -0,0 +1,5 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import logging
import uuid
from collections import defaultdict
from typing import Any, Iterable, Mapping, Optional

import pandas as pd
from datasets import Dataset
from huggingface_hub import HfApi

from airbyte_cdk.destinations import Destination
from airbyte_cdk.models import (
    AirbyteConnectionStatus,
    AirbyteMessage,
    ConfiguredAirbyteCatalog,
    Status,
    Type,
)


_LOGGER = logging.getLogger(__name__)


class DestinationHuggingFaceDatasets(Destination):
    """A destination that writes data to Hugging Face Datasets.

    This destination writes data to Hugging Face Datasets using the `datasets` library's
    `push_to_hub()` functionality. Records are collected per stream and each stream is
    pushed as its own configuration (subset) of the target dataset.
    """

    def write(
        self,
        config: Mapping[str, Any],
        configured_catalog: ConfiguredAirbyteCatalog,
        input_messages: Iterable[AirbyteMessage],
    ) -> Iterable[AirbyteMessage]:
        """Write data to Hugging Face Datasets.

        Records are buffered in memory and pushed once the input is exhausted.
        STATE messages are held back until every stream has been pushed, so a
        state is never acknowledged for records that are not on the Hub yet.

        Args:
            config: Configuration dictionary for the destination.
            configured_catalog: The catalog describing how to write data.
            input_messages: Stream of Airbyte messages containing records.

        Returns:
            Iterable of Airbyte messages: TRACE messages as they arrive, then
            the STATE messages once all streams have been pushed.

        Raises:
            ValueError: If ``dataset_name`` or ``token`` is missing from the config.
        """
        dataset_name = config.get("dataset_name", "")
        token = config.get("token", None)

        if not dataset_name:
            raise ValueError("dataset_name is required")

        if not token:
            raise ValueError("token is required")

        # Records of each stream, in arrival order
        stream_buffers: dict = defaultdict(list)
        # STATE messages waiting for the data they cover to be pushed
        pending_states: list = []

        for message in input_messages:
            if message.type == Type.RECORD:
                if message.record is None:
                    continue
                stream_name = message.record.stream or "default"
                stream_buffers[stream_name].append(message.record.data or {})
            elif message.type == Type.STATE:
                pending_states.append(message)
            elif message.type == Type.TRACE:
                # Forward trace messages
                yield message
            # Other message types are ignored

        # Write all buffered data to Hugging Face Datasets. A failure is logged
        # by _push_to_hub (with the stream name) and then propagates, so no
        # state is emitted for a sync whose data did not reach the Hub.
        for stream_name, records in stream_buffers.items():
            self._push_to_hub(dataset_name, stream_name, pd.DataFrame(records), token)

        yield from pending_states

    def _push_to_hub(self, dataset_name: str, stream_name: str, df: pd.DataFrame, token: Optional[str]):
        """Push data to Hugging Face Hub as a dataset.

        Args:
            dataset_name: Base dataset name.
            stream_name: Stream name to use as subset.
            df: DataFrame containing the records.
            token: Hugging Face token.

        Raises:
            Exception: Whatever ``Dataset.push_to_hub`` raised, after logging it.
        """
        # Create dataset from dataframe
        dataset = Dataset.from_pandas(df)

        try:
            # Push to HF Hub - writes Parquet files under the hood.
            # The second positional argument is the configuration (subset) name.
            dataset.push_to_hub(dataset_name, stream_name, token=token)
            _LOGGER.info(f"Successfully pushed {stream_name} to {dataset_name} with config {stream_name}")

        except Exception as e:
            _LOGGER.error(f"Failed to push {stream_name} to Hub: {str(e)}")
            raise

    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        """Test the connection by creating and deleting a branch on the dataset repo.

        Args:
            logger: Logger for the connector.
            config: The configuration dictionary.

        Returns:
            AirbyteConnectionStatus indicating success or failure. Any error is
            reported as FAILED rather than raised.
        """
        try:
            dataset_name = config.get("dataset_name", "")
            token = config.get("token", None)

            if not dataset_name:
                return AirbyteConnectionStatus(
                    status=Status.FAILED,
                    message="dataset_name is required",
                )

            # NOTE(review): write() rejects a missing token but check() does not,
            # so check() can presumably succeed with ambient credentials and
            # write() still fail - confirm the intended contract and align both.

            # If we can create a branch then we can write to the dataset
            branch = f"_airbyte_check_{uuid.uuid4().hex}"
            api = HfApi(token=token)
            api.create_branch(dataset_name, branch=branch, repo_type="dataset")

            # Clean up
            api.delete_branch(dataset_name, branch=branch, repo_type="dataset")

            return AirbyteConnectionStatus(status=Status.SUCCEEDED)

        except Exception as e:
            return AirbyteConnectionStatus(
                status=Status.FAILED,
                message=f"Connection check failed: {str(e)}",
            )
#

import sys

# This module must define ``run`` itself: importing it from
# ``destination_hugging_face_datasets.run`` here would make the module import
# itself and leave ``run`` undefined for ``main.py``.
from destination_hugging_face_datasets.destination import DestinationHuggingFaceDatasets


def run():
    """Run the destination with the command-line arguments from sys.argv."""
    destination = DestinationHuggingFaceDatasets()
    destination.run(sys.argv[1:])


if __name__ == "__main__":
    run()
Format: {username}/{dataset_name}", + "examples": ["lhoestq/demo1", "organization/my_dataset"], + "order": 0 + }, + "token": { + "title": "Hugging Face Token", + "type": "string", + "description": "Your Hugging Face token for authentication.", + "airbyte_secret": true, + "order": 1 + } + } + } +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-hugging-face-datasets/icon.svg b/airbyte-integrations/connectors/destination-hugging-face-datasets/icon.svg new file mode 100644 index 000000000000..dc1cf3ffb77f --- /dev/null +++ b/airbyte-integrations/connectors/destination-hugging-face-datasets/icon.svg @@ -0,0 +1 @@ +HuggingFace \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-hugging-face-datasets/main.py b/airbyte-integrations/connectors/destination-hugging-face-datasets/main.py new file mode 100644 index 000000000000..5152912f253c --- /dev/null +++ b/airbyte-integrations/connectors/destination-hugging-face-datasets/main.py @@ -0,0 +1,9 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+# + +from destination_hugging_face_datasets.run import run + + +if __name__ == "__main__": + run() \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-hugging-face-datasets/metadata.yaml b/airbyte-integrations/connectors/destination-hugging-face-datasets/metadata.yaml new file mode 100644 index 000000000000..15e4e1898113 --- /dev/null +++ b/airbyte-integrations/connectors/destination-hugging-face-datasets/metadata.yaml @@ -0,0 +1,38 @@ +{ + "metadataSpecVersion": "1.0", + "data": { + "documentationUrl": "https://docs.airbyte.com/integrations/destinations/hugging-face-datasets", + "connectorBuildOptions": { + "baseImage": "docker.io/airbyte/python-connector-base:4.0.2@sha256:9fdb1888c4264cf6fee473649ecb593f56f58e5d0096a87ee0b231777e2e3e73" + }, + "connectorSubtype": "file", + "connectorType": "destination", + "definitionId": "d1c2d3e4-f5a6-7890-bcde-f12345678902", + "dockerImageTag": "0.1.0", + "dockerRepository": "airbyte/destination-hugging-face-datasets", + "githubIssueLabel": "destination-hugging-face-datasets", + "icon": "icon.svg", + "license": "ELv2", + "name": "Hugging Face Datasets (Destination)", + "registryOverrides": { + "cloud": {"enabled": true}, + "oss": {"enabled": true} + }, + "releaseStage": "alpha", + "supportLevel": "community", + "tags": ["language:python", "cdk:python"], + "connectorTestSuitesOptions": [ + {"suite": "unitTests"}, + { + "suite": "integrationTests", + "testSecrets": [ + { + "name": "SECRET_DESTINATION-HF-DATASETS__CREDS", + "fileName": "config.json", + "secretStore": {"type": "GSM", "alias": "airbyte-connector-testing-secret-store"} + } + ] + } + ] + } +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-hugging-face-datasets/pyproject.toml b/airbyte-integrations/connectors/destination-hugging-face-datasets/pyproject.toml new file mode 100644 index 000000000000..fb5d26ce26d8 --- /dev/null +++ 
b/airbyte-integrations/connectors/destination-hugging-face-datasets/pyproject.toml @@ -0,0 +1,31 @@ +[build-system] +requires = ["poetry-core>=1.0.0"] +build-backend = "poetry.core.masonry.api" + +[tool.poetry] +version = "0.1.0" +name = "destination_hugging_face_datasets" +description = "Destination implementation for Hugging Face Datasets" +authors = ["Airbyte "] +license = "ELv2" +readme = "README.md" +documentation = "https://docs.airbyte.com/integrations/destinations/hugging-face-datasets" +homepage = "https://airbyte.com" +repository = "https://github.com/airbytehq/airbyte" +[[tool.poetry.packages]] +include = "destination_hugging_face_datasets" + +[tool.poetry.dependencies] +python = "^3.10,<3.15" +airbyte-cdk = "^7" +datasets = ">=4.0.0" +pandas = ">=2.0.0" +pyyaml = ">=6.0.0" + +[tool.poetry.scripts] +destination-hugging-face-datasets = "destination_hugging_face_datasets.run:run" + +[tool.poe] +include = [ + "${POE_GIT_DIR}/poe-tasks/poetry-connector-tasks.toml", +] \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-hugging-face-datasets/unit_tests/test_destination.py b/airbyte-integrations/connectors/destination-hugging-face-datasets/unit_tests/test_destination.py new file mode 100644 index 000000000000..f67928a4e366 --- /dev/null +++ b/airbyte-integrations/connectors/destination-hugging-face-datasets/unit_tests/test_destination.py @@ -0,0 +1,111 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+# + +import unittest +from unittest.mock import MagicMock, patch + +from destination_hugging_face_datasets.destination import DestinationHuggingFaceDatasets + +from airbyte_cdk.models import Status + + +class TestDestinationHuggingFaceDatasetsCheck(unittest.TestCase): + """Tests for the DestinationHuggingFaceDatasets.check method.""" + + def setUp(self): + self.destination = DestinationHuggingFaceDatasets() + + def test_check_dataset_name_required(self): + """Test that dataset_name is required.""" + config = { + "token": "hf_test", + } + + result = self.destination.check(MagicMock(), config) + self.assertEqual(result.status, Status.FAILED) + + @patch("destination_hugging_face_datasets.destination.HfApi") + def test_check_hub_success(self, mock_api): + """Test successful Hub connection check.""" + mock_api.return_value = MagicMock() + + config = { + "dataset_name": "test/dataset", + "token": "hf_test", + } + + result = self.destination.check(MagicMock(), config) + self.assertEqual(result.status, Status.SUCCEEDED) + + @patch("destination_hugging_face_datasets.destination.HfApi") + def test_check_hub_failure(self, mock_api): + """Test failed Hub connection check.""" + mock_instance = MagicMock() + mock_api.return_value = mock_instance + mock_instance.create_branch.side_effect = Exception("Invalid token") + + config = { + "dataset_name": "test/dataset", + "token": "hf_test", + } + + result = self.destination.check(MagicMock(), config) + self.assertEqual(result.status, Status.FAILED) + + +class TestDestinationHuggingFaceDatasetsWrite(unittest.TestCase): + """Tests for the DestinationHuggingFaceDatasets.write method.""" + + def setUp(self): + self.destination = DestinationHuggingFaceDatasets() + + def test_write_requires_dataset_name(self): + """Test that dataset_name is required.""" + config = { + "push_to_hub": True, + "token": "hf_test", + } + + with self.assertRaises(ValueError) as context: + list(self.destination.write(config, MagicMock(), iter([]))) + 
self.assertIn("dataset_name is required", str(context.exception)) + + @patch("destination_hugging_face_datasets.destination.Dataset") + def test_write(self, mock_dataset): + """Test successful push to Hub.""" + # Mock Dataset + mock_dataset_instance = MagicMock() + mock_dataset.from_pandas.return_value = mock_dataset_instance + + # Create mock messages + from airbyte_cdk.models import AirbyteMessage, AirbyteRecordMessage, Type + messages = [ + AirbyteMessage( + type=Type.RECORD, + record=AirbyteRecordMessage( + data={"col1": "value1", "col2": 123}, + stream="test_stream", + emitted_at=0, + ) + ), + AirbyteMessage(type=Type.STATE, state={"checkpoint": 1}), + ] + + config = { + "dataset_name": "test/dataset", + "token": "hf_test", + } + + result = list(self.destination.write(config, MagicMock(), iter(messages))) + + # Verify push_to_hub was called + mock_dataset_instance.push_to_hub.assert_called_once_with("test/dataset", "test_stream", token="hf_test") + + # Verify state message was yielded + state_messages = [m for m in result if m.type == Type.STATE] + self.assertEqual(len(state_messages), 1) + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-hugging-face-buckets/README.md b/airbyte-integrations/connectors/source-hugging-face-buckets/README.md new file mode 100644 index 000000000000..783de3716725 --- /dev/null +++ b/airbyte-integrations/connectors/source-hugging-face-buckets/README.md @@ -0,0 +1,37 @@ +# Source Hugging Face Buckets + +This is an Airbyte source connector for [Hugging Face Buckets](https://huggingface.co/docs/hub/en/buckets). + +## Overview + +This connector reads data from Hugging Face Buckets (file storage on Hugging Face Hub). It supports multiple file formats including Parquet, CSV, and JSONL. + +## Configuration + +| Field | Required | Description | +|-------|----------|-------------| +| `bucket_path` | Yes | The path to the Hugging Face Bucket to read from. 
Format: `hf://buckets/{username}/{bucket}/{path}/` | +| `file_format` | No | The format of files in the bucket (default: `parquet`). Options: `parquet`, `csv`, `json`, `jsonl` | +| `reader_options` | No | JSON string with reader options (e.g., separators, encoding) | +| `token` | No | Hugging Face API token for authentication. Required for private buckets | + +### Examples + +```json +{ + "bucket_path": "hf://buckets/lhoestq/b/", + "file_format": "parquet", + "reader_options": "{}", + "token": "hf_..." +} +``` + +## How it works + +This connector discovers all files in the specified bucket path and creates a stream for each file. During the sync, it reads the files and emits records based on the configured file format. + +## Supported Formats + +- Parquet (.parquet) +- CSV (.csv) +- JSON/JSONL (.json, .jsonl) \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-hugging-face-buckets/icon.svg b/airbyte-integrations/connectors/source-hugging-face-buckets/icon.svg new file mode 100644 index 000000000000..dc1cf3ffb77f --- /dev/null +++ b/airbyte-integrations/connectors/source-hugging-face-buckets/icon.svg @@ -0,0 +1 @@ +HuggingFace \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-hugging-face-buckets/main.py b/airbyte-integrations/connectors/source-hugging-face-buckets/main.py new file mode 100644 index 000000000000..60cd4e605628 --- /dev/null +++ b/airbyte-integrations/connectors/source-hugging-face-buckets/main.py @@ -0,0 +1,9 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+# + +from source_hugging_face_buckets.run import run + + +if __name__ == "__main__": + run() \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-hugging-face-buckets/metadata.yaml b/airbyte-integrations/connectors/source-hugging-face-buckets/metadata.yaml new file mode 100644 index 000000000000..7e3778d15552 --- /dev/null +++ b/airbyte-integrations/connectors/source-hugging-face-buckets/metadata.yaml @@ -0,0 +1,50 @@ +{ + "metadataSpecVersion": "1.0", + "data": { + "documentationUrl": "https://docs.airbyte.com/integrations/sources/hugging-face-buckets", + "connectorBuildOptions": { + "baseImage": "docker.io/airbyte/python-connector-base:4.0.2@sha256:9fdb1888c4264cf6fee473649ecb593f56f58e5d0096a87ee0b231777e2e3e73" + }, + "connectorSubtype": "file", + "connectorType": "source", + "definitionId": "b1c2d3e4-f5a6-7890-bcde-f12345678901", + "dockerImageTag": "0.1.0", + "dockerRepository": "airbyte/source-hugging-face-buckets", + "githubIssueLabel": "source-hugging-face-buckets", + "icon": "icon.svg", + "license": "ELv2", + "name": "Hugging Face Buckets (Source)", + "registryOverrides": { + "cloud": { + "enabled": true + }, + "oss": { + "enabled": true + } + }, + "releaseStage": "alpha", + "supportLevel": "community", + "tags": [ + "language:python", + "cdk:python" + ], + "connectorTestSuitesOptions": [ + { + "suite": "unitTests" + }, + { + "suite": "integrationTests", + "testSecrets": [ + { + "name": "SECRET_SOURCE-HF-BUCKETS__CREDS", + "fileName": "config.json", + "secretStore": { + "type": "GSM", + "alias": "airbyte-connector-testing-secret-store" + } + } + ] + } + ] + } +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-hugging-face-buckets/pyproject.toml b/airbyte-integrations/connectors/source-hugging-face-buckets/pyproject.toml new file mode 100644 index 000000000000..b7360433bd69 --- /dev/null +++ 
b/airbyte-integrations/connectors/source-hugging-face-buckets/pyproject.toml @@ -0,0 +1,31 @@ +[build-system] +requires = ["poetry-core>=1.0.0"] +build-backend = "poetry.core.masonry.api" + +[tool.poetry] +version = "0.1.0" +name = "source_hugging_face_buckets" +description = "Source implementation for Hugging Face Buckets" +authors = ["Airbyte "] +license = "ELv2" +readme = "README.md" +documentation = "https://docs.airbyte.com/integrations/sources/hugging-face-buckets" +homepage = "https://airbyte.com" +repository = "https://github.com/airbytehq/airbyte" +[[tool.poetry.packages]] +include = "source_hugging_face_buckets" + +[tool.poetry.dependencies] +python = "^3.10,<3.15" +airbyte-cdk = "^7" +huggingface_hub = ">=0.20.0" +pandas = ">=2.0.0" +pyarrow = ">=14.0.0" + +[tool.poetry.scripts] +source-hugging-face-buckets = "source_hugging_face_buckets.run:run" + +[tool.poe] +include = [ + "${POE_GIT_DIR}/poe-tasks/poetry-connector-tasks.toml", +] \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-hugging-face-buckets/source_hugging_face_buckets/__init__.py b/airbyte-integrations/connectors/source-hugging-face-buckets/source_hugging_face_buckets/__init__.py new file mode 100644 index 000000000000..90c0556d88d6 --- /dev/null +++ b/airbyte-integrations/connectors/source-hugging-face-buckets/source_hugging_face_buckets/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-hugging-face-buckets/source_hugging_face_buckets/run.py b/airbyte-integrations/connectors/source-hugging-face-buckets/source_hugging_face_buckets/run.py new file mode 100644 index 000000000000..7fd89e78892c --- /dev/null +++ b/airbyte-integrations/connectors/source-hugging-face-buckets/source_hugging_face_buckets/run.py @@ -0,0 +1,13 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+# + +import sys + +from airbyte_cdk.entrypoint import launch +from source_hugging_face_buckets.source import SourceHuggingFaceBuckets + + +def run(): + source = SourceHuggingFaceBuckets() + launch(source, sys.argv[1:]) \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-hugging-face-buckets/source_hugging_face_buckets/source.py b/airbyte-integrations/connectors/source-hugging-face-buckets/source_hugging_face_buckets/source.py new file mode 100644 index 000000000000..a1876f515ed4 --- /dev/null +++ b/airbyte-integrations/connectors/source-hugging-face-buckets/source_hugging_face_buckets/source.py @@ -0,0 +1,405 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +import json +import logging +import time +import traceback +from typing import Any, Iterable, Iterator, Mapping, MutableMapping + +import pandas as pd +import pyarrow.parquet as pq +from huggingface_hub import HfFileSystem + +from airbyte_cdk.models import ( + AirbyteCatalog, + AirbyteConnectionStatus, + AirbyteMessage, + AirbyteRecordMessage, + AirbyteStream, + AirbyteStreamStatus, + ConfiguredAirbyteCatalog, + ConnectorSpecification, + FailureType, + SyncMode, + Type, +) +from airbyte_cdk.models import ( + Status as AirbyteStatus, +) +from airbyte_cdk.sources import Source +from airbyte_cdk.utils import AirbyteTracedException +from airbyte_cdk.utils.stream_status_utils import as_airbyte_message as stream_status_as_airbyte_message + + +class SourceHuggingFaceBuckets(Source): + """A source connector that reads data from Hugging Face Buckets. + + This connector reads files from Hugging Face Buckets (file storage on Hugging Face Hub) + using the huggingface_hub library's HfFileSystem. 
+ + Supports the following file formats: + - Parquet (.parquet) + - JSON/JSONL (.json, .jsonl) + - CSV (.csv) + - Text files (.txt) + + URL Format: hf://buckets/{username}/{bucket}/{path}/{filename} + """ + + def spec(self, logger: logging.Logger) -> ConnectorSpecification: + """Returns the JSON schema for the connector configuration.""" + return ConnectorSpecification( + connectionSpecification={ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Source Hugging Face Buckets", + "type": "object", + "required": ["bucket_path"], + "additionalProperties": False, + "properties": { + "bucket_path": { + "title": "Bucket Path", + "type": "string", + "description": "The path to the Hugging Face Bucket. Format: hf://buckets/{username}/{bucket}/{path}/", + "examples": [ + "hf://buckets/lhoestq/b/", + "hf://buckets/organization/dataset_name/" + ], + "order": 0 + }, + "file_format": { + "title": "File Format", + "type": "string", + "description": "The format of files in the bucket. Used for schema inference.", + "enum": ["parquet", "csv", "json", "jsonl"], + "default": "parquet", + "order": 1 + }, + "reader_options": { + "title": "Reader Options", + "type": "string", + "description": "JSON string with reader options (e.g., separators, encoding).", + "examples": ['{"sep": ",", "encoding": "utf-8"}'], + "order": 2 + }, + "token": { + "title": "Hugging Face Token", + "type": "string", + "description": "Your Hugging Face token for authentication. 
Required for private buckets.", + "airbyte_secret": True, + "order": 3 + } + } + }, + documentationUrl="https://docs.airbyte.com/integrations/sources/hugging-face-buckets", + supports_incremental=False, + supported_destination_sync_modes=["overwrite"] + ) + + def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: + """Test the connection by checking if we can list files in the bucket.""" + try: + + bucket_path = config.get("bucket_path", "") + token = config.get("token", None) + + # Ensure bucket_path has no hf:// scheme + if bucket_path.startswith("hf://"): + bucket_path = bucket_path[5:] + + # Remove trailing slash for listing + if bucket_path.endswith("/"): + bucket_path = bucket_path[:-1] + + fs = HfFileSystem(token=token) + + # Try to list the bucket contents + fs.ls(bucket_path) + + return AirbyteConnectionStatus(status=AirbyteStatus.SUCCEEDED) + + except Exception as e: + return AirbyteConnectionStatus( + status=AirbyteStatus.FAILED, + message=f"Connection check failed: {str(e)}" + ) + + + def read( + self, + logger: logging.Logger, + config: Mapping[str, Any], + catalog: ConfiguredAirbyteCatalog, + state: MutableMapping[str, Any] = None, + ) -> Iterator[AirbyteMessage]: + """Read data from the bucket files.""" + try: + + bucket_path = config.get("bucket_path", "") + file_format = config.get("file_format", "parquet") + token = config.get("token", None) + reader_options_raw = config.get("reader_options", "{}") + + # Parse reader options + try: + reader_options = json.loads(reader_options_raw) if reader_options_raw else {} + except json.JSONDecodeError: + reader_options = {} + + # Get the configured streams + if not catalog or not catalog.streams: + logger.warning("No streams configured in catalog") + return + + # Extract stream names from ConfiguredAirbyteCatalog + # Each element is a ConfiguredAirbyteStream with a 'stream' attribute + configured_streams = {} + for configured_stream in catalog.streams: + stream_name = 
configured_stream.stream.name + configured_streams[stream_name] = configured_stream.stream + + # Ensure bucket_path has no hf:// scheme + if bucket_path.startswith("hf://"): + bucket_path = bucket_path[5:] + + # Remove trailing slash for listing + if bucket_path.endswith("/"): + bucket_path = bucket_path[:-1] + + fs = HfFileSystem(token=token) + + # List all files in the bucket + files = fs.ls(bucket_path) + + # Process only configured streams + for file_info in files: + if file_info["type"] != "file": + continue + + file_name = file_info["name"].split("/")[-1] + file_path = file_info['name'] + + # Skip hidden files + if file_name.startswith("."): + continue + + # Use the same stream name as discover + stream_name = f"{file_name}_{file_path.replace('/', '_').replace('.', '_')}" + + if stream_name not in configured_streams: + continue + + airbyte_stream = configured_streams[stream_name] + + logger.info(f"Reading stream: {stream_name} from hf://{file_path}") + + yield stream_status_as_airbyte_message(airbyte_stream, AirbyteStreamStatus.STARTED) + + # Read the file and emit records + record_count = 0 + try: + for record in self._read_file(file_path, file_format, reader_options, logger, fs): + yield AirbyteMessage( + type=Type.RECORD, + record=AirbyteRecordMessage( + stream=stream_name, + data=record, + emitted_at=int(time.time() * 1000) # Current wall-clock time in epoch milliseconds + ) + ) + record_count += 1 + + if record_count == 1: + logger.info(f"Marking stream {stream_name} as RUNNING") + yield stream_status_as_airbyte_message(airbyte_stream, AirbyteStreamStatus.RUNNING) + + except Exception as e: + logger.error(f"Failed to read {stream_name}: {str(e)}") + logger.error(traceback.format_exc()) + yield stream_status_as_airbyte_message(airbyte_stream, AirbyteStreamStatus.INCOMPLETE) + raise AirbyteTracedException( + message=f"Failed to read {stream_name}: {str(e)}", + internal_message=str(e), + failure_type=FailureType.system_error + ) from e + + 
logger.info(f"Read {record_count} records from 
{stream_name}") + logger.info(f"Marking stream {stream_name} as COMPLETE") + yield stream_status_as_airbyte_message(airbyte_stream, AirbyteStreamStatus.COMPLETE) + + except Exception as e: + logger.error(f"Read operation failed: {str(e)}") + logger.error(traceback.format_exc()) + raise + + def _read_file(self, file_path: str, file_format: str, reader_options: dict, logger: logging.Logger, fs: HfFileSystem) -> Iterable[Mapping[str, Any]]: + """Read records from a file.""" + try: + with fs.open(file_path, "rb") as f: + if file_format == "parquet": + df = pq.read_table(f).to_pandas() + elif file_format == "csv": + df = pd.read_csv(f, **reader_options) + elif file_format in ["json", "jsonl"]: + if file_format == "jsonl": + records = [json.loads(line) for line in f.readlines() if line.strip()] # Skip blank lines, e.g. a trailing newline + for record in records: + if isinstance(record, dict): + yield record + else: + yield {"value": record} + else: + records = json.load(f) + + if isinstance(records, list): + for record in records: + if isinstance(record, dict): + yield record + else: + yield {"value": record} + elif isinstance(records, dict): + yield records + else: + yield {"value": records} + return + else: + # Default to reading as text + text = f.read().decode("utf-8", errors="ignore") + yield {"content": text} + return + + # Yield DataFrame records + for _, row in df.iterrows(): + record = row.to_dict() + # Handle NaN values + for k, v in record.items(): + if pd.isna(v): + record[k] = None + yield record + + except Exception as e: + logger.error(f"Failed to read hf://{file_path}: {str(e)}") + raise + + def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog: + """Discover available streams (files) in the bucket.""" + try: + bucket_path = config.get("bucket_path", "") + file_format = config.get("file_format", "parquet") + token = config.get("token", None) + + # Ensure bucket_path has no hf:// scheme + if 
bucket_path.startswith("hf://"): + bucket_path = bucket_path[5:] + + # Remove trailing 
slash + if bucket_path.endswith("/"): + bucket_path = bucket_path[:-1] + + fs = HfFileSystem(token=token) + + # List all files in the bucket + files = fs.ls(bucket_path, detail=True) + + streams = [] + for file_info in files: + if file_info.get("type") == "file" and not file_info.get("name", "").split("/")[-1].startswith("."): + file_path = file_info.get("name", "") + # Extract filename without path + file_name = file_path.split("/")[-1] + stream_name = f"{file_name}_{file_path.replace('/', '_').replace('.', '_')}" + + # Infer schema from file + try: + schema = self._infer_schema(file_path, file_format, logger, fs) + streams.append(AirbyteStream( + name=stream_name, + json_schema=schema, + supported_sync_modes=[SyncMode.full_refresh] + )) + except Exception as e: + logger.warning(f"Failed to infer schema for {file_name}: {str(e)}") + # Create a basic schema + streams.append(AirbyteStream( + name=stream_name, + json_schema={ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "_airbyte_data": {"type": ["string", "null"]} + } + }, + supported_sync_modes=[SyncMode.full_refresh] + )) + + return AirbyteCatalog(streams=streams) + + except Exception as e: + logger.error(f"Failed to discover streams: {str(e)}") + raise + + def _infer_schema(self, file_path: str, file_format: str, logger: logging.Logger, fs: HfFileSystem) -> dict: + """Infer schema from a file.""" + try: + if file_format == "parquet": + with fs.open(file_path, "rb") as f: + schema = pq.read_schema(f) + properties = {} + for col in schema.names: + dtype = str(schema.field(col).type) + if "int" in dtype: + properties[col] = {"type": ["integer", "null"]} + elif "float" in dtype or "double" in dtype: + properties[col] = {"type": ["number", "null"]} + elif "bool" in dtype: + properties[col] = {"type": ["boolean", "null"]} + else: + properties[col] = {"type": ["string", "null"]} + + return { + "$schema": "http://json-schema.org/draft-07/schema#", + 
"type": "object", + "properties": properties + } + + elif file_format == "csv": + with 
fs.open(file_path, "rb") as f: + df = pd.read_csv(f, nrows=10) + properties = {} + for col in df.columns: + dtype = str(df[col].dtype) + if "int" in dtype: + properties[col] = {"type": ["integer", "null"]} + elif "float" in dtype: + properties[col] = {"type": ["number", "null"]} + elif "bool" in dtype: + properties[col] = {"type": ["boolean", "null"]} + else: + properties[col] = {"type": ["string", "null"]} + + return { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": properties + } + + else: + # For JSON/JSONL, return a generic schema + return { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "_airbyte_data": {"type": ["string", "null"]} + } + } + + except Exception as e: + logger.warning(f"Failed to infer schema for {file_path}: {str(e)}") + return { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "_airbyte_data": {"type": ["string", "null"]} + } + } \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-hugging-face-buckets/source_hugging_face_buckets/spec.json b/airbyte-integrations/connectors/source-hugging-face-buckets/source_hugging_face_buckets/spec.json new file mode 100644 index 000000000000..10d7d27d60a0 --- /dev/null +++ b/airbyte-integrations/connectors/source-hugging-face-buckets/source_hugging_face_buckets/spec.json @@ -0,0 +1,46 @@ +{ + "documentationUrl": "https://docs.airbyte.com/integrations/sources/hugging-face-buckets", + "supported_destination_sync_modes": ["append", "overwrite"], + "supportsIncremental": false, + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Source Hugging Face Buckets", + "type": "object", + "required": ["bucket_path"], + "additionalProperties": false, + "properties": { + "bucket_path": { + "title": "Bucket Path", + "type": "string", + "description": "The path to the Hugging Face Bucket to read from. 
Format: hf://buckets/{username}/{bucket}/{path}/", + "examples": [ + "hf://buckets/lhoestq/b/", + "hf://buckets/organization/dataset_name/" + ], + "order": 0 + }, + "file_format": { + "title": "File Format", + "type": "string", + "description": "The format of files in the bucket. Used for schema inference.", + "enum": ["parquet", "csv", "json", "jsonl"], + "default": "parquet", + "order": 1 + }, + "reader_options": { + "title": "Reader Options", + "type": "string", + "description": "JSON string with reader options (e.g., separators, encoding).", + "examples": ["{\"sep\": \",\", \"encoding\": \"utf-8\"}"], + "order": 2 + }, + "token": { + "title": "Hugging Face Token", + "type": "string", + "description": "Your Hugging Face token for authentication. Required for private buckets.", + "airbyte_secret": true, + "order": 3 + } + } + } +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-hugging-face-buckets/unit_tests/test_source.py b/airbyte-integrations/connectors/source-hugging-face-buckets/unit_tests/test_source.py new file mode 100644 index 000000000000..38b067ba7013 --- /dev/null +++ b/airbyte-integrations/connectors/source-hugging-face-buckets/unit_tests/test_source.py @@ -0,0 +1,86 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+# + +import json +import unittest +from unittest.mock import MagicMock, patch + +from fsspec.implementations.memory import MemoryFileSystem +from source_hugging_face_buckets.source import SourceHuggingFaceBuckets + +from airbyte_cdk.models import ConfiguredAirbyteCatalog, ConfiguredAirbyteStream, Status, Type + + +class TestSourceHuggingFaceBucketsCheck(unittest.TestCase): + """Tests for the SourceHuggingFaceBuckets class.""" + + def setUp(self): + self.source = SourceHuggingFaceBuckets() + + @patch("source_hugging_face_buckets.source.HfFileSystem") + def test_check_connection_success(self, mock_fs): + """Test successful connection check.""" + mock_fs_instance = MagicMock() + mock_fs_instance.ls.return_value = [{"name": "test/file.parquet", "type": "file"}] + mock_fs.return_value = mock_fs_instance + + config = { + "bucket_path": "hf://buckets/test/bucket/", + "file_format": "parquet", + } + + result = self.source.check(MagicMock(), config) + self.assertEqual(result.status, Status.SUCCEEDED) + + @patch("source_hugging_face_buckets.source.HfFileSystem") + def test_check_connection_failure(self, mock_fs): + """Test failed connection check.""" + mock_fs_instance = MagicMock() + mock_fs_instance.ls.side_effect = Exception("Access denied") + mock_fs.return_value = mock_fs_instance + + config = { + "bucket_path": "hf://buckets/test/bucket/", + "file_format": "parquet", + } + + result = self.source.check(MagicMock(), config) + self.assertEqual(result.status, Status.FAILED) + + +class TestSourceHuggingFaceBucketsRead(unittest.TestCase): + """Tests for the SourceHuggingFaceBuckets class.""" + + def setUp(self): + self.source = SourceHuggingFaceBuckets() + + @patch("source_hugging_face_buckets.source.HfFileSystem") + def test_read(self, mock_fs): + """Test read.""" + mock_fs_instance = MemoryFileSystem(skip_instance_cache=True) + with mock_fs_instance.open("buckets/test/bucket/file.jsonl", "w") as f: + f.write(json.dumps({"col1": "value1", "col2": 123})) + 
mock_fs.return_value = mock_fs_instance + + config = { + "bucket_path": "hf://buckets/test/bucket/", + "file_format": "jsonl", + } + + catalog = self.source.discover(MagicMock(), config) + configured_catalog = ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream(stream=stream, sync_mode="full_refresh", destination_sync_mode="overwrite") + for stream in catalog.streams + ] + ) + result = list(self.source.read(MagicMock(), config, configured_catalog)) + + record_messages = [m for m in result if m.type == Type.RECORD] + self.assertEqual(len(record_messages), 1) + self.assertDictEqual(record_messages[0].record.data, {"col1": "value1", "col2": 123}) + + +if __name__ == "__main__": + unittest.main() diff --git a/airbyte-integrations/connectors/source-hugging-face-datasets/icon.svg b/airbyte-integrations/connectors/source-hugging-face-datasets/icon.svg index 43c5d3c0c97a..dc1cf3ffb77f 100644 --- a/airbyte-integrations/connectors/source-hugging-face-datasets/icon.svg +++ b/airbyte-integrations/connectors/source-hugging-face-datasets/icon.svg @@ -1,37 +1 @@ - - - - - - - - - - - +HuggingFace \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-hugging-face-datasets/main.py b/airbyte-integrations/connectors/source-hugging-face-datasets/main.py new file mode 100644 index 000000000000..3c2e2d88d2c4 --- /dev/null +++ b/airbyte-integrations/connectors/source-hugging-face-datasets/main.py @@ -0,0 +1,8 @@ +#!/usr/bin/env python3 +"""Entry point for the Hugging Face Datasets source connector.""" + +from source_hugging_face_datasets.run import main + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-hugging-face-datasets/metadata.yaml b/airbyte-integrations/connectors/source-hugging-face-datasets/metadata.yaml index 0dd4c1e11546..bfb2445bf221 100644 --- a/airbyte-integrations/connectors/source-hugging-face-datasets/metadata.yaml +++ 
b/airbyte-integrations/connectors/source-hugging-face-datasets/metadata.yaml @@ -2,7 +2,8 @@ metadataSpecVersion: "1.0" data: allowedHosts: hosts: - - "datasets-server.huggingface.co" + - "huggingface.co" + - "huggingface.com" registryOverrides: oss: enabled: true @@ -10,14 +11,14 @@ data: enabled: true remoteRegistries: pypi: - enabled: false + enabled: true packageName: airbyte-source-hugging-face-datasets connectorBuildOptions: - baseImage: docker.io/airbyte/source-declarative-manifest:7.23.4@sha256:f63c39a8a6f28ec20ada2ddf5861f3b1fb7b6a1ef1a8a8d1d537ff684ee3f9bd - connectorSubtype: api + baseImage: docker.io/airbyte/source-python:7.23.4 + connectorSubtype: python connectorType: source definitionId: 38438040-03d9-406d-b10b-af83beadd3ef - dockerImageTag: 0.0.54 + dockerImageTag: 0.1.0 dockerRepository: airbyte/source-hugging-face-datasets githubIssueLabel: source-hugging-face-datasets icon: icon.svg @@ -28,12 +29,11 @@ data: supportLevel: community documentationUrl: https://docs.airbyte.com/integrations/sources/hugging-face-datasets tags: - - language:manifest-only - - cdk:low-code + - cdk:python ab_internal: ql: 100 sl: 100 externalDocumentationUrls: - title: Hugging Face Datasets documentation url: https://huggingface.co/docs/datasets/ - type: api_reference + type: api_reference \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-hugging-face-datasets/pyproject.toml b/airbyte-integrations/connectors/source-hugging-face-datasets/pyproject.toml new file mode 100644 index 000000000000..f11b24d291b1 --- /dev/null +++ b/airbyte-integrations/connectors/source-hugging-face-datasets/pyproject.toml @@ -0,0 +1,16 @@ +[project] +name = "airbyte-source-hugging-face-datasets" +version = "0.0.1" +description = "Airbyte Source for Hugging Face Datasets using the datasets library" +readme = "README.md" +requires-python = ">=3.9,<3.13" +license = {text = "ELv2"} + +dependencies = [ + "airbyte-cdk>=7.0,<8.0", + "datasets>=4.0.0", + "pandas>=2.0.0", +] 
#!/usr/bin/env python3
"""Run function for the Hugging Face Datasets source connector."""

import sys

from airbyte_cdk.entrypoint import launch
from source_hugging_face_datasets.source import SourceHuggingFaceDatasets


def main() -> int:
    """Launch the connector with the process's command-line arguments.

    Returns:
        ``0`` once ``launch`` has returned.
    """
    cli_args = sys.argv[1:]
    launch(SourceHuggingFaceDatasets(), cli_args)
    return 0


if __name__ == "__main__":
    sys.exit(main())
#

"""Hugging Face Datasets Source implementation."""

import json
import logging
import time
import traceback
from pathlib import Path
from typing import Any, Dict, Iterator, List, Mapping, MutableMapping, Optional, Sequence, Tuple

from airbyte_cdk.models import (
    AirbyteCatalog,
    AirbyteConnectionStatus,
    AirbyteMessage,
    AirbyteRecordMessage,
    AirbyteStream,
    AirbyteStreamStatus,
    ConfiguredAirbyteCatalog,
    ConnectorSpecification,
    FailureType,
    Status,
    SyncMode,
    Type,
)
from airbyte_cdk.sources import Source
from airbyte_cdk.utils import AirbyteTracedException
from airbyte_cdk.utils.stream_status_utils import (
    as_airbyte_message as stream_status_as_airbyte_message,
)


_DOCUMENTATION_URL = "https://docs.airbyte.com/integrations/sources/hugging-face-datasets"

# Stream names are built by discover() as "<dataset>__<config>__<split>" and
# taken apart again by read().
_STREAM_NAME_SEPARATOR = "__"


def _streaming_property() -> Dict[str, Any]:
    """Return a fresh JSON-schema definition of the ``streaming`` option."""
    return {
        "order": 4,
        "title": "Streaming Mode",
        "type": "boolean",
        "default": False,
        "description": "When true, datasets are streamed on-the-fly without caching to disk. Use this for very large datasets where you don't want to fill disk space. Note: streaming mode is slower and less reliable than non-streaming mode.",
    }


def _default_connection_spec() -> Dict[str, Any]:
    """Return the built-in connection specification used when no spec.json is found."""
    return {
        "title": "Dataset Configuration",
        "properties": {
            "dataset_name": {
                "order": 0,
                "title": "Dataset Name",
                "type": "string",
                "description": "The name of the dataset on Hugging Face (e.g., 'glue', 'squad', 'imdb')",
            },
            "dataset_subsets": {
                "order": 1,
                "title": "Dataset Subsets/Configurations",
                "type": "array",
                "description": "List of dataset subsets (configs) to import. If empty, all subsets will be imported.",
                "items": {"type": "string"},
            },
            "dataset_splits": {
                "order": 2,
                "title": "Dataset Splits",
                "type": "array",
                "description": "List of dataset splits to import. If empty, all splits will be imported. Common values: 'train', 'test', 'validation'.",
                "items": {"type": "string"},
            },
            "token": {
                "order": 3,
                "title": "HF Token (Optional)",
                "type": "string",
                "description": "Hugging Face token for private datasets.",
            },
            "streaming": _streaming_property(),
        },
        "required": ["dataset_name"],
        "type": "object",
    }


class SourceHuggingFaceDatasets(Source):
    """Source for reading data from Hugging Face Datasets using the datasets library."""

    def spec(self, logger: logging.Logger) -> ConnectorSpecification:
        """Return the connector specification.

        A ``spec.json`` sitting next to this module takes precedence; the
        built-in specification is used otherwise.

        Args:
            logger: Connector logger (unused).

        Returns:
            The connector specification.
        """
        # NOTE(review): this patch adds spec.json at the connector root, one
        # directory above this module, so this lookup presumably misses it and
        # the built-in spec below is what gets served -- confirm the intended
        # location.
        spec_path = Path(__file__).with_name("spec.json")
        if spec_path.is_file():
            with spec_path.open(encoding="utf-8") as f:
                spec_dict = json.load(f)

            # spec.json may wrap the specification in a single-element list.
            conn_spec = spec_dict["connectionSpecification"]
            if isinstance(conn_spec, list):
                conn_spec = conn_spec[0] if conn_spec else {}

            # Make sure the streaming option is always advertised.
            if conn_spec.get("type") == "object":
                conn_spec.setdefault("properties", {}).setdefault("streaming", _streaming_property())

            return ConnectorSpecification(
                connectionSpecification=conn_spec,
                documentationUrl=spec_dict.get("documentationUrl", ""),
            )

        return ConnectorSpecification(
            connectionSpecification=_default_connection_spec(),
            documentationUrl=_DOCUMENTATION_URL,
        )

    def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus:
        """Check the connection by listing the dataset's configs.

        Args:
            logger: Connector logger.
            config: Connector configuration (``dataset_name``, optional ``token``).

        Returns:
            ``SUCCEEDED`` when the config names can be listed, otherwise
            ``FAILED`` with the error text as message. Never raises.
        """
        try:
            import datasets

            dataset_name = config.get("dataset_name", "")
            token = config.get("token", None)

            # Use get_dataset_config_names to verify access without downloading data
            config_names = datasets.get_dataset_config_names(dataset_name, token=token)
            logger.info(f"Connection check successful for dataset: {dataset_name} ({len(config_names)} configs)")

            return AirbyteConnectionStatus(status=Status.SUCCEEDED)

        except Exception as err:
            reason = f"Connection check failed: {str(err)}"
            logger.error(f"{reason}\n{traceback.format_exc()}")
            return AirbyteConnectionStatus(
                status=Status.FAILED,
                message=reason,
            )

    @staticmethod
    def _select_requested(
        requested: Optional[Sequence[str]],
        available: Sequence[str],
        kind: str,
        logger: logging.Logger,
    ) -> List[str]:
        """Return the requested names that exist, or everything available.

        An empty request, or a request that matches nothing, selects all
        available names; the latter case is logged so it is not silent.
        """
        if not requested:
            return list(available)
        selected = [name for name in requested if name in available]
        if not selected:
            logger.warning(
                f"None of the requested {kind} {list(requested)} exist (available: {list(available)}); using all of them"
            )
            return list(available)
        return selected

    @staticmethod
    def _parse_stream_name(stream_name: str) -> Optional[Tuple[str, str]]:
        """Split ``<dataset>__<config>__<split>`` into ``(config, split)``.

        Splitting is done from the right so a dataset name that itself
        contains the separator is still handled. Returns ``None`` when the
        name does not have three parts.
        """
        parts = stream_name.rsplit(_STREAM_NAME_SEPARATOR, 2)
        if len(parts) != 3:
            return None
        return parts[1], parts[2]

    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
        """Discover one stream per (config, split) pair of the dataset.

        Args:
            logger: Connector logger.
            config: Connector configuration; ``dataset_subsets`` and
                ``dataset_splits`` optionally restrict what is discovered.

        Returns:
            Catalog of full-refresh streams with an open ("any object") schema.

        Raises:
            AirbyteTracedException: If the dataset's configs cannot be listed.
        """
        import datasets

        dataset_name = config.get("dataset_name", "")
        token = config.get("token", None)
        dataset_subsets = config.get("dataset_subsets", [])
        dataset_splits = config.get("dataset_splits", [])

        streams = []

        try:
            # Get available configs (subsets) without downloading data
            all_configs = datasets.get_dataset_config_names(dataset_name, token=token)
            configs_to_load = self._select_requested(dataset_subsets, all_configs, "subsets", logger)

            for config_name in configs_to_load:
                try:
                    all_splits = datasets.get_dataset_split_names(
                        dataset_name,
                        config_name=config_name,
                        token=token,
                    )
                    splits_to_load = self._select_requested(dataset_splits, all_splits, "splits", logger)

                    for split_name in splits_to_load:
                        stream_name = _STREAM_NAME_SEPARATOR.join((dataset_name, config_name, split_name))

                        # Use a dynamic schema - actual schema discovered during read
                        streams.append(
                            AirbyteStream(
                                name=stream_name,
                                json_schema={"type": "object", "additionalProperties": True},
                                supported_sync_modes=[SyncMode.full_refresh],
                            )
                        )

                except Exception as inner_err:
                    # Best effort: one broken config must not hide the others.
                    logger.warning(f"Failed to discover config {config_name}: {str(inner_err)}")

            logger.info(f"Discovered {len(streams)} streams from dataset {dataset_name}")

        except Exception as err:
            reason = f"Failed to discover dataset: {str(err)}"
            logger.error(f"{reason}\n{traceback.format_exc()}")
            raise AirbyteTracedException(
                message=reason,
                internal_message=reason,
                failure_type=FailureType.config_error,
            ) from err

        return AirbyteCatalog(streams=streams)

    def read(
        self,
        logger: logging.Logger,
        config: Mapping[str, Any],
        catalog: ConfiguredAirbyteCatalog,
        state: Optional[MutableMapping[str, Any]] = None,
    ) -> Iterator[AirbyteMessage]:
        """Read every configured stream with ``datasets.load_dataset``.

        Args:
            logger: Connector logger.
            config: Connector configuration (``dataset_name``, optional
                ``token`` and ``streaming``).
            catalog: Streams to sync; names must follow the
                ``<dataset>__<config>__<split>`` format produced by discover().
            state: Unused; only full refresh is supported.

        Yields:
            Stream status messages and one RECORD message per dataset row.
            A stream whose dataset cannot be loaded is marked INCOMPLETE and
            skipped; a stream with a malformed name is skipped with a warning.

        Raises:
            AirbyteTracedException: If iterating a loaded dataset fails.
        """
        import datasets

        dataset_name = config.get("dataset_name", "")
        token = config.get("token", None)
        streaming = config.get("streaming", False)

        logger.info(f"Starting read for dataset: {dataset_name}")

        try:
            for configured_stream in catalog.streams:
                airbyte_stream = configured_stream.stream
                stream_name = airbyte_stream.name

                parsed = self._parse_stream_name(stream_name)
                if parsed is None:
                    logger.warning(f"Invalid stream name format: {stream_name}, skipping")
                    continue
                config_name, split_name = parsed

                yield stream_status_as_airbyte_message(airbyte_stream, AirbyteStreamStatus.STARTED)

                logger.info(f"Syncing stream: {stream_name} (config={config_name}, split={split_name})")

                try:
                    logger.info(
                        f"Loading dataset {dataset_name} config={config_name} split={split_name} (streaming={streaming})"
                    )
                    dataset = datasets.load_dataset(
                        dataset_name,
                        config_name,
                        split=split_name,
                        token=token,
                        streaming=streaming,
                    )
                    # The result is only ever iterated below, never indexed or
                    # measured, so the same loop serves both modes.
                    if streaming:
                        logger.info("Using streaming mode - iterating directly from source")
                except Exception as load_err:
                    logger.error(
                        f"Failed to load dataset {dataset_name} config={config_name} split={split_name}: {load_err}"
                    )
                    yield stream_status_as_airbyte_message(airbyte_stream, AirbyteStreamStatus.INCOMPLETE)
                    continue

                record_count = 0
                try:
                    for record in dataset:
                        yield AirbyteMessage(
                            type=Type.RECORD,
                            record=AirbyteRecordMessage(
                                stream=stream_name,
                                data=record,
                                emitted_at=int(time.time() * 1000),
                            ),
                        )
                        record_count += 1

                        # RUNNING is announced once, right after the first record.
                        if record_count == 1:
                            logger.info(f"Marking stream {stream_name} as RUNNING")
                            yield stream_status_as_airbyte_message(airbyte_stream, AirbyteStreamStatus.RUNNING)

                    logger.info(f"Successfully read {record_count} records from {stream_name}")

                except Exception as read_err:
                    logger.error(f"Failed to read data from {stream_name}: {str(read_err)}\n{traceback.format_exc()}")
                    yield stream_status_as_airbyte_message(airbyte_stream, AirbyteStreamStatus.INCOMPLETE)
                    # transient_error is the protocol's failure type for errors
                    # that may succeed on retry.
                    raise AirbyteTracedException(
                        message=f"Failed to read {stream_name}: {str(read_err)}",
                        internal_message=str(read_err),
                        failure_type=FailureType.transient_error,
                    ) from read_err

                yield stream_status_as_airbyte_message(airbyte_stream, AirbyteStreamStatus.COMPLETE)

        except AirbyteTracedException:
            # Already logged where it was raised.
            raise
        except Exception as err:
            logger.error(f"Read failed: {str(err)}\n{traceback.format_exc()}")
            raise
"""Unit tests for the Hugging Face Datasets source connector."""

import logging
from unittest.mock import patch

import pytest
from source_hugging_face_datasets.source import SourceHuggingFaceDatasets

from airbyte_cdk.models import (
    AirbyteStream,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteStream,
    DestinationSyncMode,
    Status,
    SyncMode,
    Type,
)


# Patch targets: the connector looks these up on the ``datasets`` module at
# call time, so patching the module attributes is sufficient.
GET_CONFIG_NAMES = "datasets.get_dataset_config_names"
GET_SPLIT_NAMES = "datasets.get_dataset_split_names"
LOAD_DATASET = "datasets.load_dataset"


@pytest.fixture
def source():
    """Create a source instance."""
    return SourceHuggingFaceDatasets()


@pytest.fixture
def config():
    """Create a test configuration."""
    return {
        "dataset_name": "squad",
        "dataset_subsets": [],
        "dataset_splits": [],
        "token": None,
    }


@pytest.fixture
def logger():
    """Create a quiet logger with no handlers attached."""
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.WARNING)
    for handler in logger.handlers[:]:
        logger.removeHandler(handler)
    return logger


class TestSourceHuggingFaceDatasetsCheck:
    """Test the check connection method."""

    def test_check_connection_success(self, source, config, logger):
        """Listing config names successfully yields SUCCEEDED."""
        with patch(GET_CONFIG_NAMES) as mock_get_configs:
            mock_get_configs.return_value = ["default"]
            result = source.check(logger, config)

        assert result.status == Status.SUCCEEDED
        mock_get_configs.assert_called_once_with("squad", token=None)

    def test_check_connection_failure(self, source, config, logger):
        """An error while listing config names yields FAILED with the cause."""
        with patch(GET_CONFIG_NAMES) as mock_get_configs:
            mock_get_configs.side_effect = Exception("Dataset not found")
            result = source.check(logger, config)

        assert result.status == Status.FAILED
        assert "Dataset not found" in result.message


class TestSourceHuggingFaceDatasetsDiscover:
    """Test the discover method."""

    def test_discover_creates_streams(self, source, config, logger):
        """Discover creates one stream per (config, split) pair."""
        with patch(GET_CONFIG_NAMES) as mock_get_configs, patch(GET_SPLIT_NAMES) as mock_get_splits:
            mock_get_configs.return_value = ["default"]
            mock_get_splits.return_value = ["train", "test"]

            catalog = source.discover(logger, config)

        assert [s.name for s in catalog.streams] == [
            "squad__default__train",
            "squad__default__test",
        ]

    def test_discover_filters_subsets_and_splits(self, source, config, logger):
        """Requested subsets and splits restrict the discovered streams."""
        config["dataset_subsets"] = ["b"]
        config["dataset_splits"] = ["test"]

        with patch(GET_CONFIG_NAMES) as mock_get_configs, patch(GET_SPLIT_NAMES) as mock_get_splits:
            mock_get_configs.return_value = ["a", "b"]
            mock_get_splits.return_value = ["train", "test"]

            catalog = source.discover(logger, config)

        assert [s.name for s in catalog.streams] == ["squad__b__test"]


class TestSourceHuggingFaceDatasetsRead:
    """Test the read method."""

    def test_read(self, source, config, logger):
        """Read emits one record per dataset row for the parsed config/split."""
        rows = [
            {"id": "1", "context": "Some context"},
            {"id": "2", "context": "Another context"},
        ]

        stream = AirbyteStream(
            name="squad__default__train",
            json_schema={"type": "object", "additionalProperties": True},
            supported_sync_modes=[SyncMode.full_refresh],
        )
        catalog = ConfiguredAirbyteCatalog(
            streams=[
                ConfiguredAirbyteStream(
                    stream=stream,
                    sync_mode=SyncMode.full_refresh,
                    destination_sync_mode=DestinationSyncMode.append,
                )
            ]
        )

        # read() only calls load_dataset, so that is the only function mocked.
        with patch(LOAD_DATASET) as mock_load_dataset:
            mock_load_dataset.return_value = rows
            messages = list(source.read(logger, config, catalog))

        mock_load_dataset.assert_called_once_with(
            "squad",
            "default",
            split="train",
            token=None,
            streaming=False,
        )

        records = [m for m in messages if m.type == Type.RECORD]
        assert [m.record.stream for m in records] == ["squad__default__train"] * 2
        assert [m.record.data for m in records] == rows


if __name__ == "__main__":
    pytest.main([__file__, "-v"])
## Overview @@ -12,7 +12,6 @@ This connector writes data from Airbyte sources to Hugging Face Buckets in Parqu |-------|----------|-------------| | `destination_path` | Yes | The path to the Hugging Face Bucket where data will be written. Format: `hf://buckets/{username}/{bucket}/{path}/` or `hf://username/bucket/path/` | | `file_format` | No | The format to use when writing files (default: `parquet`). Options: `parquet`, `jsonl` | -| `overwrite` | No | Whether to overwrite existing files (default: `false`) | | `token` | No | Hugging Face API token for authentication | ### Examples @@ -21,14 +20,12 @@ This connector writes data from Airbyte sources to Hugging Face Buckets in Parqu { "destination_path": "hf://buckets/lhoestq/b/", "file_format": "parquet", - "overwrite": true, "token": "hf_..." } ``` ## Supported Sync Modes -- `append`: Append data to existing files - `overwrite`: Overwrite existing files ## Supported Formats diff --git a/airbyte-integrations/connectors/destination-hugging-face-buckets/metadata.yaml b/airbyte-integrations/connectors/destination-hugging-face-buckets/metadata.yaml index 5818ff6ba4f6..2cd80d7e5dc0 100644 --- a/airbyte-integrations/connectors/destination-hugging-face-buckets/metadata.yaml +++ b/airbyte-integrations/connectors/destination-hugging-face-buckets/metadata.yaml @@ -1,48 +1,24 @@ -{ - "documentationUrl": "https://docs.airbyte.com/integrations/destinations/hugging-face-buckets", - "connectorBuildOptions": { - "baseImage": "docker.io/airbyte/python-connector-base:4.0.2@sha256:9fdb1888c4264cf6fee473649ecb593f56f58e5d0096a87ee0b231777e2e3e73" - }, - "connectorSubtype": "file", - "connectorType": "destination", - "definitionId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890", - "dockerImageTag": "0.1.0", - "dockerRepository": "airbyte/destination-hugging-face-buckets", - "documentationUrl": "https://docs.airbyte.com/integrations/destinations/hugging-face-buckets", - "githubIssueLabel": "destination-hugging-face-buckets", - "icon": 
"icon.svg", - "license": "ELv2", - "name": "Hugging Face Buckets", - "registryOverrides": { - "cloud": { - "enabled": true - }, - "oss": { - "enabled": true - } - }, - "releaseStage": "alpha", - "supportLevel": "community", - "tags": [ - "language:python", - "cdk:python" - ], - "connectorTestSuitesOptions": [ - { - "suite": "unitTests" - }, - { - "suite": "integrationTests", - "testSecrets": [ - { - "name": "SECRET_DESTINATION-HF-BUCKETS__CREDS", - "fileName": "config.json", - "secretStore": { - "type": "GSM", - "alias": "airbyte-connector-testing-secret-store" - } - } - ] - } - ] -} \ No newline at end of file +documentationUrl: https://docs.airbyte.com/integrations/destinations/hugging-face-buckets +connectorBuildOptions: + baseImage: docker.io/airbyte/python-connector-base:4.0.2@sha256:9fdb1888c4264cf6fee473649ecb593f56f58e5d0096a87ee0b231777e2e3e73 +connectorSubtype: file +connectorType: destination +definitionId: a1b2c3d4-e5f6-7890-abcd-ef1234567890 +dockerImageTag: "0.1.0" +dockerRepository: airbyte/destination-hugging-face-buckets +githubIssueLabel: destination-hugging-face-buckets +icon: icon.svg +license: ELv2 +name: Hugging Face Buckets +registryOverrides: + cloud: + enabled: true + oss: + enabled: true +releaseStage: alpha +supportLevel: community +tags: + - language:python + - "cdk:python" +connectorTestSuitesOptions: + - suite: unitTests \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-hugging-face-datasets/README.md b/airbyte-integrations/connectors/destination-hugging-face-datasets/README.md index 9c36cda40157..1ad536573983 100644 --- a/airbyte-integrations/connectors/destination-hugging-face-datasets/README.md +++ b/airbyte-integrations/connectors/destination-hugging-face-datasets/README.md @@ -4,44 +4,32 @@ This is an Airbyte destination connector for [Hugging Face Datasets](https://hug ## Overview -This connector writes data from Airbyte sources to Hugging Face Datasets. It supports two modes: - -1. 
**Push to HF Hub**: Creates/updates datasets on Hugging Face Hub -2. **Write to HF Buckets**: Stores data in Hugging Face Buckets (file storage) +This connector writes data from Airbyte sources to Hugging Face Datasets. ## Configuration | Field | Required | Description | |-------|----------|-------------| | `dataset_name` | Yes | The name of the Hugging Face Dataset to write to. Format: `{username}/{dataset_name}` | -| `push_to_hub` | No | Whether to push data to Hugging Face Hub (default: `false`). If `false`, data will be written to HF Buckets. | -| `overwrite` | No | Whether to overwrite existing datasets in Hub (default: `false`) | | `token` | No | Hugging Face API token for authentication | ### Examples ```json { - "dataset_name": "lhoestq/b", - "push_to_hub": true, - "overwrite": false, + "dataset_name": "lhoestq/demo1", "token": "hf_..." } ``` ## Supported Sync Modes -- `append`: Append data to existing datasets - `overwrite`: Replace existing datasets ## How it works -This destination connector receives records from Airbyte sources and writes them to Hugging Face Datasets. Depending on the configuration, it either: - -1. Pushes data directly to HF Hub using the `datasets` library -2. Writes data to HF Buckets as Parquet files +This destination connector receives records from Airbyte sources and writes them to Hugging Face Datasets. It pushes data directly to HF Hub using the `datasets` library. 
## Limitations -- Only supports Parquet format for bucket writes -- Hub pushes require proper dataset structure and authentication \ No newline at end of file +- Only supports Parquet format diff --git a/airbyte-integrations/connectors/destination-hugging-face-datasets/metadata.yaml b/airbyte-integrations/connectors/destination-hugging-face-datasets/metadata.yaml index 15e4e1898113..811b61d25b1d 100644 --- a/airbyte-integrations/connectors/destination-hugging-face-datasets/metadata.yaml +++ b/airbyte-integrations/connectors/destination-hugging-face-datasets/metadata.yaml @@ -1,36 +1,24 @@ -{ - "documentationUrl": "https://docs.airbyte.com/integrations/destinations/hugging-face-datasets", - "connectorBuildOptions": { - "baseImage": "docker.io/airbyte/python-connector-base:4.0.2@sha256:9fdb1888c4264cf6fee473649ecb593f56f58e5d0096a87ee0b231777e2e3e73" - }, - "connectorSubtype": "file", - "connectorType": "destination", - "definitionId": "d1c2d3e4-f5a6-7890-bcde-f12345678902", - "dockerImageTag": "0.1.0", - "dockerRepository": "airbyte/destination-hugging-face-datasets", - "documentationUrl": "https://docs.airbyte.com/integrations/destinations/hugging-face-datasets", - "githubIssueLabel": "destination-hugging-face-datasets", - "icon": "icon.svg", - "license": "ELv2", - "name": "Hugging Face Datasets (Destination)", - "registryOverrides": { - "cloud": {"enabled": true}, - "oss": {"enabled": true} - }, - "releaseStage": "alpha", - "supportLevel": "community", - "tags": ["language:python", "cdk:python"], - "connectorTestSuitesOptions": [ - {"suite": "unitTests"}, - { - "suite": "integrationTests", - "testSecrets": [ - { - "name": "SECRET_DESTINATION-HF-DATASETS__CREDS", - "fileName": "config.json", - "secretStore": {"type": "GSM", "alias": "airbyte-connector-testing-secret-store"} - } - ] - } - ] -} \ No newline at end of file +documentationUrl: https://docs.airbyte.com/integrations/destinations/hugging-face-datasets +connectorBuildOptions: + baseImage: 
docker.io/airbyte/python-connector-base:4.0.2@sha256:9fdb1888c4264cf6fee473649ecb593f56f58e5d0096a87ee0b231777e2e3e73 +connectorSubtype: file +connectorType: destination +definitionId: d1c2d3e4-f5a6-7890-bcde-f12345678902 +dockerImageTag: "0.1.0" +dockerRepository: airbyte/destination-hugging-face-datasets +githubIssueLabel: destination-hugging-face-datasets +icon: icon.svg +license: ELv2 +name: Hugging Face Datasets (Destination) +registryOverrides: + cloud: + enabled: true + oss: + enabled: true +releaseStage: alpha +supportLevel: community +tags: + - language:python + - "cdk:python" +connectorTestSuitesOptions: + - suite: unitTests \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-hugging-face-buckets/metadata.yaml b/airbyte-integrations/connectors/source-hugging-face-buckets/metadata.yaml index 7e3778d15552..10c0db77a8c6 100644 --- a/airbyte-integrations/connectors/source-hugging-face-buckets/metadata.yaml +++ b/airbyte-integrations/connectors/source-hugging-face-buckets/metadata.yaml @@ -1,48 +1,24 @@ -{ - "documentationUrl": "https://docs.airbyte.com/integrations/sources/hugging-face-buckets", - "connectorBuildOptions": { - "baseImage": "docker.io/airbyte/python-connector-base:4.0.2@sha256:9fdb1888c4264cf6fee473649ecb593f56f58e5d0096a87ee0b231777e2e3e73" - }, - "connectorSubtype": "file", - "connectorType": "source", - "definitionId": "b1c2d3e4-f5a6-7890-bcde-f12345678901", - "dockerImageTag": "0.1.0", - "dockerRepository": "airbyte/source-hugging-face-buckets", - "documentationUrl": "https://docs.airbyte.com/integrations/sources/hugging-face-buckets", - "githubIssueLabel": "source-hugging-face-buckets", - "icon": "icon.svg", - "license": "ELv2", - "name": "Hugging Face Buckets (Source)", - "registryOverrides": { - "cloud": { - "enabled": true - }, - "oss": { - "enabled": true - } - }, - "releaseStage": "alpha", - "supportLevel": "community", - "tags": [ - "language:python", - "cdk:python" - ], - "connectorTestSuitesOptions": [ - 
{ - "suite": "unitTests" - }, - { - "suite": "integrationTests", - "testSecrets": [ - { - "name": "SECRET_SOURCE-HF-BUCKETS__CREDS", - "fileName": "config.json", - "secretStore": { - "type": "GSM", - "alias": "airbyte-connector-testing-secret-store" - } - } - ] - } - ] -} \ No newline at end of file +documentationUrl: https://docs.airbyte.com/integrations/sources/hugging-face-buckets +connectorBuildOptions: + baseImage: docker.io/airbyte/python-connector-base:4.0.2@sha256:9fdb1888c4264cf6fee473649ecb593f56f58e5d0096a87ee0b231777e2e3e73 +connectorSubtype: file +connectorType: source +definitionId: b1c2d3e4-f5a6-7890-bcde-f12345678901 +dockerImageTag: "0.1.0" +dockerRepository: airbyte/source-hugging-face-buckets +githubIssueLabel: source-hugging-face-buckets +icon: icon.svg +license: ELv2 +name: Hugging Face Buckets (Source) +registryOverrides: + cloud: + enabled: true + oss: + enabled: true +releaseStage: alpha +supportLevel: community +tags: + - language:python + - "cdk:python" +connectorTestSuitesOptions: + - suite: unitTests \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-hugging-face-buckets/source_hugging_face_buckets/source.py b/airbyte-integrations/connectors/source-hugging-face-buckets/source_hugging_face_buckets/source.py index a1876f515ed4..f17fcebe37e9 100644 --- a/airbyte-integrations/connectors/source-hugging-face-buckets/source_hugging_face_buckets/source.py +++ b/airbyte-integrations/connectors/source-hugging-face-buckets/source_hugging_face_buckets/source.py @@ -94,7 +94,7 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification: }, documentationUrl="https://docs.airbyte.com/integrations/sources/hugging-face-buckets", supports_incremental=False, - supported_destination_sync_modes=["overwrite"] + supported_destination_sync_modes=["append", "overwrite"] ) def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: diff --git 
a/airbyte-integrations/connectors/source-hugging-face-datasets/README.md b/airbyte-integrations/connectors/source-hugging-face-datasets/README.md deleted file mode 120000 index 1bccb48110c8..000000000000 --- a/airbyte-integrations/connectors/source-hugging-face-datasets/README.md +++ /dev/null @@ -1 +0,0 @@ -../_shared/README-declarative-sources.md \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-hugging-face-datasets/README.md b/airbyte-integrations/connectors/source-hugging-face-datasets/README.md new file mode 100644 index 000000000000..bf39897857a9 --- /dev/null +++ b/airbyte-integrations/connectors/source-hugging-face-datasets/README.md @@ -0,0 +1,38 @@ +# Source Hugging Face Datasets + +This is an Airbyte source connector for [Hugging Face Datasets](https://huggingface.co/datasets). + +## Overview + +This connector reads data from Hugging Face Datasets (dataset repositories on Hugging Face Hub). + +## Configuration + +| Field | Required | Description | +|-------|----------|-------------| +| `dataset_name` | Yes | The name of the Hugging Face Dataset to read from. Format: `{username}/{dataset_name}` | +| `token` | No | Hugging Face API token for authentication | +| `streaming` | No | Whether to enable streaming to save disk space | + +### Examples + +```json +{ + "dataset_name": "lhoestq/demo1", + "token": "hf_..." +} +``` + +## How it works + +This connector uses the `datasets` library to read the dataset. During the sync, it reads the files and emits records based on the configured file format. 
+ +## Supported Formats + +- Parquet (.parquet) +- CSV (.csv) +- JSON/JSONL (.json, .jsonl) +- Lance (.lance) +- WebDataset (.tar) +- Image, audio, video +- and more, see the full list in the [documentation](https://huggingface.co/docs/datasets/package_reference/loading_methods#from-files) diff --git a/airbyte-integrations/connectors/source-hugging-face-datasets/acceptance-test-config.yml b/airbyte-integrations/connectors/source-hugging-face-datasets/acceptance-test-config.yml deleted file mode 100644 index 694d6e3df436..000000000000 --- a/airbyte-integrations/connectors/source-hugging-face-datasets/acceptance-test-config.yml +++ /dev/null @@ -1,17 +0,0 @@ -# See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference) -# for more information about how to configure these tests -connector_image: airbyte/source-hugging-face-datasets:dev -acceptance_tests: - spec: - tests: - - spec_path: "manifest.yaml" - connection: - bypass_reason: "This is a builder contribution, and we do not have secrets at this time" - discovery: - bypass_reason: "This is a builder contribution, and we do not have secrets at this time" - basic_read: - bypass_reason: "This is a builder contribution, and we do not have secrets at this time" - incremental: - bypass_reason: "This is a builder contribution, and we do not have secrets at this time" - full_refresh: - bypass_reason: "This is a builder contribution, and we do not have secrets at this time" diff --git a/airbyte-integrations/connectors/source-hugging-face-datasets/manifest.yaml b/airbyte-integrations/connectors/source-hugging-face-datasets/manifest.yaml deleted file mode 100644 index e03c0c9afa16..000000000000 --- a/airbyte-integrations/connectors/source-hugging-face-datasets/manifest.yaml +++ /dev/null @@ -1,223 +0,0 @@ -version: 6.5.2 - -type: DeclarativeSource - -description: >- - Allows importing any datasets from Hugging Face - 
([https://huggingface.co/datasets](https://huggingface.co/datasets)) - -check: - type: CheckStream - stream_names: - - rows - -definitions: - streams: - rows: - type: DeclarativeStream - name: rows - retriever: - type: SimpleRetriever - requester: - $ref: "#/definitions/base_requester" - path: >- - /rows?dataset={{ config["dataset_name"] }}&config={{ - stream_partition.key.split("::")[1] }}&split={{ - stream_partition.key.split("::")[2] }} - http_method: GET - record_selector: - type: RecordSelector - extractor: - type: DpathExtractor - field_path: - - rows - paginator: - type: DefaultPaginator - page_token_option: - type: RequestOption - inject_into: request_parameter - field_name: offset - page_size_option: - type: RequestOption - field_name: length - inject_into: request_parameter - pagination_strategy: - type: OffsetIncrement - page_size: 100 - inject_on_first_request: true - partition_router: - type: SubstreamPartitionRouter - parent_stream_configs: - - type: ParentStreamConfig - parent_key: key - partition_field: key - stream: - $ref: "#/definitions/streams/splits" - transformations: - - type: AddFields - fields: - - path: - - config - value: "{{ stream_partition.key }}" - schema_loader: - type: InlineSchemaLoader - schema: - $ref: "#/schemas/rows" - splits: - type: DeclarativeStream - name: splits - retriever: - type: SimpleRetriever - requester: - $ref: "#/definitions/base_requester" - path: /splits?dataset={{ config["dataset_name"] }} - http_method: GET - record_selector: - type: RecordSelector - extractor: - type: DpathExtractor - field_path: - - splits - record_filter: - type: RecordFilter - condition: >- - {{ (not config["dataset_splits"] or record.split in - config["dataset_splits"]) and (not config["dataset_subsets"] or - record.config in config["dataset_subsets"]) }} - transformations: - - type: AddFields - fields: - - path: - - key - value: >- - {{ "::".join([record["dataset"], record["config"], - record["split"]]) }} - schema_loader: - type: 
InlineSchemaLoader - schema: - $ref: "#/schemas/splits" - base_requester: - type: HttpRequester - url_base: https://datasets-server.huggingface.co - -streams: - - $ref: "#/definitions/streams/rows" - - $ref: "#/definitions/streams/splits" - -spec: - type: Spec - connection_specification: - type: object - $schema: http://json-schema.org/draft-07/schema# - required: - - dataset_name - properties: - dataset_name: - type: string - order: 0 - title: Dataset Name - dataset_subsets: - type: array - description: >- - Dataset Subsets to import. Will import all of them if nothing is - provided (see - https://huggingface.co/docs/dataset-viewer/en/configs_and_splits for - more details) - order: 1 - title: Dataset Subsets - dataset_splits: - type: array - description: >- - Splits to import. Will import all of them if nothing is provided (see - https://huggingface.co/docs/dataset-viewer/en/configs_and_splits for - more details) - order: 2 - title: Dataset Splits - additionalProperties: true - -metadata: - autoImportSchema: - rows: true - splits: true - testedStreams: - rows: - streamHash: 7dbbacc410a5b2e190f31a4699420ede8232f1ac - hasResponse: true - responsesAreSuccessful: true - hasRecords: true - primaryKeysArePresent: true - primaryKeysAreUnique: true - splits: - streamHash: 6f7caee478d97fb11794d8e47e1f4a276ebd934a - hasResponse: true - responsesAreSuccessful: true - hasRecords: true - primaryKeysArePresent: true - primaryKeysAreUnique: true - assist: {} - -schemas: - rows: - type: object - $schema: http://json-schema.org/schema# - additionalProperties: true - properties: - config: - type: - - string - - "null" - row: - type: - - object - - "null" - properties: - messages: - type: - - array - - "null" - items: - type: - - object - - "null" - properties: - content: - type: - - string - - "null" - role: - type: - - string - - "null" - tokens: - type: - - number - - "null" - row_idx: - type: - - number - - "null" - truncated_cells: - type: - - array - - "null" - splits: - type: 
object - $schema: http://json-schema.org/schema# - additionalProperties: true - properties: - config: - type: - - string - - "null" - dataset: - type: - - string - - "null" - key: - type: - - string - - "null" - split: - type: - - string - - "null" diff --git a/airbyte-integrations/connectors/source-hugging-face-datasets/metadata.yaml b/airbyte-integrations/connectors/source-hugging-face-datasets/metadata.yaml index bfb2445bf221..75a1844d2794 100644 --- a/airbyte-integrations/connectors/source-hugging-face-datasets/metadata.yaml +++ b/airbyte-integrations/connectors/source-hugging-face-datasets/metadata.yaml @@ -1,39 +1,24 @@ -metadataSpecVersion: "1.0" -data: - allowedHosts: - hosts: - - "huggingface.co" - - "huggingface.com" - registryOverrides: - oss: - enabled: true - cloud: - enabled: true - remoteRegistries: - pypi: - enabled: true - packageName: airbyte-source-hugging-face-datasets - connectorBuildOptions: - baseImage: docker.io/airbyte/source-python:7.23.4 - connectorSubtype: python - connectorType: source - definitionId: 38438040-03d9-406d-b10b-af83beadd3ef - dockerImageTag: 0.1.0 - dockerRepository: airbyte/source-hugging-face-datasets - githubIssueLabel: source-hugging-face-datasets - icon: icon.svg - license: ELv2 - name: Hugging Face - Datasets - releaseDate: 2024-11-28 - releaseStage: alpha - supportLevel: community - documentationUrl: https://docs.airbyte.com/integrations/sources/hugging-face-datasets - tags: - - cdk:python - ab_internal: - ql: 100 - sl: 100 - externalDocumentationUrls: - - title: Hugging Face Datasets documentation - url: https://huggingface.co/docs/datasets/ - type: api_reference \ No newline at end of file +documentationUrl: https://docs.airbyte.com/integrations/sources/hugging-face-datasets +connectorBuildOptions: + baseImage: docker.io/airbyte/python-connector-base:4.0.2@sha256:9fdb1888c4264cf6fee473649ecb593f56f58e5d0096a87ee0b231777e2e3e73 +connectorSubtype: file +connectorType: source +definitionId: 
38438040-03d9-406d-b10b-af83beadd3ef +dockerImageTag: "0.1.0" +dockerRepository: airbyte/source-hugging-face-datasets +githubIssueLabel: source-hugging-face-datasets +icon: icon.svg +license: ELv2 +name: Hugging Face Datasets (Source) +registryOverrides: + cloud: + enabled: true + oss: + enabled: true +releaseStage: alpha +supportLevel: community +tags: + - language:python + - "cdk:python" +connectorTestSuitesOptions: + - suite: unitTests \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-hugging-face-datasets/source_hugging_face_datasets/source.py b/airbyte-integrations/connectors/source-hugging-face-datasets/source_hugging_face_datasets/source.py index 71309fe97c4c..5c9f7cc2acd8 100644 --- a/airbyte-integrations/connectors/source-hugging-face-datasets/source_hugging_face_datasets/source.py +++ b/airbyte-integrations/connectors/source-hugging-face-datasets/source_hugging_face_datasets/source.py @@ -37,34 +37,6 @@ def spec( self, logger: logging.Logger ) -> ConnectorSpecification: """Returns the connector specification.""" - spec_path = __file__.replace("source.py", "spec.json") - if spec_path: - import os - if os.path.exists(spec_path): - with open(spec_path) as f: - spec_dict = json.load(f) - # Extract the first item from the list (Airbyte spec format) - conn_spec = spec_dict["connectionSpecification"] - if isinstance(conn_spec, list): - conn_spec = conn_spec[0] if conn_spec else {} - - # Add the streaming field if not present - if conn_spec.get("type") == "object": - properties = conn_spec.get("properties", {}) - if "streaming" not in properties: - properties["streaming"] = { - "order": 4, - "title": "Streaming Mode", - "type": "boolean", - "default": False, - "description": "When true, datasets are streamed on-the-fly without caching to disk. Use this for very large datasets where you don't want to fill disk space. 
Note: streaming mode is slower and less reliable than non-streaming mode.", - } - conn_spec["properties"] = properties - - return ConnectorSpecification( - connectionSpecification=conn_spec, - documentationUrl=spec_dict.get("documentationUrl", ""), - ) return ConnectorSpecification( connectionSpecification={ "title": "Dataset Configuration", @@ -107,6 +79,8 @@ def spec( "type": "object", }, documentationUrl="https://docs.airbyte.com/integrations/sources/hugging-face-datasets", + supports_incremental=False, + supported_destination_sync_modes=["append", "overwrite"] ) def check( diff --git a/airbyte-integrations/connectors/source-hugging-face-datasets/spec.json b/airbyte-integrations/connectors/source-hugging-face-datasets/spec.json index 9978658c508a..c9170bec0660 100644 --- a/airbyte-integrations/connectors/source-hugging-face-datasets/spec.json +++ b/airbyte-integrations/connectors/source-hugging-face-datasets/spec.json @@ -1,5 +1,7 @@ { - "documentationUrl": "https://docs.airbyte.com/integrations/sources/hugging-face-datasets", + "documentationUrl": "https://docs.airbyte.com/integrations/sources/hugging-face-datasets", + "supported_destination_sync_modes": ["append", "overwrite"], + "supportsIncremental": false, "connectionSpecification": [ { "title": "Dataset Configuration", @@ -45,9 +47,5 @@ "required": ["dataset_name"], "type": "object" } - ], - "supportsNormalization": false, - "supportsDBTReplication": false, - "supportsCDC": false, - "supportsIDBasedIncrementalSync": false + ] } \ No newline at end of file