|
|
@@ -124,19 +124,21 @@ public class SourceHandler { |
|
|
|
|
|
|
|
Map<String, Message> nextNodeMessage = buildNextNodeMessage(result, task, message); |
|
|
|
|
|
|
|
if (!ObjectUtils.isEmpty(nextNodeMessage)) { |
|
|
|
nextNodeMessage.forEach((k, v) -> { |
|
|
|
Set set = v.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET, Set.class); |
|
|
|
if (!ObjectUtils.isEmpty(set)) { |
|
|
|
set.addAll(otherCacheKeySet); |
|
|
|
v.getHeaders().put(MessageConstants.OTHER_CACHE_KEY_SET, set); |
|
|
|
} |
|
|
|
boolean send = streamBridge.send(k, v); |
|
|
|
if (!send) { |
|
|
|
throw new RuntimeException("send message failed"); |
|
|
|
} |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
nextNodeMessage.forEach((k, v) -> { |
|
|
|
if (!ObjectUtils.isEmpty(otherCacheKeySet)) { |
|
|
|
Set set = Objects.requireNonNullElse(v.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET, Set.class), new HashSet()); |
|
|
|
set.addAll(otherCacheKeySet); |
|
|
|
v = MessageBuilder |
|
|
|
.fromMessage(v) |
|
|
|
.setHeader(MessageConstants.OTHER_CACHE_KEY_SET, set) |
|
|
|
.build(); |
|
|
|
} |
|
|
|
boolean send = streamBridge.send(k, v); |
|
|
|
if (!send) { |
|
|
|
throw new RuntimeException("send message failed"); |
|
|
|
} |
|
|
|
}); |
|
|
|
return null; |
|
|
|
} |
|
|
|
|
|
|
|