|
|
@@ -122,23 +122,23 @@ public class SourceHandler { |
|
|
|
|
|
|
|
if (Objects.nonNull(task.getDispatcher())) { |
|
|
|
|
|
|
|
Map<String, Message> nextNodeMessage = buildNextNodeMessage(result, task, message); |
|
|
|
Map<String, List<Message>> nextNodeMessage = buildNextNodeMessage(result, task, message); |
|
|
|
|
|
|
|
|
|
|
|
nextNodeMessage.forEach((k, v) -> { |
|
|
|
nextNodeMessage.forEach((topic, messageList) -> messageList.forEach(m -> { |
|
|
|
if (!ObjectUtils.isEmpty(otherCacheKeySet)) { |
|
|
|
Set set = Objects.requireNonNullElse(v.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET, Set.class), new HashSet()); |
|
|
|
Set set = Objects.requireNonNullElse(m.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET, Set.class), new HashSet()); |
|
|
|
set.addAll(otherCacheKeySet); |
|
|
|
v = MessageBuilder |
|
|
|
.fromMessage(v) |
|
|
|
m = MessageBuilder |
|
|
|
.fromMessage(m) |
|
|
|
.setHeader(MessageConstants.OTHER_CACHE_KEY_SET, set) |
|
|
|
.build(); |
|
|
|
} |
|
|
|
boolean send = streamBridge.send(k, v); |
|
|
|
boolean send = streamBridge.send(topic, m); |
|
|
|
if (!send) { |
|
|
|
throw new RuntimeException("send message failed"); |
|
|
|
} |
|
|
|
}); |
|
|
|
})); |
|
|
|
return null; |
|
|
|
} |
|
|
|
|
|
|
@@ -162,10 +162,14 @@ public class SourceHandler { |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
private Map<String, Message> buildNextNodeMessage(List<HttpRes> result, Task task, Message<TaskMiniVO> message) { |
|
|
|
/** |
|
|
|
* 根据当前任务返回值与当前任务的分支配置信息构建将要发送的消息 |
|
|
|
* @return {@code Map<topic,list<Message>} |
|
|
|
*/ |
|
|
|
private Map<String, List<Message>> buildNextNodeMessage(List<HttpRes> result, Task task, Message<TaskMiniVO> message) { |
|
|
|
Dispatcher filter = task.getDispatcher(); |
|
|
|
|
|
|
|
Map<String, Message> resultMap = new HashMap<>(); |
|
|
|
Map<String, List<Message>> resultMap = new HashMap<>(); |
|
|
|
result.stream() |
|
|
|
.map(item -> { |
|
|
|
HttpRes httpRes = HttpRes.build() |
|
|
@@ -238,7 +242,10 @@ public class SourceHandler { |
|
|
|
.setHeader(MessageConstants.SEED_URL, httpRes.getRequestParams().getUrl()) |
|
|
|
.setHeader(MessageConstants.OTHER_CACHE_KEY_SET, otherCacheKeySet) |
|
|
|
.build(); |
|
|
|
resultMap.put(topic, nextTaskMessage); |
|
|
|
if (!resultMap.containsKey(topic)) { |
|
|
|
resultMap.put(topic, new LinkedList<>()); |
|
|
|
} |
|
|
|
resultMap.get(topic).add(nextTaskMessage); |
|
|
|
if ("break".equals(filter.getMode())) { |
|
|
|
break; |
|
|
|
} |
|
|
|