Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions flink-python/pyflink/common/typeinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -755,9 +755,11 @@ def __repr__(self):


class ExternalTypeInfo(TypeInformation):
def __init__(self, type_info: TypeInformation):
def __init__(self, type_info: TypeInformation, _j_external_type_info=None):
super(ExternalTypeInfo, self).__init__()
self._type_info = type_info
if _j_external_type_info is not None:
self._j_typeinfo = _j_external_type_info

def get_java_type_info(self) -> JavaObject:
if not self._j_typeinfo:
Expand All @@ -771,6 +773,14 @@ def get_java_type_info(self) -> JavaObject:
self._j_typeinfo = JExternalTypeInfo.of(j_data_type)
return self._j_typeinfo

def __getstate__(self):
state = self.__dict__.copy()
state.pop('_j_typeinfo', None)
return state

def __setstate__(self, state):
self.__dict__.update(state)

def __eq__(self, other):
return self.__class__ == other.__class__ and self._type_info == other._type_info

Expand Down Expand Up @@ -1112,8 +1122,10 @@ def _from_java_type(j_type_info: JavaObject) -> TypeInformation:
if _is_instance_of(j_type_info, JExternalTypeInfo):
TypeInfoDataTypeConverter = \
gateway.jvm.org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
return ExternalTypeInfo(_from_java_type(
TypeInfoDataTypeConverter.toLegacyTypeInfo(j_type_info.getDataType())))
return ExternalTypeInfo(
_from_java_type(
TypeInfoDataTypeConverter.toLegacyTypeInfo(j_type_info.getDataType())),
_j_external_type_info=j_type_info)

raise TypeError("The java type info: %s is not supported in PyFlink currently." % j_type_info)

Expand Down
Loading