Skip to content
Draft
43 changes: 43 additions & 0 deletions airbyte/_executors/declarative.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,46 @@ def install(self) -> None:
def uninstall(self) -> None:
"""No-op. The declarative source is included with PyAirbyte."""
pass

def fetch_record(
self,
stream_name: str,
pk_value: str,
config: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Fetch a single record from a stream by primary key.

This method requires airbyte-python-cdk with fetch_record support
(airbytehq/airbyte-python-cdk#846).

Args:
stream_name: Name of the stream to fetch from
pk_value: Primary key value as a string
config: Source configuration (optional, uses instance config if not provided)

Returns:
The fetched record as a dict

Raises:
NotImplementedError: If the installed CDK doesn't support fetch_record
ValueError: If the stream name is not found
RecordNotFoundException: If the record is not found
"""
merged_config = {**self._config_dict}
if config:
merged_config.update(config)

source = self.declarative_source
fetch_record_method = getattr(source, "fetch_record", None)

if fetch_record_method is None:
raise NotImplementedError(
"The installed airbyte-python-cdk does not support fetch_record. "
"This requires airbytehq/airbyte-python-cdk#846 to be merged and installed."
)

return fetch_record_method(
stream_name=stream_name,
pk_value=pk_value,
config=merged_config,
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
118 changes: 118 additions & 0 deletions airbyte/mcp/local_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,124 @@ def read_source_stream_records(
return records


@mcp_tool(
domain="local",
read_only=True,
extra_help_text=_CONFIG_HELP,
)
def fetch_source_stream_record( # noqa: PLR0913
source_connector_name: Annotated[
str,
Field(description="The name of the source connector."),
],
config: Annotated[
dict | str | None,
Field(
description="The configuration for the source connector as a dict or JSON string.",
default=None,
),
],
config_file: Annotated[
str | Path | None,
Field(
description="Path to a YAML or JSON file containing the source connector config.",
default=None,
),
],
config_secret_name: Annotated[
str | None,
Field(
description="The name of the secret containing the configuration.",
default=None,
),
],
*,
stream_name: Annotated[
str,
Field(description="The name of the stream to fetch the record from."),
],
pk_value: Annotated[
str | dict[str, Any],
Field(
description="Either a primary key value as a string (e.g., '123') or a dict "
"with a single entry where the key is the field name and the value is the field value "
"(e.g., {'id': '123'} or {'email': 'user@example.com'}). "
"When allow_scanning=False, dict keys must match the primary key. "
"When allow_scanning=True, dict keys can be any field name.",
),
],
allow_scanning: Annotated[
bool,
Field(
description="When True, enables scanning through records to find a match. "
"This allows searching by non-primary-key fields but is slower and may not find "
"the record if it's not in the first scan_timeout_seconds of data. Default: False.",
default=False,
),
],
scan_timeout_seconds: Annotated[
int,
Field(
description="Maximum time in seconds to scan for a record when allow_scanning=True. "
"Default: 5 seconds.",
default=5,
),
],
override_execution_mode: Annotated[
Literal["docker", "python", "yaml", "auto"],
Field(
description="Optionally override the execution method to use for the connector. "
"This parameter is ignored if manifest_path is provided (yaml mode will be used).",
default="auto",
),
],
manifest_path: Annotated[
str | Path | None,
Field(
description="Path to a local YAML manifest file for declarative connectors.",
default=None,
),
],
) -> dict[str, Any] | str:
"""Fetch a single record from a source connector by primary key or field value.

This tool is only supported for declarative (YAML-based) sources.
"""
try:
source: Source = _get_mcp_source(
connector_name=source_connector_name,
override_execution_mode=override_execution_mode,
manifest_path=manifest_path,
)
config_dict = resolve_config(
config=config,
config_file=config_file,
config_secret_name=config_secret_name,
config_spec_jsonschema=source.config_spec,
)
source.set_config(config_dict)

record = source.get_record(
stream_name=stream_name,
pk_value=pk_value,
allow_scanning=allow_scanning,
scan_timeout_seconds=scan_timeout_seconds,
)

print(
f"Retrieved record with {pk_value} from stream '{stream_name}'",
file=sys.stderr,
)

except Exception as ex:
tb_str = traceback.format_exc()
return (
f"Error fetching record from source '{source_connector_name}': {ex!r}, {ex!s}\n{tb_str}"
)
else:
return record


@mcp_tool(
domain="local",
read_only=True,
Expand Down
Loading