diff --git a/flink-python/pyflink/common/typeinfo.py b/flink-python/pyflink/common/typeinfo.py index 861231ed167dc..9b5c8959c2607 100644 --- a/flink-python/pyflink/common/typeinfo.py +++ b/flink-python/pyflink/common/typeinfo.py @@ -755,9 +755,11 @@ def __repr__(self): class ExternalTypeInfo(TypeInformation): - def __init__(self, type_info: TypeInformation): + def __init__(self, type_info: TypeInformation, _j_external_type_info=None): super(ExternalTypeInfo, self).__init__() self._type_info = type_info + if _j_external_type_info is not None: + self._j_typeinfo = _j_external_type_info def get_java_type_info(self) -> JavaObject: if not self._j_typeinfo: @@ -771,6 +773,14 @@ def get_java_type_info(self) -> JavaObject: self._j_typeinfo = JExternalTypeInfo.of(j_data_type) return self._j_typeinfo + def __getstate__(self): + state = self.__dict__.copy() + state.pop('_j_typeinfo', None) + return state + + def __setstate__(self, state): + self.__dict__.update(state) + def __eq__(self, other): return self.__class__ == other.__class__ and self._type_info == other._type_info @@ -1112,8 +1122,10 @@ def _from_java_type(j_type_info: JavaObject) -> TypeInformation: if _is_instance_of(j_type_info, JExternalTypeInfo): TypeInfoDataTypeConverter = \ gateway.jvm.org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter - return ExternalTypeInfo(_from_java_type( - TypeInfoDataTypeConverter.toLegacyTypeInfo(j_type_info.getDataType()))) + return ExternalTypeInfo( + _from_java_type( + TypeInfoDataTypeConverter.toLegacyTypeInfo(j_type_info.getDataType())), + _j_external_type_info=j_type_info) raise TypeError("The java type info: %s is not supported in PyFlink currently." % j_type_info) diff --git a/flink-python/pyflink/fn_execution/embedded/converters.py b/flink-python/pyflink/fn_execution/embedded/converters.py index 8829d48074ee7..26df5c3e35e28 100644 --- a/flink-python/pyflink/fn_execution/embedded/converters.py +++ b/flink-python/pyflink/fn_execution/embedded/converters.py @@ -15,6 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ +import datetime as _dt import pickle from abc import ABC, abstractmethod from typing import TypeVar, List, Tuple @@ -22,22 +23,37 @@ from pemja import findClass from pyflink.common import Row, RowKind, TypeInformation -from pyflink.common.typeinfo import (PickledBytesTypeInfo, PrimitiveArrayTypeInfo, - BasicArrayTypeInfo, ObjectArrayTypeInfo, RowTypeInfo, - TupleTypeInfo, MapTypeInfo, ListTypeInfo) +from pyflink.common.typeinfo import ( + PickledBytesTypeInfo, + PrimitiveArrayTypeInfo, + BasicArrayTypeInfo, + ObjectArrayTypeInfo, + RowTypeInfo, + TupleTypeInfo, + MapTypeInfo, + ListTypeInfo, +) from pyflink.datastream import TimeWindow, CountWindow, GlobalWindow -IN = TypeVar('IN') -OUT = TypeVar('OUT') +IN = TypeVar("IN") +OUT = TypeVar("OUT") # Java Window -JTimeWindow = findClass('org.apache.flink.table.runtime.operators.window.TimeWindow') -JCountWindow = findClass('org.apache.flink.table.runtime.operators.window.CountWindow') -JGlobalWindow = findClass('org.apache.flink.streaming.api.windowing.windows.GlobalWindow') +JTimeWindow = findClass("org.apache.flink.table.runtime.operators.window.TimeWindow") +JCountWindow = findClass("org.apache.flink.table.runtime.operators.window.CountWindow") +JGlobalWindow = findClass( + "org.apache.flink.streaming.api.windowing.windows.GlobalWindow" +) +# Java time types used by the temporal converters below. Looked up once at +# module load time so the converters don't pay the JNI cost per call. +JLocalDateTime = findClass("java.time.LocalDateTime") +JLocalDate = findClass("java.time.LocalDate") +JLocalTime = findClass("java.time.LocalTime") +JInstant = findClass("java.time.Instant") -class DataConverter(ABC): +class DataConverter(ABC): @abstractmethod def to_internal(self, value) -> IN: pass @@ -80,20 +96,29 @@ def to_internal(self, value) -> IN: if value is None: return None - return tuple([self._field_data_converters[i].to_internal(item) - for i, item in enumerate(value)]) + return tuple( + [ + self._field_data_converters[i].to_internal(item) + for i, item in enumerate(value) + ] + ) def to_external(self, value) -> OUT: if value is None: return None - return tuple([self._field_data_converters[i].to_external(item) - for i, item in enumerate(value)]) + return tuple( + [ + self._field_data_converters[i].to_external(item) + for i, item in enumerate(value) + ] + ) class RowDataConverter(DataConverter): - - def __init__(self, field_data_converters: List[DataConverter], field_names: List[str]): + def __init__( + self, field_data_converters: List[DataConverter], field_names: List[str] + ): self._field_data_converters = field_data_converters self._field_names = field_names @@ -102,8 +127,10 @@ def to_internal(self, value) -> IN: return None row = Row() - row._values = [self._field_data_converters[i].to_internal(item) - for i, item in enumerate(value[1])] + row._values = [ + self._field_data_converters[i].to_internal(item) + for i, item in enumerate(value[1]) + ] row.set_field_names(self._field_names) row.set_row_kind(RowKind(value[0])) @@ -114,13 +141,16 @@ def to_external(self, value: Row) -> OUT: return None values = value._values - fields = tuple([self._field_data_converters[i].to_external(values[i]) - for i in range(len(values))]) + fields = tuple( + [ + self._field_data_converters[i].to_external(values[i]) + for i in range(len(values)) + ] + ) return value.get_row_kind().value, fields class TupleDataConverter(DataConverter): - def __init__(self, field_data_converters: List[DataConverter]): self._field_data_converters = field_data_converters @@ -128,19 +158,26 @@ def to_internal(self, value) -> IN: if value is None: return None - return tuple([self._field_data_converters[i].to_internal(item) - for i, item in enumerate(value)]) + return tuple( + [ + self._field_data_converters[i].to_internal(item) + for i, item in enumerate(value) + ] + ) def to_external(self, value: Tuple) -> OUT: if value is None: return None - return tuple([self._field_data_converters[i].to_external(item) - for i, item in enumerate(value)]) + return tuple( + [ + self._field_data_converters[i].to_external(item) + for i, item in enumerate(value) + ] + ) class ListDataConverter(DataConverter): - def __init__(self, field_converter: DataConverter): self._field_converter = field_converter @@ -162,12 +199,291 @@ def __init__(self, field_converter: DataConverter): super(ArrayDataConverter, self).__init__(field_converter) def to_internal(self, value) -> IN: - return tuple(super(ArrayDataConverter, self).to_internal(value)) + if value is None: + return None + + return list(super(ArrayDataConverter, self).to_internal(value)) def to_external(self, value) -> OUT: + if value is None: + return None + return tuple(super(ArrayDataConverter, self).to_external(value)) +class LocalDateTimeConverter(DataConverter): + """Converter for TIMESTAMP(precision) columns in embedded (thread) mode. + + On the input side (Java -> Python UDF argument) the value can arrive as + either (a) a native Python ``datetime.datetime`` — pemja's automatic + Java->Python conversion handles top-level ``java.sql.Timestamp`` / + ``LocalDateTime`` method arguments — or (b) a ``pemja.PyJObject`` + wrapping a ``java.time.LocalDateTime``, which is what happens for values + nested inside arrays, rows, or map values (pemja does not recursively + auto-convert Object[] elements). + + On the output side (Python return -> Java) the converter explicitly + constructs a ``java.time.LocalDateTime`` so the downstream Java converter + chain — which uses ``DataFormatConverters.LocalDateTimeConverter`` as the + default for TIMESTAMP fields in nested rows — receives the type it + expects instead of a pemja-defaulted ``java.sql.Timestamp``. + + Precision: Python ``datetime`` is microsecond-precise, so values with + TIMESTAMP(p) where p <= 6 round-trip without loss. For TIMESTAMP(9) the + sub-microsecond nanoseconds are silently truncated. + """ + + def to_internal(self, value) -> IN: + if value is None: + return None + # Fast path: pemja already produced a Python datetime (happens for + # top-level method arguments when pemja auto-converts the bare Java + # value crossing the JNI boundary). + # + # We check via ``type(value) is datetime`` rather than ``isinstance`` + # because pemja's C extension causes CPython's ``isinstance`` to raise + # "SystemError: returned NULL without + # setting an error" when invoked on a ``pemja.PyJObject``. + # ``type(value)`` uses Py_TYPE at the C level and is unaffected. + if type(value) is _dt.datetime: + return value + # Nested case: value is a pemja.PyJObject wrapping a Java + # java.time.LocalDateTime. Extract the fields via Java getters. + return _dt.datetime( + value.getYear(), + value.getMonthValue(), + value.getDayOfMonth(), + value.getHour(), + value.getMinute(), + value.getSecond(), + value.getNano() // 1000, + ) + + def to_external(self, value) -> OUT: + if value is None: + return None + return JLocalDateTime.of( + value.year, + value.month, + value.day, + value.hour, + value.minute, + value.second, + value.microsecond * 1000, + ) + + +class LocalDateConverter(DataConverter): + """Converter for DATE columns in embedded (thread) mode. + + Mirrors LocalDateTimeConverter's pattern: the input value may be a native + ``datetime.date`` (top-level method arg auto-converted by pemja) or a + ``pemja.PyJObject`` wrapping a ``java.time.LocalDate`` (nested positions). + The output is always an explicit Java ``LocalDate`` to match the external + type the downstream Java converter chain expects. + """ + + def to_internal(self, value) -> IN: + if value is None: + return None + if type(value) is _dt.date: + return value + return _dt.date( + value.getYear(), + value.getMonthValue(), + value.getDayOfMonth(), + ) + + def to_external(self, value) -> OUT: + if value is None: + return None + return JLocalDate.of(value.year, value.month, value.day) + + +class LocalTimeConverter(DataConverter): + """Converter for TIME(precision) columns in embedded (thread) mode. + + Mirrors LocalDateTimeConverter's pattern against ``java.time.LocalTime``. + Python ``datetime.time`` is microsecond-precise so TIME(9) loses + sub-microsecond nanoseconds silently. + """ + + def to_internal(self, value) -> IN: + if value is None: + return None + if type(value) is _dt.time: + return value + return _dt.time( + value.getHour(), + value.getMinute(), + value.getSecond(), + value.getNano() // 1000, + ) + + def to_external(self, value) -> OUT: + if value is None: + return None + return JLocalTime.of( + value.hour, + value.minute, + value.second, + value.microsecond * 1000, + ) + + +class InstantConverter(DataConverter): + """Converter for TIMESTAMP_LTZ / TIMESTAMP WITH LOCAL TIME ZONE columns. + + Java's ``java.time.Instant`` is absolute UTC. Python's ``datetime`` may be + naive or tz-aware. Process mode returns naive ``datetime`` (interpreted as + UTC); we match that shape to keep thread mode compatible. + + On the input side we extract epoch-second and nano from the Java Instant + and construct a naive UTC ``datetime``. + + On the output side we accept both naive (treated as UTC, matching process + mode) and tz-aware datetimes and convert to a Java ``Instant`` via + ``Instant.ofEpochSecond(seconds, nanos)``. + """ + + def to_internal(self, value) -> IN: + if value is None: + return None + if type(value) is _dt.datetime: + return value + # pemja.PyJObject wrapping java.time.Instant + epoch_s = value.getEpochSecond() + nano = value.getNano() + # Construct a naive UTC datetime. Python datetime is microsecond + # precise, so sub-microsecond nanoseconds are truncated. + return _dt.datetime(1970, 1, 1) + _dt.timedelta( + seconds=epoch_s, microseconds=nano // 1000 + ) + + def to_external(self, value) -> OUT: + if value is None: + return None + # Normalise to an epoch-second + nanos. For naive datetimes we assume + # UTC, matching process-mode behaviour. + if value.tzinfo is None: + delta = value - _dt.datetime(1970, 1, 1) + else: + delta = value - _dt.datetime(1970, 1, 1, tzinfo=_dt.timezone.utc) + epoch_s = int(delta.total_seconds()) + nano = ( + delta.microseconds + (delta.total_seconds() - epoch_s) * 1_000_000 + ) * 1000 + return JInstant.ofEpochSecond(epoch_s, int(nano)) + + +class DataStreamLocalDateTimeConverter(DataConverter): + """Converter for TIMESTAMP columns on the DataStream path. + + Identical to ``LocalDateTimeConverter`` on the input side (Java → Python), + but ``to_external`` returns the Python ``datetime`` as-is, letting pemja's + auto-conversion produce ``java.sql.Timestamp``. The DataStream→Table + ``InputConversionOperator`` expects ``Timestamp``, whereas the Table API's + internal ``DataFormatConverters.RowConverter`` expects ``LocalDateTime`` — + hence the two separate converter classes. + """ + + def to_internal(self, value) -> IN: + if value is None: + return None + if type(value) is _dt.datetime: + return value + return _dt.datetime( + value.getYear(), + value.getMonthValue(), + value.getDayOfMonth(), + value.getHour(), + value.getMinute(), + value.getSecond(), + value.getNano() // 1000, + ) + + def to_external(self, value) -> OUT: + if value is None: + return None + return value + + +class DataStreamLocalDateConverter(DataConverter): + """DataStream variant of LocalDateConverter — to_external returns as-is.""" + + def to_internal(self, value) -> IN: + if value is None: + return None + if type(value) is _dt.date: + return value + return _dt.date( + value.getYear(), + value.getMonthValue(), + value.getDayOfMonth(), + ) + + def to_external(self, value) -> OUT: + if value is None: + return None + return value + + +class DataStreamLocalTimeConverter(DataConverter): + """DataStream/Table variant of LocalTimeConverter. + + Unlike the other temporal converters, ``to_external`` must explicitly build a + ``java.time.LocalTime``: pemja auto-converts Python ``datetime.time`` to + ``java.sql.Time`` which only has seconds precision, silently dropping the + sub-second microseconds needed for TIME(3)+ columns. Wrapping via ``JLocalTime.of`` + preserves nanos all the way to the Java-side converter's ``toInternalImpl``. + """ + + def to_internal(self, value) -> IN: + if value is None: + return None + elif type(value) is _dt.time: + return value + else: + return _dt.time( + value.getHour(), + value.getMinute(), + value.getSecond(), + value.getNano() // 1000, + ) + + def to_external(self, value) -> OUT: + return ( + JLocalTime.of( + value.hour, + value.minute, + value.second, + value.microsecond * 1000, + ) + if value is not None + else None + ) + + +class DataStreamInstantConverter(DataConverter): + """DataStream variant of InstantConverter — to_external returns as-is.""" + + def to_internal(self, value) -> IN: + if value is None: + return None + if type(value) is _dt.datetime: + return value + epoch_s = value.getEpochSecond() + nano = value.getNano() + return _dt.datetime(1970, 1, 1) + _dt.timedelta( + seconds=epoch_s, microseconds=nano // 1000 + ) + + def to_external(self, value) -> OUT: + if value is None: + return None + return value + + class DictDataConverter(DataConverter): def __init__(self, key_converter: DataConverter, value_converter: DataConverter): self._key_converter = key_converter @@ -177,15 +493,19 @@ def to_internal(self, value) -> IN: if value is None: return None - return {self._key_converter.to_internal(k): self._value_converter.to_internal(v) - for k, v in value.items()} + return { + self._key_converter.to_internal(k): self._value_converter.to_internal(v) + for k, v in value.items() + } def to_external(self, value) -> OUT: if value is None: return None - return {self._key_converter.to_external(k): self._value_converter.to_external(v) - for k, v in value.items()} + return { + self._key_converter.to_external(k): self._value_converter.to_external(v) + for k, v in value.items() + } class TimeWindowConverter(DataConverter): @@ -197,7 +517,6 @@ def to_external(self, value: TimeWindow) -> OUT: class CountWindowConverter(DataConverter): - def to_internal(self, value) -> CountWindow: return CountWindow(value.getId()) @@ -206,7 +525,6 @@ def to_external(self, value: CountWindow) -> OUT: class GlobalWindowConverter(DataConverter): - def to_internal(self, value) -> IN: return GlobalWindow() @@ -225,20 +543,40 @@ def from_type_info_proto(type_info): return PickleDataConverter() elif type_name == type_info_name.ROW: return RowDataConverter( - [from_type_info_proto(f.field_type) for f in type_info.row_type_info.fields], - [f.field_name for f in type_info.row_type_info.fields]) + [ + from_type_info_proto(f.field_type) + for f in type_info.row_type_info.fields + ], + [f.field_name for f in type_info.row_type_info.fields], + ) elif type_name == type_info_name.TUPLE: return TupleDataConverter( - [from_type_info_proto(field_type) - for field_type in type_info.tuple_type_info.field_types]) - elif type_name in (type_info_name.BASIC_ARRAY, - type_info_name.OBJECT_ARRAY): - return ArrayDataConverter(from_type_info_proto(type_info.collection_element_type)) - elif type_info == type_info_name.LIST: - return ListDataConverter(from_type_info_proto(type_info.collection_element_type)) + [ + from_type_info_proto(field_type) + for field_type in type_info.tuple_type_info.field_types + ] + ) + elif type_name in (type_info_name.BASIC_ARRAY, type_info_name.OBJECT_ARRAY): + return ArrayDataConverter( + from_type_info_proto(type_info.collection_element_type) + ) + elif type_name == type_info_name.LIST: + return ListDataConverter( + from_type_info_proto(type_info.collection_element_type) + ) elif type_name == type_info_name.MAP: - return DictDataConverter(from_type_info_proto(type_info.map_type_info.key_type), - from_type_info_proto(type_info.map_type_info.value_type)) + return DictDataConverter( + from_type_info_proto(type_info.map_type_info.key_type), + from_type_info_proto(type_info.map_type_info.value_type), + ) + elif type_name in (type_info_name.LOCAL_DATETIME, type_info_name.SQL_TIMESTAMP): + return DataStreamLocalDateTimeConverter() + elif type_name in (type_info_name.LOCAL_DATE, type_info_name.SQL_DATE): + return DataStreamLocalDateConverter() + elif type_name in (type_info_name.LOCAL_TIME, type_info_name.SQL_TIME): + return DataStreamLocalTimeConverter() + elif type_name in (type_info_name.INSTANT, type_info_name.LOCAL_ZONED_TIMESTAMP): + return DataStreamInstantConverter() return IdentityDataConverter() @@ -260,12 +598,25 @@ def from_field_type_proto(field_type): if type_name == schema_type_name.ROW: return RowDataConverter( [from_field_type_proto(f.type) for f in field_type.row_schema.fields], - [f.name for f in field_type.row_schema.fields]) + [f.name for f in field_type.row_schema.fields], + ) elif type_name == schema_type_name.BASIC_ARRAY: - return ArrayDataConverter(from_field_type_proto(field_type.collection_element_type)) + return ArrayDataConverter( + from_field_type_proto(field_type.collection_element_type) + ) elif type_name == schema_type_name.MAP: - return DictDataConverter(from_field_type_proto(field_type.map_info.key_type), - from_field_type_proto(field_type.map_info.value_type)) + return DictDataConverter( + from_field_type_proto(field_type.map_info.key_type), + from_field_type_proto(field_type.map_info.value_type), + ) + elif type_name == schema_type_name.TIMESTAMP: + return DataStreamLocalDateTimeConverter() + elif type_name == schema_type_name.LOCAL_ZONED_TIMESTAMP: + return DataStreamInstantConverter() + elif type_name == schema_type_name.DATE: + return DataStreamLocalDateConverter() + elif type_name == schema_type_name.TIME: + return DataStreamLocalTimeConverter() return IdentityDataConverter() @@ -273,12 +624,16 @@ def from_field_type_proto(field_type): def from_type_info(type_info: TypeInformation): if isinstance(type_info, (PickledBytesTypeInfo, RowTypeInfo, TupleTypeInfo)): return PickleDataConverter() - elif isinstance(type_info, (PrimitiveArrayTypeInfo, BasicArrayTypeInfo, ObjectArrayTypeInfo)): + elif isinstance( + type_info, (PrimitiveArrayTypeInfo, BasicArrayTypeInfo, ObjectArrayTypeInfo) + ): return ArrayDataConverter(from_type_info(type_info._element_type)) elif isinstance(type_info, ListTypeInfo): return ListDataConverter(from_type_info(type_info.elem_type)) elif isinstance(type_info, MapTypeInfo): - return DictDataConverter(from_type_info(type_info._key_type_info), - from_type_info(type_info._value_type_info)) + return DictDataConverter( + from_type_info(type_info._key_type_info), + from_type_info(type_info._value_type_info), + ) return IdentityDataConverter() diff --git a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java index 7ee63382c1c63..7e6240089b73f 100644 --- a/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java +++ b/flink-python/src/main/java/org/apache/flink/streaming/api/utils/PythonTypeUtils.java @@ -867,27 +867,25 @@ public T toExternal(T value) { * Python Long will be converted to Long in PemJa, so we need ByteDataConverter to convert Java * Long to internal Byte. */ - public static final class ByteDataConverter extends DataConverter { + public static final class ByteDataConverter extends DataConverter { private static final long serialVersionUID = 1L; public static final ByteDataConverter INSTANCE = new ByteDataConverter(); @Override - public Byte toInternal(Long value) { + public Byte toInternal(Number value) { if (value == null) { return null; } - return value.byteValue(); } @Override - public Long toExternal(Byte value) { + public Number toExternal(Byte value) { if (value == null) { return null; } - return value.longValue(); } } @@ -896,89 +894,208 @@ public Long toExternal(Byte value) { * Python Long will be converted to Long in PemJa, so we need ShortDataConverter to convert Java * Long to internal Short. */ - public static final class ShortDataConverter extends DataConverter { + public static final class ShortDataConverter extends DataConverter { private static final long serialVersionUID = 1L; public static final ShortDataConverter INSTANCE = new ShortDataConverter(); @Override - public Short toInternal(Long value) { + public Short toInternal(Number value) { if (value == null) { return null; } - return value.shortValue(); } @Override - public Long toExternal(Short value) { + public Number toExternal(Short value) { if (value == null) { return null; } - return value.longValue(); } } /** - * Python Long will be converted to Long in PemJa, so we need IntDataConverter to convert Java - * Long to internal Integer. + * Accepts both Long (from pemja int→Long) and Double (from pemja float→Double) + * via the Number supertype, avoiding bridge-method ClassCastException. */ - public static final class IntDataConverter extends DataConverter { + public static final class IntDataConverter extends DataConverter { private static final long serialVersionUID = 1L; public static final IntDataConverter INSTANCE = new IntDataConverter(); @Override - public Integer toInternal(Long value) { + public Integer toInternal(Number value) { if (value == null) { return null; } - return value.intValue(); } @Override - public Long toExternal(Integer value) { + public Number toExternal(Integer value) { if (value == null) { return null; } - return value.longValue(); } } /** - * Python Float will be converted to Double in PemJa, so we need FloatDataConverter to convert - * Java Double to internal Float. + * Accepts both Double (from pemja float→Double) and Long (from pemja int→Long) + * via the Number supertype, avoiding bridge-method ClassCastException. */ - public static final class FloatDataConverter extends DataConverter { + public static final class FloatDataConverter extends DataConverter { private static final long serialVersionUID = 1L; public static final FloatDataConverter INSTANCE = new FloatDataConverter(); @Override - public Float toInternal(Double value) { + public Float toInternal(Number value) { if (value == null) { return null; } - return value.floatValue(); } @Override - public Double toExternal(Float value) { + public Number toExternal(Float value) { if (value == null) { return null; } - return value.doubleValue(); } } + /** + * Converts between pemja's auto-converted {@link java.sql.Timestamp} and {@link + * LocalDateTime}. + * + *

Pemja maps Python {@code datetime} to {@link java.sql.Timestamp} on the Python→Java + * boundary. In thread mode the downstream {@link + * org.apache.flink.table.runtime.typeutils.ExternalSerializer} expects {@link LocalDateTime} + * (the default bridge class for TIMESTAMP columns). This converter bridges the mismatch. + */ + public static final class LocalDateTimeDataConverter + extends DataConverter { + + private static final long serialVersionUID = 1L; + + public static final LocalDateTimeDataConverter INSTANCE = new LocalDateTimeDataConverter(); + + @Override + public LocalDateTime toInternal(Object value) { + if (value == null) { + return null; + } + if (value instanceof LocalDateTime) { + return (LocalDateTime) value; + } + if (value instanceof Timestamp) { + return ((Timestamp) value).toLocalDateTime(); + } + throw new IllegalArgumentException( + "Expected LocalDateTime or Timestamp but got: " + + value.getClass().getName()); + } + + @Override + public Object toExternal(LocalDateTime value) { + return value; + } + } + + /** Converts between pemja's auto-converted {@link Date} and {@link LocalDate}. */ + public static final class LocalDateDataConverter extends DataConverter { + + private static final long serialVersionUID = 1L; + + public static final LocalDateDataConverter INSTANCE = new LocalDateDataConverter(); + + @Override + public LocalDate toInternal(Object value) { + if (value == null) { + return null; + } + if (value instanceof LocalDate) { + return (LocalDate) value; + } + if (value instanceof Date) { + return ((Date) value).toLocalDate(); + } + throw new IllegalArgumentException( + "Expected LocalDate or Date but got: " + value.getClass().getName()); + } + + @Override + public Object toExternal(LocalDate value) { + return value; + } + } + + /** Converts between pemja's auto-converted {@link Time} and {@link LocalTime}. */ + public static final class LocalTimeDataConverter extends DataConverter { + + private static final long serialVersionUID = 1L; + + public static final LocalTimeDataConverter INSTANCE = new LocalTimeDataConverter(); + + @Override + public LocalTime toInternal(Object value) { + if (value == null) { + return null; + } + if (value instanceof LocalTime) { + return (LocalTime) value; + } + if (value instanceof Time) { + return ((Time) value).toLocalTime(); + } + throw new IllegalArgumentException( + "Expected LocalTime or Time but got: " + value.getClass().getName()); + } + + @Override + public Object toExternal(LocalTime value) { + return value; + } + } + + /** + * Converts between pemja's auto-converted {@link Timestamp} and {@link Instant} for + * TIMESTAMP_WITH_LOCAL_TIME_ZONE columns. + */ + public static final class InstantDataConverter extends DataConverter { + + private static final long serialVersionUID = 1L; + + public static final InstantDataConverter INSTANCE = new InstantDataConverter(); + + @Override + public Instant toInternal(Object value) { + if (value == null) { + return null; + } + if (value instanceof Instant) { + return (Instant) value; + } + if (value instanceof Timestamp) { + return ((Timestamp) value).toInstant(); + } + throw new IllegalArgumentException( + "Expected Instant or Timestamp but got: " + value.getClass().getName()); + } + + @Override + public Object toExternal(Instant value) { + return value; + } + } + /** * Row Data will be converted to the Object Array [RowKind(as Long Object), Field Values(as * Object Array)]. @@ -988,16 +1105,9 @@ public static final class RowDataConverter extends DataConverter private static final long serialVersionUID = 1L; private final DataConverter[] fieldDataConverters; - private final Row reuseRow; - private final Object[] reuseExternalRow; - private final Object[] reuseExternalRowData; RowDataConverter(DataConverter[] dataConverters) { this.fieldDataConverters = dataConverters; - this.reuseRow = new Row(fieldDataConverters.length); - this.reuseExternalRowData = new Object[fieldDataConverters.length]; - this.reuseExternalRow = new Object[2]; - this.reuseExternalRow[1] = reuseExternalRowData; } @SuppressWarnings("unchecked") @@ -1008,12 +1118,13 @@ public Row toInternal(Object[] value) { } RowKind rowKind = RowKind.fromByteValue(((Long) value[0]).byteValue()); - reuseRow.setKind(rowKind); Object[] fieldValues = (Object[]) value[1]; + Row row = new Row(fieldDataConverters.length); + row.setKind(rowKind); for (int i = 0; i < fieldValues.length; i++) { - reuseRow.setField(i, fieldDataConverters[i].toInternal(fieldValues[i])); + row.setField(i, fieldDataConverters[i].toInternal(fieldValues[i])); } - return reuseRow; + return row; } @SuppressWarnings("unchecked") @@ -1023,11 +1134,12 @@ public Object[] toExternal(Row value) { return null; } - reuseExternalRow[0] = (long) value.getKind().toByteValue(); + // Fresh allocation per call to avoid aliasing when used inside ArrayDataConverter. + Object[] fields = new Object[fieldDataConverters.length]; for (int i = 0; i < value.getArity(); i++) { - reuseExternalRowData[i] = fieldDataConverters[i].toExternal(value.getField(i)); + fields[i] = fieldDataConverters[i].toExternal(value.getField(i)); } - return reuseExternalRow; + return new Object[] {(long) value.getKind().toByteValue(), fields}; } @Override @@ -1101,13 +1213,9 @@ public boolean equals(Object o) { public static final class TupleDataConverter extends DataConverter { private final DataConverter[] fieldDataConverters; - private final Tuple reuseTuple; - private final Object[] reuseExternalTuple; TupleDataConverter(DataConverter[] dataConverters) { this.fieldDataConverters = dataConverters; - this.reuseTuple = Tuple.newInstance(dataConverters.length); - this.reuseExternalTuple = new Object[dataConverters.length]; } @SuppressWarnings("unchecked") @@ -1117,10 +1225,12 @@ public Tuple toInternal(Object[] value) { return null; } + // Fresh allocation per call to avoid aliasing when used inside ArrayDataConverter. + Tuple tuple = Tuple.newInstance(fieldDataConverters.length); for (int i = 0; i < value.length; i++) { - reuseTuple.setField(fieldDataConverters[i].toInternal(value[i]), i); + tuple.setField(fieldDataConverters[i].toInternal(value[i]), i); } - return reuseTuple; + return tuple; } @SuppressWarnings("unchecked") @@ -1130,10 +1240,11 @@ public Object[] toExternal(Tuple value) { return null; } + Object[] result = new Object[fieldDataConverters.length]; for (int i = 0; i < value.getArity(); i++) { - reuseExternalTuple[i] = fieldDataConverters[i].toExternal(value.getField(i)); + result[i] = fieldDataConverters[i].toExternal(value.getField(i)); } - return reuseExternalTuple; + return result; } @Override @@ -1356,6 +1467,18 @@ public static class TypeInfoToDataConverter { BasicTypeInfo.FLOAT_TYPE_INFO.getTypeClass(), FloatDataConverter.INSTANCE); typeInfoToDataConverterMap.put( BasicTypeInfo.BYTE_TYPE_INFO.getTypeClass(), ByteDataConverter.INSTANCE); + typeInfoToDataConverterMap.put( + LocalTimeTypeInfo.LOCAL_DATE_TIME.getTypeClass(), + LocalDateTimeDataConverter.INSTANCE); + typeInfoToDataConverterMap.put( + LocalTimeTypeInfo.LOCAL_DATE.getTypeClass(), + LocalDateDataConverter.INSTANCE); + typeInfoToDataConverterMap.put( + LocalTimeTypeInfo.LOCAL_TIME.getTypeClass(), + LocalTimeDataConverter.INSTANCE); + typeInfoToDataConverterMap.put( + BasicTypeInfo.INSTANT_TYPE_INFO.getTypeClass(), + InstantDataConverter.INSTANCE); } @SuppressWarnings("unchecked") diff --git a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java index 20278c60567f3..8d13b69ab8ce4 100644 --- a/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java +++ b/flink-python/src/main/java/org/apache/flink/table/runtime/typeutils/PythonTypeUtils.java @@ -33,6 +33,7 @@ import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.MapData; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.data.util.DataFormatConverters; import org.apache.flink.table.runtime.typeutils.serializers.python.ArrayDataSerializer; import org.apache.flink.table.runtime.typeutils.serializers.python.DecimalDataSerializer; @@ -493,6 +494,153 @@ public final OUT toExternal(RowData row, int column) { abstract OUT toExternalImpl(INTER value); } + /** + * Converts between pemja's auto-converted {@link java.sql.Timestamp} and {@link + * java.time.LocalDateTime}. + * + *

Pemja maps Python {@code datetime} to {@link java.sql.Timestamp}. The outer {@link + * DataFormatConverters.RowConverter} expects {@link java.time.LocalDateTime} (the default + * bridge class for TIMESTAMP columns). This converter bridges the mismatch. + */ + public static final class TimestampLocalDateTimeDataConverter + extends DataConverter { + + private final int precision; + + TimestampLocalDateTimeDataConverter(int precision) { + super(new DataFormatConverters.LocalDateTimeConverter(precision)); + this.precision = precision; + } + + @Override + java.time.LocalDateTime toInternalImpl(Object value) { + if (value == null) { + return null; + } + if (value instanceof java.time.LocalDateTime) { + return (java.time.LocalDateTime) value; + } + if (value instanceof java.sql.Timestamp) { + return ((java.sql.Timestamp) value).toLocalDateTime(); + } + throw new IllegalArgumentException( + "Expected LocalDateTime or Timestamp but got: " + + value.getClass().getName()); + } + + @Override + Object toExternalImpl(java.time.LocalDateTime value) { + if (value == null) { + return null; + } + return java.sql.Timestamp.valueOf(value); + } + } + + /** + * Converts between pemja's auto-converted {@link java.sql.Timestamp} and {@link + * java.time.Instant} for TIMESTAMP_WITH_LOCAL_TIME_ZONE columns. + */ + public static final class TimestampInstantDataConverter + extends DataConverter { + + private final int precision; + + TimestampInstantDataConverter(int precision) { + super(new DataFormatConverters.InstantConverter(precision)); + this.precision = precision; + } + + @Override + java.time.Instant toInternalImpl(Object value) { + if (value == null) { + return null; + } + if (value instanceof java.time.Instant) { + return (java.time.Instant) value; + } + if (value instanceof java.sql.Timestamp) { + return ((java.sql.Timestamp) value).toInstant(); + } + throw new IllegalArgumentException( + "Expected Instant or Timestamp but got: " + + value.getClass().getName()); + } + + @Override + Object toExternalImpl(java.time.Instant value) { + if (value == null) { + return null; + } + return java.sql.Timestamp.from(value); + } + } + + /** Converts between pemja's auto-converted {@link java.sql.Date} and {@link java.time.LocalDate}. */ + public static final class DateLocalDateDataConverter + extends DataConverter { + + static final DateLocalDateDataConverter INSTANCE = new DateLocalDateDataConverter(); + + DateLocalDateDataConverter() { + super(DataFormatConverters.LocalDateConverter.INSTANCE); + } + + @Override + java.time.LocalDate toInternalImpl(Object value) { + if (value == null) { + return null; + } + if (value instanceof java.time.LocalDate) { + return (java.time.LocalDate) value; + } + if (value instanceof java.sql.Date) { + return ((java.sql.Date) value).toLocalDate(); + } + throw new IllegalArgumentException( + "Expected LocalDate or Date but got: " + value.getClass().getName()); + } + + @Override + Object toExternalImpl(java.time.LocalDate value) { + if (value == null) { + return null; + } + return java.sql.Date.valueOf(value); + } + } + + /** Converts between pemja's auto-converted {@link java.sql.Time} and {@link java.time.LocalTime}. */ + public static final class TimeLocalTimeDataConverter + extends DataConverter { + + static final TimeLocalTimeDataConverter INSTANCE = new TimeLocalTimeDataConverter(); + + TimeLocalTimeDataConverter() { + super(DataFormatConverters.LocalTimeConverter.INSTANCE); + } + + @Override + java.time.LocalTime toInternalImpl(Object value) { + if (value == null) { + return null; + } + if (value instanceof java.time.LocalTime) { + return (java.time.LocalTime) value; + } + if (value instanceof java.sql.Time) { + return ((java.sql.Time) value).toLocalTime(); + } + throw new IllegalArgumentException( + "Expected LocalTime or Time but got: " + value.getClass().getName()); + } + + @Override + Object toExternalImpl(java.time.LocalTime value) { + return value; + } + } + /** Identity data converter. */ public static final class IdentityDataConverter extends DataConverter { IdentityDataConverter( @@ -515,7 +663,7 @@ OUT toExternalImpl(OUT value) { * Python Long will be converted to Long in PemJa, so we need ByteDataConverter to convert Java * Long to internal Byte. */ - public static final class ByteDataConverter extends DataConverter { + public static final class ByteDataConverter extends DataConverter { public static final ByteDataConverter INSTANCE = new ByteDataConverter(); @@ -524,12 +672,18 @@ private ByteDataConverter() { } @Override - Byte toInternalImpl(Long value) { + Byte toInternalImpl(Number value) { + if (value == null) { + return null; + } return value.byteValue(); } @Override - Long toExternalImpl(Byte value) { + Number toExternalImpl(Byte value) { + if (value == null) { + return null; + } return value.longValue(); } } @@ -538,7 +692,7 @@ Long toExternalImpl(Byte value) { * Python Long will be converted to Long in PemJa, so we need ShortDataConverter to convert Java * Long to internal Short. */ - public static final class ShortDataConverter extends DataConverter { + public static final class ShortDataConverter extends DataConverter { public static final ShortDataConverter INSTANCE = new ShortDataConverter(); @@ -547,12 +701,18 @@ private ShortDataConverter() { } @Override - Short toInternalImpl(Long value) { + Short toInternalImpl(Number value) { + if (value == null) { + return null; + } return value.shortValue(); } @Override - Long toExternalImpl(Short value) { + Number toExternalImpl(Short value) { + if (value == null) { + return null; + } return value.longValue(); } } @@ -561,7 +721,7 @@ Long toExternalImpl(Short value) { * Python Long will be converted to Long in PemJa, so we need IntDataConverter to convert Java * Long to internal Integer. */ - public static final class IntDataConverter extends DataConverter { + public static final class IntDataConverter extends DataConverter { public static final IntDataConverter INSTANCE = new IntDataConverter(); @@ -570,12 +730,18 @@ private IntDataConverter() { } @Override - Integer toInternalImpl(Long value) { + Integer toInternalImpl(Number value) { + if (value == null) { + return null; + } return value.intValue(); } @Override - Long toExternalImpl(Integer value) { + Number toExternalImpl(Integer value) { + if (value == null) { + return null; + } return value.longValue(); } } @@ -584,7 +750,7 @@ Long toExternalImpl(Integer value) { * Python Float will be converted to Double in PemJa, so we need FloatDataConverter to convert * Java Double to internal Float. */ - public static final class FloatDataConverter extends DataConverter { + public static final class FloatDataConverter extends DataConverter { public static final FloatDataConverter INSTANCE = new FloatDataConverter(); @@ -593,12 +759,18 @@ private FloatDataConverter() { } @Override - Float toInternalImpl(Double value) { + Float toInternalImpl(Number value) { + if (value == null) { + return null; + } return value.floatValue(); } @Override - Double toExternalImpl(Float value) { + Number toExternalImpl(Float value) { + if (value == null) { + return null; + } return value.doubleValue(); } } @@ -633,46 +805,35 @@ Time toExternalImpl(Integer value) { public static final class RowDataConverter extends DataConverter { private final DataConverter[] fieldDataConverters; - private final Row reuseRow; - private final Object[] reuseExternalRow; - private final Object[] reuseExternalRowData; RowDataConverter( DataConverter[] fieldDataConverters, DataFormatConverters.DataFormatConverter dataFormatConverter) { super(dataFormatConverter); this.fieldDataConverters = fieldDataConverters; - this.reuseRow = new Row(fieldDataConverters.length); - this.reuseExternalRowData = new Object[fieldDataConverters.length]; - this.reuseExternalRow = new Object[2]; - this.reuseExternalRow[1] = reuseExternalRowData; } @SuppressWarnings("unchecked") @Override Row toInternalImpl(Object[] value) { - RowKind rowKind = RowKind.fromByteValue(((Long) value[0]).byteValue()); - reuseRow.setKind(rowKind); - Object[] fieldValues = (Object[]) value[1]; - + Row row = new Row(fieldDataConverters.length); + row.setKind(rowKind); for (int i = 0; i < fieldValues.length; i++) { - reuseRow.setField(i, fieldDataConverters[i].toInternalImpl(fieldValues[i])); + row.setField(i, fieldDataConverters[i].toInternalImpl(fieldValues[i])); } - return reuseRow; + return row; } @SuppressWarnings("unchecked") @Override Object[] toExternalImpl(Row value) { - - reuseExternalRow[0] = (long) value.getKind().toByteValue(); + Object[] fields = new Object[fieldDataConverters.length]; for (int i = 0; i < value.getArity(); i++) { - reuseExternalRowData[i] = fieldDataConverters[i].toExternalImpl(value.getField(i)); + fields[i] = fieldDataConverters[i].toExternalImpl(value.getField(i)); } - - return reuseExternalRow; + return new Object[] {(long) value.getKind().toByteValue(), fields}; } } @@ -832,25 +993,22 @@ public DataConverter visit(BinaryType binaryType) { @Override public DataConverter visit(DateType dateType) { - return new IdentityDataConverter<>(DataFormatConverters.DateConverter.INSTANCE); + return new DateLocalDateDataConverter(); } @Override public DataConverter visit(TimeType timeType) { - return TimeDataConverter.INSTANCE; + return TimeLocalTimeDataConverter.INSTANCE; } @Override public DataConverter visit(TimestampType timestampType) { - return new IdentityDataConverter<>( - new DataFormatConverters.TimestampConverter(timestampType.getPrecision())); + return new TimestampLocalDateTimeDataConverter(timestampType.getPrecision()); } @Override public DataConverter visit(LocalZonedTimestampType localZonedTimestampType) { - return new IdentityDataConverter<>( - new DataFormatConverters.TimestampConverter( - localZonedTimestampType.getPrecision())); + return new TimestampInstantDataConverter(localZonedTimestampType.getPrecision()); } @SuppressWarnings("unchecked")