diff --git a/source/src/main/java/tech/deepq/source/handler/SourceHandler.java b/source/src/main/java/tech/deepq/source/handler/SourceHandler.java index 5c1e353..5421a62 100644 --- a/source/src/main/java/tech/deepq/source/handler/SourceHandler.java +++ b/source/src/main/java/tech/deepq/source/handler/SourceHandler.java @@ -122,23 +122,23 @@ public class SourceHandler { if (Objects.nonNull(task.getDispatcher())) { - Map nextNodeMessage = buildNextNodeMessage(result, task, message); + Map> nextNodeMessage = buildNextNodeMessage(result, task, message); - nextNodeMessage.forEach((k, v) -> { + nextNodeMessage.forEach((topic, messageList) -> messageList.forEach(m -> { if (!ObjectUtils.isEmpty(otherCacheKeySet)) { - Set set = Objects.requireNonNullElse(v.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET, Set.class), new HashSet()); + Set set = Objects.requireNonNullElse(m.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET, Set.class), new HashSet()); set.addAll(otherCacheKeySet); - v = MessageBuilder - .fromMessage(v) + m = MessageBuilder + .fromMessage(m) .setHeader(MessageConstants.OTHER_CACHE_KEY_SET, set) .build(); } - boolean send = streamBridge.send(k, v); + boolean send = streamBridge.send(topic, m); if (!send) { throw new RuntimeException("send message failed"); } - }); + })); return null; } @@ -162,10 +162,14 @@ public class SourceHandler { } - private Map buildNextNodeMessage(List result, Task task, Message message) { + /** + * 根据当前任务返回值与当前任务的分支配置信息构建将要发送的消息 + * @return {@code Map} + */ + private Map> buildNextNodeMessage(List result, Task task, Message message) { Dispatcher filter = task.getDispatcher(); - Map resultMap = new HashMap<>(); + Map> resultMap = new HashMap<>(); result.stream() .map(item -> { HttpRes httpRes = HttpRes.build() @@ -238,7 +242,10 @@ public class SourceHandler { .setHeader(MessageConstants.SEED_URL, httpRes.getRequestParams().getUrl()) .setHeader(MessageConstants.OTHER_CACHE_KEY_SET, otherCacheKeySet) .build(); - resultMap.put(topic, nextTaskMessage); + if (!resultMap.containsKey(topic)) { + resultMap.put(topic, new LinkedList<>()); + } + resultMap.get(topic).add(nextTaskMessage); if ("break".equals(filter.getMode())) { break; }