diff --git a/source/src/main/java/work/xuye/source/handler/SourceHandler.java b/source/src/main/java/work/xuye/source/handler/SourceHandler.java index eba2fdb..a3e9bd8 100644 --- a/source/src/main/java/work/xuye/source/handler/SourceHandler.java +++ b/source/src/main/java/work/xuye/source/handler/SourceHandler.java @@ -124,19 +124,21 @@ public class SourceHandler { Map nextNodeMessage = buildNextNodeMessage(result, task, message); - if (!ObjectUtils.isEmpty(nextNodeMessage)) { - nextNodeMessage.forEach((k, v) -> { - Set set = v.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET, Set.class); - if (!ObjectUtils.isEmpty(set)) { - set.addAll(otherCacheKeySet); - v.getHeaders().put(MessageConstants.OTHER_CACHE_KEY_SET, set); - } - boolean send = streamBridge.send(k, v); - if (!send) { - throw new RuntimeException("send message failed"); - } - }); - } + + nextNodeMessage.forEach((k, v) -> { + if (!ObjectUtils.isEmpty(otherCacheKeySet)) { + Set set = Objects.requireNonNullElse(v.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET, Set.class), new HashSet()); + set.addAll(otherCacheKeySet); + v = MessageBuilder + .fromMessage(v) + .setHeader(MessageConstants.OTHER_CACHE_KEY_SET, set) + .build(); + } + boolean send = streamBridge.send(k, v); + if (!send) { + throw new RuntimeException("send message failed"); + } + }); return null; }