Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions airbyte_cdk/sources/file_based/config/excel_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,8 @@ class Config(OneOfOptionConfig):
"excel",
const=True,
)
# Worksheet selector. It is kept as a string so that a single field can carry a
# sheet name, a zero-indexed position, or the "*" wildcard meaning "all sheets".
# The default "0" selects the first worksheet.
# NOTE(review): the parser turns any all-digit value into a position, so a sheet
# whose name is purely numeric presumably cannot be selected by name - confirm.
sheet_name: str = Field(
default="0",
title="Sheet Name",
description='The Excel worksheet to read. Use a sheet name, a zero-indexed position like "0", or "*" to read all sheets.',
)
78 changes: 54 additions & 24 deletions airbyte_cdk/sources/file_based/file_types/excel_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

class ExcelParser(FileTypeParser):
ENCODING = None
ALL_SHEETS = "*"

def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
"""
Expand Down Expand Up @@ -62,18 +63,20 @@ async def infer_schema(

# Validate the format of the config
self.validate_format(config.format, logger)
excel_format = config.format
if not isinstance(excel_format, ExcelFormat):
raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR)

fields: Dict[str, str] = {}

with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
df = self.open_and_parse_file(fp, logger, file)
for column, df_type in df.dtypes.items():
# Choose the broadest data type if the column's data type differs in dataframes
prev_frame_column_type = fields.get(column) # type: ignore [call-overload]
fields[column] = self.dtype_to_json_type( # type: ignore [index]
prev_frame_column_type,
df_type,
)
for df in self._parse_excel_file(fp, excel_format, logger, file).values():
for column, df_type in df.dtypes.items():
prev_frame_column_type = fields.get(column) # type: ignore [call-overload]
fields[column] = self.dtype_to_json_type( # type: ignore [index]
prev_frame_column_type,
df_type,
)

schema = {
field: (
Expand Down Expand Up @@ -109,18 +112,19 @@ def parse_records(

# Validate the format of the config
self.validate_format(config.format, logger)
excel_format = config.format
if not isinstance(excel_format, ExcelFormat):
raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR)

try:
# Open and parse the file using the stream reader
with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
df = self.open_and_parse_file(fp, logger, file)
# Yield records as dictionaries
# DataFrame.to_dict() method returns datetime values in pandas.Timestamp values, which are not serializable by orjson
# DataFrame.to_json() returns string with datetime values serialized to iso8601 with microseconds to align with pydantic behavior
# see PR description: https://github.com/airbytehq/airbyte/pull/44444/
yield from orjson.loads(
df.to_json(orient="records", date_format="iso", date_unit="us")
)
for df in self._parse_excel_file(fp, excel_format, logger, file).values():
# DataFrame.to_dict() returns pandas.Timestamp values not serializable by orjson.
# DataFrame.to_json() serializes datetimes to iso8601 with microseconds.
yield from orjson.loads(
df.to_json(orient="records", date_format="iso", date_unit="us")
)

except Exception as exc:
# Raise a RecordParseError if any exception occurs during parsing
Expand Down Expand Up @@ -187,7 +191,8 @@ def _open_and_parse_file_with_calamine(
fp: Union[IOBase, str, Path],
logger: logging.Logger,
file: RemoteFile,
) -> pd.DataFrame:
sheet_name: Union[int, str, None] = 0,
) -> Union[pd.DataFrame, Dict[Union[int, str], pd.DataFrame]]:
"""Opens and parses Excel file using Calamine engine.

Args:
Expand All @@ -202,7 +207,7 @@ def _open_and_parse_file_with_calamine(
ExcelCalamineParsingError: If Calamine fails to parse the file.
"""
try:
return pd.ExcelFile(fp, engine="calamine").parse() # type: ignore [arg-type, call-overload, no-any-return]
return pd.ExcelFile(fp, engine="calamine").parse(sheet_name=sheet_name) # type: ignore [arg-type, call-overload, no-any-return]
except BaseException as exc:
# Calamine engine raises PanicException(child of BaseException) if Calamine fails to parse the file.
# Checking if ValueError in exception arg to know if it was actually an error during parsing due to invalid values in cells.
Expand All @@ -222,7 +227,8 @@ def _open_and_parse_file_with_openpyxl(
fp: Union[IOBase, str, Path],
logger: logging.Logger,
file: RemoteFile,
) -> pd.DataFrame:
sheet_name: Union[int, str, None] = 0,
) -> Union[pd.DataFrame, Dict[Union[int, str], pd.DataFrame]]:
"""Opens and parses Excel file using Openpyxl engine.

Args:
Expand All @@ -245,19 +251,20 @@ def _open_and_parse_file_with_openpyxl(

with warnings.catch_warnings(record=True) as warning_records:
warnings.simplefilter("always")
df = pd.ExcelFile(fp, engine="openpyxl").parse() # type: ignore [arg-type, call-overload]
dfs = pd.ExcelFile(fp, engine="openpyxl").parse(sheet_name=sheet_name) # type: ignore [arg-type, call-overload]

for warning in warning_records:
logger.warning(f"Openpyxl warning for {file.file_uri_for_logging}: {warning.message}")

return df # type: ignore [no-any-return]
return dfs # type: ignore [no-any-return]

def open_and_parse_file(
self,
fp: Union[IOBase, str, Path],
logger: logging.Logger,
file: RemoteFile,
) -> pd.DataFrame:
sheet_name: Union[int, str, None] = 0,
) -> Union[pd.DataFrame, Dict[Union[int, str], pd.DataFrame]]:
"""Opens and parses the Excel file with Calamine-first and Openpyxl fallback.

Args:
Expand All @@ -269,6 +276,29 @@ def open_and_parse_file(
pd.DataFrame: Parsed data from the Excel file.
"""
try:
return self._open_and_parse_file_with_calamine(fp, logger, file)
return self._open_and_parse_file_with_calamine(fp, logger, file, sheet_name)
except ExcelCalamineParsingError:
return self._open_and_parse_file_with_openpyxl(fp, logger, file)
return self._open_and_parse_file_with_openpyxl(fp, logger, file, sheet_name)

def _parse_excel_file(
self,
fp: Union[IOBase, str, Path],
excel_format: ExcelFormat,
logger: logging.Logger,
file: RemoteFile,
) -> Dict[Union[int, str], pd.DataFrame]:
"""Parses an Excel file and returns a dict of sheet name → DataFrame."""
sheet_name = self._resolve_sheet_name(excel_format)
parsed = self.open_and_parse_file(fp, logger, file, sheet_name)
if isinstance(parsed, pd.DataFrame):
return {excel_format.sheet_name: parsed}
return parsed

def _resolve_sheet_name(self, excel_format: ExcelFormat) -> Union[int, str, None]:
"""Converts the string config value to a pandas-compatible `sheet_name` argument."""
value = excel_format.sheet_name
if value == self.ALL_SHEETS:
return None
if value.isdecimal():
return int(value)
return value
105 changes: 103 additions & 2 deletions unit_tests/sources/file_based/file_types/test_excel_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#


import asyncio
import datetime
import warnings
from io import BytesIO
Expand Down Expand Up @@ -152,7 +153,7 @@ def test_open_and_parse_file_falls_back_to_openpyxl(mock_logger):

calamine_excel_file = MagicMock()

def calamine_parse_side_effect():
def calamine_parse_side_effect(**kwargs):
raise FakePanic(
"failed to construct date: PyErr { type: <class 'ValueError'>, value: ValueError('year 20225 is out of range'), traceback: None }"
)
Expand All @@ -161,7 +162,7 @@ def calamine_parse_side_effect():

openpyxl_excel_file = MagicMock()

def openpyxl_parse_side_effect():
def openpyxl_parse_side_effect(**kwargs):
warnings.warn("Cell A146 has invalid date", UserWarning)
return fallback_df

Expand Down Expand Up @@ -238,3 +239,103 @@ def seek(self, *args, **kwargs):
assert "Could not rewind stream" in msg
assert remote_file.file_uri_for_logging in msg
mock_excel.assert_called_once_with(fp, engine="openpyxl")
openpyxl_excel_file.parse.assert_called_once_with(sheet_name=0)


def _make_multisheet_excel_bytes() -> bytes:
"""Creates an in-memory Excel workbook with two sheets for testing."""
buf = BytesIO()
with pd.ExcelWriter(buf, engine="xlsxwriter") as writer:
pd.DataFrame({"col_a": ["first"], "shared": [1]}).to_excel(
writer, index=False, sheet_name="First"
)
pd.DataFrame({"col_b": [2.5], "shared": [2]}).to_excel(
writer, index=False, sheet_name="Second"
)
return buf.getvalue()


def _stream_reader_for(excel_bytes: bytes) -> MagicMock:
    """Return a stream-reader mock whose ``open_file`` yields the given workbook bytes."""
    mocked_reader = MagicMock(spec=AbstractFileBasedStreamReader)
    # The same BytesIO object is handed out on every open_file() call.
    mocked_reader.configure_mock(**{"open_file.return_value": BytesIO(excel_bytes)})
    return mocked_reader


@pytest.mark.parametrize(
    "sheet_name,expected_records",
    [
        pytest.param("0", [{"col_a": "first", "shared": 1}], id="default_first_sheet"),
        pytest.param("Second", [{"col_b": 2.5, "shared": 2}], id="sheet_by_name"),
        pytest.param(
            "*",
            [{"col_a": "first", "shared": 1}, {"col_b": 2.5, "shared": 2}],
            id="all_sheets",
        ),
    ],
)
def test_parse_records_selects_configured_sheet(sheet_name, expected_records, remote_file):
    """parse_records emits only the rows of the sheet(s) chosen by ``sheet_name``."""
    excel_parser = ExcelParser()
    stream_config = FileBasedStreamConfig(
        name="test_stream",
        format=ExcelFormat(sheet_name=sheet_name),
    )
    workbook_reader = _stream_reader_for(_make_multisheet_excel_bytes())

    # parse_records is a generator; drain it so the comparison sees every row.
    emitted = list(
        excel_parser.parse_records(stream_config, remote_file, workbook_reader, MagicMock())
    )

    assert emitted == expected_records


@pytest.mark.parametrize(
    "sheet_name,expected_schema",
    [
        pytest.param(
            "0",
            {"col_a": {"type": "string"}, "shared": {"type": "number"}},
            id="first_sheet_schema",
        ),
        pytest.param(
            "*",
            {
                "col_a": {"type": "string"},
                "col_b": {"type": "number"},
                "shared": {"type": "number"},
            },
            id="all_sheets_merged_schema",
        ),
    ],
)
def test_infer_schema_with_sheet_selection(sheet_name, expected_schema, remote_file):
    """infer_schema covers the selected sheet, or merges the columns of all sheets for "*"."""
    excel_parser = ExcelParser()
    stream_config = FileBasedStreamConfig(
        name="test_stream",
        format=ExcelFormat(sheet_name=sheet_name),
    )
    workbook_reader = _stream_reader_for(_make_multisheet_excel_bytes())

    # infer_schema is a coroutine; run it on a private loop that is always closed.
    event_loop = asyncio.new_event_loop()
    try:
        pending = excel_parser.infer_schema(
            stream_config, remote_file, workbook_reader, MagicMock()
        )
        inferred = event_loop.run_until_complete(pending)
    finally:
        event_loop.close()

    assert inferred == expected_schema


@pytest.mark.parametrize(
    "config_value,expected",
    [
        pytest.param("0", 0, id="zero_index"),
        pytest.param("1", 1, id="numeric_index"),
        pytest.param("MySheet", "MySheet", id="named_sheet"),
        pytest.param("*", None, id="all_sheets"),
    ],
)
def test_resolve_sheet_name(config_value, expected):
    """Digit strings become ints, "*" becomes None, other strings pass through."""
    excel_parser = ExcelParser()
    resolved = excel_parser._resolve_sheet_name(ExcelFormat(sheet_name=config_value))
    assert resolved == expected
8 changes: 7 additions & 1 deletion unit_tests/sources/file_based/scenarios/csv_scenarios.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,13 @@
"default": "excel",
"const": "excel",
"type": "string",
}
},
"sheet_name": {
"title": "Sheet Name",
"description": 'The Excel worksheet to read. Use a sheet name, a zero-indexed position like "0", or "*" to read all sheets.',
"default": "0",
"type": "string",
},
},
"required": ["filetype"],
},
Expand Down
Loading