From 0fb7bca14bee8abbd0ab19fa27eb26ede471d4d2 Mon Sep 17 00:00:00 2001 From: yechuan Date: Thu, 17 Aug 2023 14:33:24 +0800 Subject: [PATCH] =?UTF-8?q?next=5Ffilter=E9=87=8D=E5=91=BD=E5=90=8D?= =?UTF-8?q?=E4=B8=BAdispatcher=20=E6=98=8E=E7=A1=AE=E5=AE=9A=E4=B9=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/work/xuye/common/db/entity/Task.java | 6 +-- .../work/xuye/common/db/entity/vo/Dispatcher.java | 63 ++++++++++++++++++++++ .../xuye/common/db/entity/vo/NextTaskFilter.java | 62 --------------------- .../work/xuye/source/handler/SourceHandler.java | 12 ++--- 4 files changed, 72 insertions(+), 71 deletions(-) create mode 100644 common/src/main/java/work/xuye/common/db/entity/vo/Dispatcher.java delete mode 100644 common/src/main/java/work/xuye/common/db/entity/vo/NextTaskFilter.java diff --git a/common/src/main/java/work/xuye/common/db/entity/Task.java b/common/src/main/java/work/xuye/common/db/entity/Task.java index 9173594..029c25b 100644 --- a/common/src/main/java/work/xuye/common/db/entity/Task.java +++ b/common/src/main/java/work/xuye/common/db/entity/Task.java @@ -9,7 +9,7 @@ import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer; import lombok.Getter; import lombok.Setter; import lombok.experimental.Accessors; -import work.xuye.common.db.entity.vo.NextTaskFilter; +import work.xuye.common.db.entity.vo.Dispatcher; import work.xuye.common.db.entity.vo.ParseParams; import work.xuye.common.db.entity.vo.SinkParams; import work.xuye.common.db.entity.vo.TransformParams; @@ -59,8 +59,8 @@ public class Task implements Serializable { @TableField(value = "sink_params", typeHandler = JacksonTypeHandler.class) private SinkParams sinkParams; - @TableField(value = "next_filter", typeHandler = JacksonTypeHandler.class) - private NextTaskFilter nextFilter; + @TableField(value = "dispatcher", typeHandler = JacksonTypeHandler.class) + private Dispatcher dispatcher; @TableField(value = "create_time", fill = FieldFill.INSERT) @JsonDeserialize(using = LocalDateTimeDeserializer.class) diff --git a/common/src/main/java/work/xuye/common/db/entity/vo/Dispatcher.java b/common/src/main/java/work/xuye/common/db/entity/vo/Dispatcher.java new file mode 100644 index 0000000..680fba2 --- /dev/null +++ b/common/src/main/java/work/xuye/common/db/entity/vo/Dispatcher.java @@ -0,0 +1,63 @@ +package work.xuye.common.db.entity.vo; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; +import java.util.Map; + +/** + * 分流设置 + * @author yechuan + * @since 2023/8/8 19:09 + **/ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class Dispatcher { + /** + * 参与计算的jsonPath + */ + private String filterArgs; + + /** + * break 表示命中后不再继续尝试匹配其他的节点 + */ + private String mode; + + /** + * 当前的唯一key,用于缓存判断是否变更 + */ + private String uniqueKey; + + /** + * 后继节点配置 + */ + private List nextNodes; + + /** + * spel表达式 + */ + private Map placeholderExpressions = Map.of(); + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class NextNodeConfig { + /** + * 消息往哪里个topic发 以通知后继任务执行 + */ + private String topic; + + /** + * 需要满足的条件 + */ + private String condition; + + /** + * 发送的对象 + */ + private String payload; + } +} diff --git a/common/src/main/java/work/xuye/common/db/entity/vo/NextTaskFilter.java b/common/src/main/java/work/xuye/common/db/entity/vo/NextTaskFilter.java deleted file mode 100644 index a638c8f..0000000 --- a/common/src/main/java/work/xuye/common/db/entity/vo/NextTaskFilter.java +++ /dev/null @@ -1,62 +0,0 @@ -package work.xuye.common.db.entity.vo; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.util.List; -import java.util.Map; - -/** - * @author yechuan - * @since 2023/8/8 19:09 - **/ -@Data -@NoArgsConstructor -@AllArgsConstructor -public class NextTaskFilter { - /** - * 参与计算的jsonPath - */ - private String filterArgs; - - /** - * break 表示命中后不再继续尝试匹配其他的节点 - */ - private String mode; - - /** - * 当前的唯一key,用于缓存判断是否变更 - */ - private String uniqueKey; - - /** - * 后继节点配置 - */ - private List nextNodes; - - /** - * spel表达式 - */ - private Map placeholderExpressions = Map.of(); - - @Data - @NoArgsConstructor - @AllArgsConstructor - public static class NextNodeConfig { - /** - * 消息往哪里个topic发 以通知后继任务执行 - */ - private String topic; - - /** - * 需要满足的条件 - */ - private String condition; - - /** - * 发送的对象 - */ - private String payload; - } -} diff --git a/source/src/main/java/work/xuye/source/handler/SourceHandler.java b/source/src/main/java/work/xuye/source/handler/SourceHandler.java index a65bb5c..789ae80 100644 --- a/source/src/main/java/work/xuye/source/handler/SourceHandler.java +++ b/source/src/main/java/work/xuye/source/handler/SourceHandler.java @@ -15,7 +15,7 @@ import org.springframework.util.DigestUtils; import org.springframework.util.ObjectUtils; import work.xuye.common.constant.*; import work.xuye.common.db.entity.Task; -import work.xuye.common.db.entity.vo.NextTaskFilter; +import work.xuye.common.db.entity.vo.Dispatcher; import work.xuye.common.db.entity.vo.TaskMiniVO; import work.xuye.common.db.service.TaskManager; import work.xuye.common.dto.HttpRequestParams; @@ -120,7 +120,7 @@ public class SourceHandler { return null; } - if (Objects.nonNull(task.getNextFilter())) { + if (Objects.nonNull(task.getDispatcher())) { Map nextNodeMessage = buildNextNodeMessage(result, task, message); @@ -163,7 +163,7 @@ public class SourceHandler { } private Map buildNextNodeMessage(List result, Task task, Message message) { - NextTaskFilter filter = task.getNextFilter(); + Dispatcher filter = task.getDispatcher(); Map resultMap = new HashMap<>(); result.stream() @@ -174,7 +174,7 @@ public class SourceHandler { .headers(item.getHeaders()) .requestParams(item.getRequestParams()); httpRes.setResourceStatus(item.getResourceStatus()); - httpRes.setTemporary(JsonPathUtil.read(item.getBody(), task.getNextFilter().getFilterArgs())); + httpRes.setTemporary(JsonPathUtil.read(item.getBody(), task.getDispatcher().getFilterArgs())); return httpRes; }) .flatMap(item -> { @@ -210,7 +210,7 @@ public class SourceHandler { } // 节点命中 - List nextNodes = filter.getNextNodes(); + List nextNodes = filter.getNextNodes(); if (ObjectUtils.isEmpty(nextNodes)) { return; } @@ -218,7 +218,7 @@ public class SourceHandler { context.setVariable("filterArgs", map); context.setVariable("httpRes", httpRes); - for (NextTaskFilter.NextNodeConfig nextNode : nextNodes) { + for (Dispatcher.NextNodeConfig nextNode : nextNodes) { if (!Boolean.TRUE.equals(getPlaceholderSpel(nextNode.getCondition(), filter.getPlaceholderExpressions(), context, Boolean.class))) { continue;