From df13ca3d162c7fc8e6b25ecec73c534d2b3e043d Mon Sep 17 00:00:00 2001 From: yechuan Date: Tue, 15 Aug 2023 13:22:45 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=88=A4=E6=96=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../work/xuye/source/handler/SourceHandler.java | 28 ++++++++++++---------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/source/src/main/java/work/xuye/source/handler/SourceHandler.java b/source/src/main/java/work/xuye/source/handler/SourceHandler.java index eba2fdb..a3e9bd8 100644 --- a/source/src/main/java/work/xuye/source/handler/SourceHandler.java +++ b/source/src/main/java/work/xuye/source/handler/SourceHandler.java @@ -124,19 +124,21 @@ public class SourceHandler { Map nextNodeMessage = buildNextNodeMessage(result, task, message); - if (!ObjectUtils.isEmpty(nextNodeMessage)) { - nextNodeMessage.forEach((k, v) -> { - Set set = v.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET, Set.class); - if (!ObjectUtils.isEmpty(set)) { - set.addAll(otherCacheKeySet); - v.getHeaders().put(MessageConstants.OTHER_CACHE_KEY_SET, set); - } - boolean send = streamBridge.send(k, v); - if (!send) { - throw new RuntimeException("send message failed"); - } - }); - } + + nextNodeMessage.forEach((k, v) -> { + if (!ObjectUtils.isEmpty(otherCacheKeySet)) { + Set set = Objects.requireNonNullElse(v.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET, Set.class), new HashSet()); + set.addAll(otherCacheKeySet); + v = MessageBuilder + .fromMessage(v) + .setHeader(MessageConstants.OTHER_CACHE_KEY_SET, set) + .build(); + } + boolean send = streamBridge.send(k, v); + if (!send) { + throw new RuntimeException("send message failed"); + } + }); return null; }