From ac70625b167692b4e2beb8ab46a309092d1e9ef3 Mon Sep 17 00:00:00 2001 From: daguimu Date: Thu, 14 May 2026 23:51:02 +0800 Subject: [PATCH] fix(rocketmq): log warning when FLAG or DELAY header is not numeric RocketMQMessageConverterSupport#getAndWrapMessage swallowed any Integer.parseInt failure on the MQ_FLAG and DELAY_TIME_LEVEL message headers with catch (Exception ignored) {}, silently falling back to flag=0 and delayLevel=0. A producer that set DELAY to a misconfigured non-numeric value (e.g. via a SpEL expression that produced a String instead of an int) would have its delayed delivery silently turn into an immediate send with no log line, no exception, and no way for the caller to know the header was malformed. Add an SLF4J logger to the support class and warn with both header names and their actual values when parsing fails. Behaviour is otherwise unchanged: flag and delayLevel still fall back to 0, the broad catch (Exception) is kept, and the DELAY_TIME_LEVEL header lookup remains inside the try block so any exception path that the original code happened to absorb is still absorbed. Only the variable declaration for delayLevelObj is hoisted out of the try block so the catch clause can include its raw value in the warning. Adds three regression tests that lock in the fallback behaviour: non-numeric DELAY_TIME_LEVEL stays at 0, non-numeric FLAG stays at 0, and conversion does not propagate a parse exception. These tests pass both before and after the fix; their purpose is to prevent a future refactor from regressing the silent-fallback semantics that downstream producers may rely on. --- .../RocketMQMessageConverterSupport.java | 15 ++++++- .../RocketMQMessageConverterSupportTest.java | 39 +++++++++++++++++++ 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQMessageConverterSupport.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQMessageConverterSupport.java index 4e4b6f9eeb..1cbd97644e 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQMessageConverterSupport.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/main/java/com/alibaba/cloud/stream/binder/rocketmq/support/RocketMQMessageConverterSupport.java @@ -26,6 +26,8 @@ import com.alibaba.cloud.stream.binder.rocketmq.custom.RocketMQBeanContainerCache; import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExt; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; @@ -40,6 +42,9 @@ */ public final class RocketMQMessageConverterSupport { + private static final Logger log = LoggerFactory + .getLogger(RocketMQMessageConverterSupport.class); + private RocketMQMessageConverterSupport() { } @@ -155,9 +160,10 @@ private static org.apache.rocketmq.common.message.Message getAndWrapMessage( headers.get(toRocketHeaderKey(Headers.FLAG))); int flag = 0; int delayLevel = 0; + Object delayLevelObj = null; try { flagObj = flagObj == null ? 0 : flagObj; - Object delayLevelObj = headers.getOrDefault( + delayLevelObj = headers.getOrDefault( RocketMQConst.PROPERTY_DELAY_TIME_LEVEL, headers.get(toRocketHeaderKey( RocketMQConst.PROPERTY_DELAY_TIME_LEVEL))); @@ -165,7 +171,12 @@ private static org.apache.rocketmq.common.message.Message getAndWrapMessage( delayLevel = Integer.parseInt(String.valueOf(delayLevelObj)); flag = Integer.parseInt(String.valueOf(flagObj)); } - catch (Exception ignored) { + catch (Exception e) { + log.warn( + "Non-numeric '{}' or '{}' header; falling back to flag=0 and delayLevel=0. " + + "flagHeader={}, delayLevelHeader={}", + Headers.FLAG, RocketMQConst.PROPERTY_DELAY_TIME_LEVEL, + flagObj, delayLevelObj, e); } if (delayLevel > 0) { rocketMsg.setDelayTimeLevel(delayLevel); diff --git a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageConverterSupportTest.java b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageConverterSupportTest.java index 147d9652a5..b80b0b3f18 100644 --- a/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageConverterSupportTest.java +++ b/spring-cloud-alibaba-starters/spring-cloud-starter-stream-rocketmq/src/test/java/com/alibaba/cloud/stream/binder/rocketmq/RocketMQMessageConverterSupportTest.java @@ -16,6 +16,7 @@ package com.alibaba.cloud.stream.binder.rocketmq; +import com.alibaba.cloud.stream.binder.rocketmq.constant.RocketMQConst; import com.alibaba.cloud.stream.binder.rocketmq.support.RocketMQMessageConverterSupport; import org.apache.rocketmq.common.message.MessageConst; import org.junit.jupiter.api.Test; @@ -24,6 +25,7 @@ import org.springframework.messaging.support.MessageBuilder; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; /** * @author Sorie @@ -44,4 +46,41 @@ public void convertMessage2MQBlankHeaderTest() { assertThat(testProp).isNull(); assertThat(tagProp).isEqualTo("a"); } + + @Test + public void nonNumericDelayTimeLevelHeaderFallsBackToZero() { + Message message = MessageBuilder.withPayload("msg") + .setHeader(RocketMQConst.PROPERTY_DELAY_TIME_LEVEL, "not-a-number") + .build(); + + org.apache.rocketmq.common.message.Message rkmqMsg = + RocketMQMessageConverterSupport.convertMessage2MQ("topic", message); + + assertThat(rkmqMsg.getDelayTimeLevel()).isEqualTo(0); + assertThat(rkmqMsg.getFlag()).isEqualTo(0); + } + + @Test + public void nonNumericFlagHeaderFallsBackToZero() { + Message message = MessageBuilder.withPayload("msg") + .setHeader(RocketMQConst.Headers.FLAG, "not-a-number") + .build(); + + org.apache.rocketmq.common.message.Message rkmqMsg = + RocketMQMessageConverterSupport.convertMessage2MQ("topic", message); + + assertThat(rkmqMsg.getFlag()).isEqualTo(0); + } + + @Test + public void invalidNumericHeaderDoesNotPropagateException() { + Message message = MessageBuilder.withPayload("msg") + .setHeader(RocketMQConst.PROPERTY_DELAY_TIME_LEVEL, "x") + .setHeader(RocketMQConst.Headers.FLAG, "y") + .build(); + + assertThatCode(() -> + RocketMQMessageConverterSupport.convertMessage2MQ("topic", message)) + .doesNotThrowAnyException(); + } }