From 87c5e7b344e5952de9d319298e95f5c1e7d5b4cc Mon Sep 17 00:00:00 2001 From: yechuan Date: Wed, 30 Aug 2023 11:56:02 +0800 Subject: [PATCH] =?UTF-8?q?bug=E4=BF=AE=E5=A4=8D=201=E3=80=81source?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E8=BF=9B=E8=A1=8C=E5=88=86=E6=94=AF=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E5=86=85=E5=AE=B9=E6=9E=84=E5=BB=BA=E6=97=B6,?= =?UTF-8?q?=E7=9B=B8=E5=90=8Ctopic=E7=9A=84=E6=B6=88=E6=81=AF=E8=A2=AB?= =?UTF-8?q?=E8=A6=86=E7=9B=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../tech/deepq/source/handler/SourceHandler.java | 27 ++++++++++++++-------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/source/src/main/java/tech/deepq/source/handler/SourceHandler.java b/source/src/main/java/tech/deepq/source/handler/SourceHandler.java index 5c1e353..5421a62 100644 --- a/source/src/main/java/tech/deepq/source/handler/SourceHandler.java +++ b/source/src/main/java/tech/deepq/source/handler/SourceHandler.java @@ -122,23 +122,23 @@ public class SourceHandler { if (Objects.nonNull(task.getDispatcher())) { - Map nextNodeMessage = buildNextNodeMessage(result, task, message); + Map> nextNodeMessage = buildNextNodeMessage(result, task, message); - nextNodeMessage.forEach((k, v) -> { + nextNodeMessage.forEach((topic, messageList) -> messageList.forEach(m -> { if (!ObjectUtils.isEmpty(otherCacheKeySet)) { - Set set = Objects.requireNonNullElse(v.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET, Set.class), new HashSet()); + Set set = Objects.requireNonNullElse(m.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET, Set.class), new HashSet()); set.addAll(otherCacheKeySet); - v = MessageBuilder - .fromMessage(v) + m = MessageBuilder + .fromMessage(m) .setHeader(MessageConstants.OTHER_CACHE_KEY_SET, set) .build(); } - boolean send = streamBridge.send(k, v); + boolean send = streamBridge.send(topic, m); if (!send) { throw new RuntimeException("send message failed"); } - }); + })); return null; } @@ -162,10 +162,14 @@ public class SourceHandler { } - private Map buildNextNodeMessage(List result, Task task, Message message) { + /** + * 根据当前任务返回值与当前任务的分支配置信息构建将要发送的消息 + * @return {@code Map} + */ + private Map> buildNextNodeMessage(List result, Task task, Message message) { Dispatcher filter = task.getDispatcher(); - Map resultMap = new HashMap<>(); + Map> resultMap = new HashMap<>(); result.stream() .map(item -> { HttpRes httpRes = HttpRes.build() @@ -238,7 +242,10 @@ public class SourceHandler { .setHeader(MessageConstants.SEED_URL, httpRes.getRequestParams().getUrl()) .setHeader(MessageConstants.OTHER_CACHE_KEY_SET, otherCacheKeySet) .build(); - resultMap.put(topic, nextTaskMessage); + if (!resultMap.containsKey(topic)) { + resultMap.put(topic, new LinkedList<>()); + } + resultMap.get(topic).add(nextTaskMessage); if ("break".equals(filter.getMode())) { break; }