Skip to content

[FLINK-35180][python] Fix embedded (thread-mode) type converters#27927

Draft
polsinello wants to merge 1 commit intoapache:release-1.19from
polsinello:FLINK-35180-fix-thread-mode-type-converters
Draft

[FLINK-35180][python] Fix embedded (thread-mode) type converters#27927
polsinello wants to merge 1 commit intoapache:release-1.19from
polsinello:FLINK-35180-fix-thread-mode-type-converters

Conversation

@polsinello
Copy link
Copy Markdown

What is the purpose of the change

In embedded (thread) mode, Pemja's JNI bridge auto-converts Python datetime/date/time objects to java.sql.Timestamp/Date/Time, but Flink's ExternalSerializer and DataFormatConverters.RowConverter expect the modern java.time.* bridge classes (LocalDateTime, LocalDate, LocalTime, Instant). This mismatch causes ClassCastException at serialization boundaries for any UDF that returns or receives temporal types. Process mode is unaffected because its Beam-based runnerOutputTypeSerializer resolves to the correct java.time.* serializers via LegacyTypeInfoDataTypeConverter.

This patch fixes several related type-conversion bugs in the embedded Python execution path so that all Flink-supported types work correctly in thread mode.

Brief change log

  • Add bridge-aware DataConverter implementations for all temporal types (TIMESTAMP, DATE, TIME, TIMESTAMP_LTZ) on both the Table API (PythonTypeUtils in flink-table-runtime) and DataStream (PythonTypeUtils in flink-streaming-java) paths
  • Replace IdentityDataConverter for temporal types where it silently passed java.sql.* objects through without conversion
  • Fix ROW/TUPLE buffer reuse in ArrayDataConverter that caused silent data corruption in ARRAY<ROW<...>> results
  • Widen numeric DataConverter generics from Long/Double to Number to avoid bridge-method ClassCastException when Pemja returns Integer/Float for small values
  • Cache the original Java ExternalTypeInfo in the Python ExternalTypeInfo wrapper to prevent lossy DECIMAL precision round-trips through legacy TypeInfo conversion

Verifying this change

This change is already covered by existing tests — the existing PyFlink Table API and DataStream test suites exercise all affected type paths.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no (fixes type conversion before values reach the serializer)
  • The runtime per-record code paths (performance sensitive): yes — the embedded Python UDF execution path now performs explicit type conversions for temporal/numeric types instead of relying on identity pass-through. The overhead is minimal (one java.time.* factory call per value).
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented Apr 14, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

…temporal, numeric, and nested types

Pemja auto-converts Python datetime/date/time to java.sql.Timestamp/Date/Time,
but Flink's ExternalSerializer and DataFormatConverters.RowConverter expect the
modern java.time.* bridge classes (LocalDateTime, LocalDate, LocalTime, Instant).
In thread mode this mismatch causes ClassCastException at serialization boundaries.
Process mode is unaffected because its Beam-based runnerOutputTypeSerializer
resolves to the correct java.time.* serializers via LegacyTypeInfoDataTypeConverter.

This patch:

  - Adds bridge-aware DataConverters for all temporal types on both the
    Table API and DataStream paths, replacing IdentityDataConverter where
    it silently passed java.sql.* through.
  - TimeLocalTimeDataConverter.toExternalImpl returns LocalTime as-is
    instead of java.sql.Time (second-precision only; silently drops
    microseconds for TIME(3)+). DataStreamLocalTimeConverter mirrors this
    on the Python side via explicit JLocalTime.of, preserving sub-second
    precision across the JNI boundary.
  - Fixes row/tuple buffer reuse that caused silent data corruption in
    ARRAY<ROW<>>.
  - Widens numeric DataConverter generics from Long/Double to Number to
    avoid bridge-method ClassCastException under pemja coercion, and adds
    null guards in toInternalImpl / toExternalImpl for Byte/Short/Int/Long.
  - Caches the original Java ExternalTypeInfo to prevent lossy DECIMAL
    precision round-trips through legacy TypeInfo conversion; adds
    __getstate__ / __setstate__ so ExternalTypeInfo survives pickling
    (the cached Java handle is dropped and rebuilt lazily on first access).
  - Null-safety in temporal to_external paths.
@polsinello polsinello force-pushed the FLINK-35180-fix-thread-mode-type-converters branch from c5df8a7 to 6f7be1a Compare April 17, 2026 15:33
Copy link
Copy Markdown
Contributor

@dianfu dianfu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@polsinello Thanks a lot for the PR! Could you resubmit a PR target to release-1.20? The current target version release-1.19 isn't maintained any more and release-1.20 is the last version and also the LTS version which will be still maintained in the 1.x series.

For the PR itself, it's very good. Just left a few minor comments.

/**
* Python Float will be converted to Double in PemJa, so we need FloatDataConverter to convert
* Java Double to internal Float.
* Accepts both Double (from pemja float→Double) and Long (from pemja int→Long)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The description seems confusing for me. What's the difference between IntDataConverter and FloatDataConverter?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right — the docstrings were copy-paste that didn't reflect the widened Number generic, which made the four narrowing converters look interchangeable. Rewrote so each class now leads with its narrow target type.

@Override
public DataConverter visit(DateType dateType) {
return new IdentityDataConverter<>(DataFormatConverters.DateConverter.INSTANCE);
return new DateLocalDateDataConverter();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could use DateLocalDateDataConverter.INSTANCE?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — switched to DateLocalDateDataConverter.INSTANCE at the visit(DateType) call site; the INSTANCE was already defined.

public static final class TimestampInstantDataConverter
extends DataConverter<TimestampData, java.time.Instant, Object> {

private final int precision;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be removed since it's not used

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dropped the field + the this.precision = precision assignment; kept the constructor parameter since it still flows to the inner converter.

public static final class TimestampLocalDateTimeDataConverter
extends DataConverter<TimestampData, java.time.LocalDateTime, Object> {

private final int precision;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be removed since it's not used

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above — dropped the redundant local precision field; the inner DataFormatConverters.InstantConverter already stores its own copy.


@Override
public DataConverter visit(TimeType timeType) {
return TimeDataConverter.INSTANCE;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TimeDataConverter is not used any more. We can remove it in this PR.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

delta = value - _dt.datetime(1970, 1, 1)
else:
delta = value - _dt.datetime(1970, 1, 1, tzinfo=_dt.timezone.utc)
epoch_s = int(delta.total_seconds())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review comments from AI:

  converters.py line ~375. int() truncates towards zero, which gives incorrect epoch_s/nano pairs for negative timestamps (before 1970):  
                                                                                                                                          
  epoch_s = int(delta.total_seconds())                                                                                                    
  nano = (        
      delta.microseconds + (delta.total_seconds() - epoch_s) * 1_000_000                                                                  
  ) * 1000                                                                                                                                
                                                                                                                                          
  Example: for a datetime representing -99.3s from epoch:                                                                                 
  - delta.total_seconds() = -99.3                                                                                                         
  - epoch_s = int(-99.3) = -99 (truncates towards zero, should be -100)                                                                   
  - nano computes to 400,000,000 instead of 700,000,000                
  - Result: Instant.ofEpochSecond(-99, 400000000) = -98.6s (wrong, should be -99.3s)                                                      
                                                                                                                                          
  Fix: use int(delta.total_seconds() // 1) or math.floor(), or better yet, compute via integer microseconds:                              
                                                                                                                                          
  total_us = int(delta / _dt.timedelta(microseconds=1))                                                                                   
  epoch_s, us_rem = divmod(total_us, 1_000_000)                                                                                           
  return JInstant.ofEpochSecond(epoch_s, us_rem * 1000) 

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch, fixed by computing via integer microseconds with divmod, which floor-divides correctly for negative values:

 total_us = delta // _dt.timedelta(microseconds=1)
 epoch_s, us_rem = divmod(total_us, 1_000_000)
 return JInstant.ofEpochSecond(epoch_s, us_rem * 1000)

if value is None:
return None

return list(super(ArrayDataConverter, self).to_internal(value))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the purpose of this change? (tuple -> list)

Copy link
Copy Markdown
Author

@polsinello polsinello Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The asymmetry is driven by pemja's Python↔Java type-coercion table:

  • to_internal (Java → Python UDF arg) returns list to match process-mode's Beam-based path — UDFs see mutable sequences in both modes, so code like arr.append(x) / arr[i] = v doesn't
    break only in thread mode. Before this change, thread mode returned a tuple, which created a subtle UDF-visible divergence between modes.
  • to_external (Python return → Java) stays tuple because pemja coerces tupleObject[] (what the downstream Java ArrayDataConverter expects), whereas listArrayList, which would
    break ArrayDataConverter.toExternalImpl's typed-array element loop.

I've added a class-level docstring on ArrayDataConverter documenting the asymmetry.

@polsinello
Copy link
Copy Markdown
Author

@dianfu thanks a lot for your review. I've addressed every comment, let me know if this is OK for you. I also opened a PR for release-1.20 with the new fix from your comments. Happy to close this one in favor of the 1.20 PR — whichever you prefer for keeping the history.

@dianfu
Copy link
Copy Markdown
Contributor

dianfu commented Apr 25, 2026

@polsinello Thanks! Let's close this one and discuss on #27999.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants