diff --git a/common/pom.xml b/common/pom.xml index d108c42..50d9ee6 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -1,5 +1,6 @@ - + 4.0.0 work.xuye @@ -120,6 +121,10 @@ sq-sentry 1.1.8 + + org.springframework.boot + spring-boot-starter-validation + diff --git a/common/src/main/java/work/xuye/common/alert/ExceptionAspect.java b/common/src/main/java/work/xuye/common/alert/ExceptionAspect.java index 9f81df3..30e3820 100644 --- a/common/src/main/java/work/xuye/common/alert/ExceptionAspect.java +++ b/common/src/main/java/work/xuye/common/alert/ExceptionAspect.java @@ -23,10 +23,10 @@ public class ExceptionAspect { private final MessageService messageService; - @AfterThrowing(pointcut = "execution(* work.xuye..*(..))", throwing = "ex") + @AfterThrowing(pointcut = "execution(* work.xuye..*(..))&& !execution(* work.xuye.common.service.MessageService.*(..))", throwing = "ex") public void handleGlobalException(Exception ex) { - - Sentry.capture(ex); + + List stackTraceElements = new java.util.ArrayList<>(List.of(ex.getStackTrace())); stackTraceElements.removeIf(stackTraceElement -> !stackTraceElement.getClassName().contains("work.xuye")); @@ -38,14 +38,32 @@ public class ExceptionAspect { .append(stackTraceElement.getClassName()).append(".") .append(stackTraceElement.getMethodName()).append("()").append(" (line ") .append(stackTraceElement.getLineNumber()).append(")\n")); + String traceStringString = traceString.toString(); + + if (traceStringString.contains("work.xuye.common.service.MessageService")){ + return; + } + + Sentry.capture(ex); + String subTitle = ex.getMessage(); String title = ex.getClass().getName(); if (subTitle == null) { subTitle = title; } - messageService.sendMessage(MessageType.exception, - title, - subTitle, - traceString.toString()); + + + for (int i = 1; i <= 3; i++) { + try { + messageService.sendMessage(MessageType.exception, + title, + subTitle, + traceStringString); + return; + } catch (Exception e) { + log.error("send error message error! now {} times ,try retry send again e:", i, e); + } + } + } } diff --git a/common/src/main/java/work/xuye/common/constant/BindingConstants.java b/common/src/main/java/work/xuye/common/constant/BindingConstants.java index 63d8fb6..86ffdca 100644 --- a/common/src/main/java/work/xuye/common/constant/BindingConstants.java +++ b/common/src/main/java/work/xuye/common/constant/BindingConstants.java @@ -8,6 +8,8 @@ public class BindingConstants { public static final String TASK_OUT = "task-out"; + public static final String SOURCE_OUT = "source-out"; + public static final String SNAPSHOT_OUT_POSTFIX = "-snapshot-out"; public static final String TRANSFORMER_KEYS_OUT = "transformKeys-out"; diff --git a/common/src/main/java/work/xuye/common/constant/MessageConstants.java b/common/src/main/java/work/xuye/common/constant/MessageConstants.java index 0ec13c9..da6ffb0 100644 --- a/common/src/main/java/work/xuye/common/constant/MessageConstants.java +++ b/common/src/main/java/work/xuye/common/constant/MessageConstants.java @@ -20,6 +20,7 @@ public class MessageConstants { public static final String TASK_NAME = "taskName"; public static final String SEED_URL = "seedUrl"; + public static final String OTHER_CACHE_KEY_SET = "otherCacheKeySet"; } diff --git a/common/src/main/java/work/xuye/common/db/entity/Task.java b/common/src/main/java/work/xuye/common/db/entity/Task.java index a9d37b0..029c25b 100644 --- a/common/src/main/java/work/xuye/common/db/entity/Task.java +++ b/common/src/main/java/work/xuye/common/db/entity/Task.java @@ -9,6 +9,7 @@ import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer; import lombok.Getter; import lombok.Setter; import lombok.experimental.Accessors; +import work.xuye.common.db.entity.vo.Dispatcher; import work.xuye.common.db.entity.vo.ParseParams; import work.xuye.common.db.entity.vo.SinkParams; import work.xuye.common.db.entity.vo.TransformParams; @@ -58,6 +59,9 @@ public class Task implements Serializable { @TableField(value = "sink_params", typeHandler = JacksonTypeHandler.class) private SinkParams sinkParams; + @TableField(value = "dispatcher", typeHandler = JacksonTypeHandler.class) + private Dispatcher dispatcher; + @TableField(value = "create_time", fill = FieldFill.INSERT) @JsonDeserialize(using = LocalDateTimeDeserializer.class) @JsonSerialize(using = LocalDateTimeSerializer.class) @@ -68,6 +72,9 @@ public class Task implements Serializable { @JsonSerialize(using = LocalDateTimeSerializer.class) private LocalDateTime updateTime; + @TableField(value = "is_root") + private Boolean isRoot; + @TableField("version") @Version private Integer version; diff --git a/common/src/main/java/work/xuye/common/db/entity/vo/Dispatcher.java b/common/src/main/java/work/xuye/common/db/entity/vo/Dispatcher.java new file mode 100644 index 0000000..680fba2 --- /dev/null +++ b/common/src/main/java/work/xuye/common/db/entity/vo/Dispatcher.java @@ -0,0 +1,63 @@ +package work.xuye.common.db.entity.vo; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; +import java.util.Map; + +/** + * 分流设置 + * @author yechuan + * @since 2023/8/8 19:09 + **/ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class Dispatcher { + /** + * 参与计算的jsonPath + */ + private String filterArgs; + + /** + * break 表示命中后不再继续尝试匹配其他的节点 + */ + private String mode; + + /** + * 当前的唯一key,用于缓存判断是否变更 + */ + private String uniqueKey; + + /** + * 后继节点配置 + */ + private List nextNodes; + + /** + * spel表达式 + */ + private Map placeholderExpressions = Map.of(); + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class NextNodeConfig { + /** + * 消息往哪里个topic发 以通知后继任务执行 + */ + private String topic; + + /** + * 需要满足的条件 + */ + private String condition; + + /** + * 发送的对象 + */ + private String payload; + } +} diff --git a/common/src/main/java/work/xuye/common/db/entity/vo/SinkParams.java b/common/src/main/java/work/xuye/common/db/entity/vo/SinkParams.java index 86482d3..9515385 100644 --- a/common/src/main/java/work/xuye/common/db/entity/vo/SinkParams.java +++ b/common/src/main/java/work/xuye/common/db/entity/vo/SinkParams.java @@ -50,6 +50,10 @@ public class SinkParams { private Status status; private String fieldName; private String jsonPath; + /** + * 数据库对象转化为String的spel表达式 + */ + private String dbObjectExpression; } @Data diff --git a/common/src/main/java/work/xuye/common/db/entity/vo/TableTemplate.java b/common/src/main/java/work/xuye/common/db/entity/vo/TableTemplate.java index 2601fe1..b6d8141 100644 --- a/common/src/main/java/work/xuye/common/db/entity/vo/TableTemplate.java +++ b/common/src/main/java/work/xuye/common/db/entity/vo/TableTemplate.java @@ -5,6 +5,7 @@ import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; import work.xuye.common.enums.DbFieldType; +import work.xuye.common.enums.UpdateStrategy; import java.util.List; @@ -80,6 +81,11 @@ public class TableTemplate { * 字段校验表达式规则 */ private List rules; + + /** + * 修改时的策略 + */ + private UpdateStrategy updateStrategy = UpdateStrategy.ignored; } @Data diff --git a/common/src/main/java/work/xuye/common/db/entity/vo/TaskMiniVO.java b/common/src/main/java/work/xuye/common/db/entity/vo/TaskMiniVO.java index 50e2b26..fabfff5 100644 --- a/common/src/main/java/work/xuye/common/db/entity/vo/TaskMiniVO.java +++ b/common/src/main/java/work/xuye/common/db/entity/vo/TaskMiniVO.java @@ -5,6 +5,8 @@ import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; +import java.util.Map; + /** * @author xuye * @since 2023/7/7 21:58 @@ -18,4 +20,9 @@ public class TaskMiniVO { private Integer id; private String name; + + /** + * 用于存储父task节点发送的参数 子节点可能需要此参数进行计算 + */ + private Map args; } diff --git a/common/src/main/java/work/xuye/common/dto/HttpRequestParams.java b/common/src/main/java/work/xuye/common/dto/HttpRequestParams.java index bfc66ff..fa5364f 100644 --- a/common/src/main/java/work/xuye/common/dto/HttpRequestParams.java +++ b/common/src/main/java/work/xuye/common/dto/HttpRequestParams.java @@ -1,13 +1,22 @@ package work.xuye.common.dto; +import cn.hutool.core.collection.CollUtil; +import com.baomidou.mybatisplus.annotation.TableField; import lombok.AllArgsConstructor; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import org.springframework.expression.EvaluationContext; +import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; import work.xuye.common.enums.RequestMode; +import work.xuye.common.enums.SignatureEnumGroup; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; +import java.util.Set; /** * @author xuye @@ -24,10 +33,63 @@ public class HttpRequestParams { private String url; private Map body = Map.of(); private String charset; - private Map headers = Map.of(); + private Map headers = Map.of(); private Map placeholderExpressions = Map.of(); private String mediaType = MediaType.APPLICATION_JSON_VALUE; + private RequestSignatureConfig signatureConfig; + + /** + * 仅计算一次的表达式(同次请求后续使用上次计算值) + */ + private Set onceCompileExpressions = Collections.emptySet(); + + /** + * 缓存的表达式值 + */ + @TableField(exist = false) + private Map placeholderExpressionsValueCache = new HashMap<>(); + + + /** + * 参数(由父任务传递,参与计算与组装) + */ + @TableField(exist = false) + private Map args = new HashMap<>(); + + + /** + * 根据占位符获取值 + */ + public T getPlaceholderValue(String placeholder, Class t, SpelExpressionParser parser, EvaluationContext context) { + if (CollUtil.isEmpty(placeholderExpressions)) { + return null; + } + if (!placeholderExpressions.containsKey(placeholder)) { + return null; + } + boolean cache = false; + + if (onceCompileExpressions.contains(placeholder)) { + // 一次请求中 仅计算一次 + if (placeholderExpressionsValueCache.containsKey(placeholder)) { + return t.cast(placeholderExpressionsValueCache.get(placeholder)); + } + cache = true; + } + context.setVariable("requestParams", this); + String expr = placeholderExpressions.get(placeholder); + T result = parser.parseExpression(expr).getValue(context, t); + + if (cache) { + placeholderExpressionsValueCache.put(placeholder, result); + } + + return result; + + + } + @Data @NoArgsConstructor @AllArgsConstructor @@ -36,4 +98,13 @@ public class HttpRequestParams { } + @Data + @NoArgsConstructor + @AllArgsConstructor + @EqualsAndHashCode(callSuper = true) + public static class RequestSignatureConfig extends SignatureConfig { + private String signatureField; + private String signatureContext; + private SignatureEnumGroup.SignatureLocation signatureLocation; + } } diff --git a/common/src/main/java/work/xuye/common/dto/HttpRes.java b/common/src/main/java/work/xuye/common/dto/HttpRes.java index e2f8c1d..cbad376 100644 --- a/common/src/main/java/work/xuye/common/dto/HttpRes.java +++ b/common/src/main/java/work/xuye/common/dto/HttpRes.java @@ -1,5 +1,6 @@ package work.xuye.common.dto; +import com.fasterxml.jackson.annotation.JsonInclude; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -25,6 +26,12 @@ public class HttpRes { private Map> headers = Map.of(); private Long costTimeMillis; + /** + * 临时变量 + */ + @JsonInclude + private Object temporary; + public static HttpRes build() { return new HttpRes(); } diff --git a/common/src/main/java/work/xuye/common/dto/SignatureConfig.java b/common/src/main/java/work/xuye/common/dto/SignatureConfig.java new file mode 100644 index 0000000..a60081e --- /dev/null +++ b/common/src/main/java/work/xuye/common/dto/SignatureConfig.java @@ -0,0 +1,36 @@ +package work.xuye.common.dto; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * 加解密配置 + * + * @author yechuan + * @since 2023/8/8 14:44 + **/ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class SignatureConfig { + /** + * key 算法 + */ + private String keyAlgorithm; + + /** + * 公私钥类型 + */ + private String keyType; + + /** + * 加密算法 + */ + private String signatureAlgorithm; + + /** + * key + */ + private String key; +} diff --git a/common/src/main/java/work/xuye/common/dto/SignatureContext.java b/common/src/main/java/work/xuye/common/dto/SignatureContext.java new file mode 100644 index 0000000..8b917a2 --- /dev/null +++ b/common/src/main/java/work/xuye/common/dto/SignatureContext.java @@ -0,0 +1,18 @@ +package work.xuye.common.dto; + +import lombok.Data; + +import java.security.PrivateKey; +import java.security.PublicKey; +import java.security.Signature; + +/** + * @author yechuan + * @since 2023/8/9 17:53 + **/ +@Data +public class SignatureContext { + private PrivateKey priKey; + private PublicKey pubKey; + private Signature signature; +} diff --git a/common/src/main/java/work/xuye/common/enums/SignatureEnumGroup.java b/common/src/main/java/work/xuye/common/enums/SignatureEnumGroup.java new file mode 100644 index 0000000..8de5f94 --- /dev/null +++ b/common/src/main/java/work/xuye/common/enums/SignatureEnumGroup.java @@ -0,0 +1,36 @@ +package work.xuye.common.enums; + +/** + * @author yechuan + * @since 2023/8/8 14:52 + **/ +public class SignatureEnumGroup { + + /** + * 加密字符串的位置 + */ + public static enum SignatureLocation{ + header + } + + public static enum KeyAlgorithm{ + /** + * RSA + */ + RSA + } + + public static enum KeyType{ + /** + * 私钥 + */ + privateKey, + /** + * 公钥 + */ + publicKey, + } + + + +} diff --git a/common/src/main/java/work/xuye/common/enums/UpdateStrategy.java b/common/src/main/java/work/xuye/common/enums/UpdateStrategy.java new file mode 100644 index 0000000..e10d026 --- /dev/null +++ b/common/src/main/java/work/xuye/common/enums/UpdateStrategy.java @@ -0,0 +1,18 @@ +package work.xuye.common.enums; + +/** + * 修改时的策略 + * + * @author yechuan + * @since 2023/8/10 13:37 + **/ +public enum UpdateStrategy { + /** + * 忽略判断 + */ + ignored, + /** + * 非NULL时加入sql + */ + not_null +} diff --git a/common/src/main/java/work/xuye/common/service/MessageService.java b/common/src/main/java/work/xuye/common/service/MessageService.java index a7ded8d..0f94e13 100644 --- a/common/src/main/java/work/xuye/common/service/MessageService.java +++ b/common/src/main/java/work/xuye/common/service/MessageService.java @@ -58,7 +58,7 @@ public class MessageService { .send(); messageMD5Store.save(result.toString()); } else { - log.debug("短期内已经发送过相同的异常信息,不再发送: [{}]", result); + log.info("短期内已经发送过相同的异常信息,不再发送: [{}]", result); } } } diff --git a/common/src/main/java/work/xuye/common/service/UrlMD5Service.java b/common/src/main/java/work/xuye/common/service/UrlMD5Service.java index b82a98c..99d6a77 100644 --- a/common/src/main/java/work/xuye/common/service/UrlMD5Service.java +++ b/common/src/main/java/work/xuye/common/service/UrlMD5Service.java @@ -2,10 +2,16 @@ package work.xuye.common.service; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.messaging.Message; import org.springframework.stereotype.Service; +import org.springframework.util.ObjectUtils; +import org.springframework.util.StringUtils; +import work.xuye.common.constant.MessageConstants; import work.xuye.common.enums.ResourceStatus; import work.xuye.common.store.UrlMD5MapStore; +import java.util.Set; + /** * @author xuye * @since 2023/3/6 21:58 @@ -37,4 +43,15 @@ public class UrlMD5Service { Long aLong = urlMD5MapStore.removeKey(seedUrl); log.info("删除url对应的md5,url:{}, 删除条数:{}", seedUrl, aLong); } + + public void removeUrlCache(Message message) { + Set parentOtherCacheKeySet = message.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET, Set.class); + if (!ObjectUtils.isEmpty(parentOtherCacheKeySet)) { + parentOtherCacheKeySet.forEach(this::removeUrlMD5); + } + String sendUrl = message.getHeaders().get(MessageConstants.SEED_URL, String.class); + if (StringUtils.hasText(sendUrl)) { + this.removeUrlMD5(sendUrl); + } + } } diff --git a/common/src/main/java/work/xuye/common/spel/CustomFunction.java b/common/src/main/java/work/xuye/common/spel/CustomFunction.java index 43e09f0..5653fee 100644 --- a/common/src/main/java/work/xuye/common/spel/CustomFunction.java +++ b/common/src/main/java/work/xuye/common/spel/CustomFunction.java @@ -4,6 +4,8 @@ import com.google.gson.JsonArray; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonPrimitive; +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.PathNotFoundException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.expression.Expression; @@ -41,6 +43,13 @@ public class CustomFunction { private final NsKVMapStore mapStore; + public String localDateTimeFormat(LocalDateTime time, String pattern) { + if (time == null) { + return ""; + } + return time.format(DateTimeFormatter.ofPattern(pattern)); + } + public String jsonExtract(String json, String path) { String result = null; try { @@ -53,6 +62,20 @@ public class CustomFunction { return result; } + public String jsonExtractOrPathNotFound(String json, String path) { + String result = null; + try { + result = JsonPath.read(json, path).toString().trim(); + } catch (PathNotFoundException e) { + log.warn("jsonPath not find, json: [{}], path: [{}]", json, path); + } catch (Exception e) { + log.error("jsonExtra handler error, json: [{}], path: [{}]", json, path); + throw new RuntimeException(e); + } + log.debug("jsonExtra handler, json: [{}], path: [{}], result: [{}]", json, path, result); + return result; + } + public String jsonExtractReplaceAll(String json, String jsonPath, String regex, String replacement) { return this.jsonExtract(json, jsonPath).replaceAll(regex, replacement).trim(); } diff --git a/common/src/main/java/work/xuye/common/utils/signature/AbstractSignatureService.java b/common/src/main/java/work/xuye/common/utils/signature/AbstractSignatureService.java new file mode 100644 index 0000000..66225f5 --- /dev/null +++ b/common/src/main/java/work/xuye/common/utils/signature/AbstractSignatureService.java @@ -0,0 +1,32 @@ +package work.xuye.common.utils.signature; + +import work.xuye.common.dto.SignatureConfig; +import work.xuye.common.dto.SignatureContext; + +import java.security.GeneralSecurityException; +import java.security.Signature; + +/** + * @author yechuan + * @since 2023/8/8 14:42 + **/ +public abstract class AbstractSignatureService { + + protected SignatureContext signatureContext; + + /** + * 使用私钥加密 + * + * @param content + * @return + * @throws GeneralSecurityException + */ + public abstract String signatureByPriKey(String content) throws GeneralSecurityException; + + + void init(SignatureConfig signatureConfig) throws GeneralSecurityException { + signatureContext = new SignatureContext(); + signatureContext.setSignature(Signature.getInstance(signatureConfig.getSignatureAlgorithm())); + } + +} diff --git a/common/src/main/java/work/xuye/common/utils/signature/RSASignatureService.java b/common/src/main/java/work/xuye/common/utils/signature/RSASignatureService.java new file mode 100644 index 0000000..7f96922 --- /dev/null +++ b/common/src/main/java/work/xuye/common/utils/signature/RSASignatureService.java @@ -0,0 +1,70 @@ +package work.xuye.common.utils.signature; + +import org.apache.commons.codec.binary.Base64; +import org.springframework.util.DigestUtils; +import work.xuye.common.dto.SignatureConfig; +import work.xuye.common.enums.SignatureEnumGroup; + +import java.nio.charset.StandardCharsets; +import java.security.GeneralSecurityException; +import java.security.KeyFactory; +import java.security.Signature; +import java.security.spec.PKCS8EncodedKeySpec; +import java.security.spec.X509EncodedKeySpec; + +/** + * RSA + * + * @author yechuan + * @since 2023/8/8 14:47 + **/ + +class RSASignatureService extends AbstractSignatureService { + + private RSASignatureService() { + + } + + public static AbstractSignatureService getInstance(SignatureConfig signatureConfig) throws GeneralSecurityException { + RSASignatureService result = new RSASignatureService(); + result.init(signatureConfig); + return result; + } + + @Override + public String signatureByPriKey(String content) throws GeneralSecurityException { + if (signatureContext == null) { + throw new RuntimeException("未初始化公私钥对信息"); + } + if (signatureContext.getSignature() == null) { + throw new RuntimeException("未初始化私钥信息"); + } + Signature signature = signatureContext.getSignature(); + content = DigestUtils.md5DigestAsHex(content.getBytes(StandardCharsets.UTF_8)).toUpperCase(); + signature.update(content.getBytes(StandardCharsets.UTF_8)); + return Base64.encodeBase64String(signature.sign()); + } + + @Override + void init(SignatureConfig signatureConfig) throws GeneralSecurityException { + super.init(signatureConfig); + + String keyTypeName = signatureConfig.getKeyType(); + SignatureEnumGroup.KeyType keyType = SignatureEnumGroup.KeyType.valueOf(keyTypeName); + + KeyFactory keyFactory = KeyFactory.getInstance(SignatureEnumGroup.KeyAlgorithm.RSA.name()); + + byte[] encodedKey = Base64.decodeBase64(signatureConfig.getKey()); + + if (SignatureEnumGroup.KeyType.privateKey.equals(keyType)) { + signatureContext.setPriKey(keyFactory.generatePrivate(new PKCS8EncodedKeySpec(encodedKey))); + signatureContext.getSignature().initSign(signatureContext.getPriKey()); + } else if (SignatureEnumGroup.KeyType.publicKey.equals(keyType)) { + signatureContext.setPubKey(keyFactory.generatePublic(new X509EncodedKeySpec(encodedKey))); + } + + + } + + +} diff --git a/common/src/main/java/work/xuye/common/utils/signature/SignatureFactory.java b/common/src/main/java/work/xuye/common/utils/signature/SignatureFactory.java new file mode 100644 index 0000000..8f25643 --- /dev/null +++ b/common/src/main/java/work/xuye/common/utils/signature/SignatureFactory.java @@ -0,0 +1,33 @@ +package work.xuye.common.utils.signature; + +import work.xuye.common.dto.SignatureConfig; +import work.xuye.common.enums.SignatureEnumGroup; + +import java.security.GeneralSecurityException; + +/** + * @author yechuan + * @since 2023/8/8 15:20 + **/ +public class SignatureFactory { + + /** + * 获取加解密实现 + */ + public static AbstractSignatureService getSignatureService(SignatureConfig signatureConfig) throws GeneralSecurityException { + + AbstractSignatureService result = null; + + switch (SignatureEnumGroup.KeyAlgorithm.valueOf(signatureConfig.getKeyAlgorithm())) { + case RSA: + result = RSASignatureService.getInstance(signatureConfig); + break; + } + if (result == null) { + throw new RuntimeException(String.format("不支持的签名方式,signatureConfig:%s", signatureConfig)); + } + return result; + + } + +} diff --git a/common/src/main/resources/markdown/task.md b/common/src/main/resources/markdown/task.md index bd49f03..2b830a7 100644 --- a/common/src/main/resources/markdown/task.md +++ b/common/src/main/resources/markdown/task.md @@ -37,4 +37,10 @@ ## 基金宝图文、视频RSS 1. 对方给的接口数据格式比较乱,让对方改了多次才勉强能用 -2. 视频暂时只有一条数据,找媒体方沟通多次依旧只能提供一条 \ No newline at end of file +2. 视频暂时只有一条数据,找媒体方沟通多次依旧只能提供一条 + +## 第一财经视频RSS + +1. 请求需要进行加密 新增加密相关配置 +2. 列表接口存在上下架状态,对于下架的视频 无法请求详情接口,需要伪造详情数据以便执行后续流程 +3. 接口请求存在顺序关系,新增配置,让task允许存在多个后继task且可配置制定发送topic \ No newline at end of file diff --git a/helper/src/main/java/work/xuye/helper/checker/DataDiffChecker.java b/helper/src/main/java/work/xuye/helper/checker/DataDiffChecker.java new file mode 100644 index 0000000..4067cd1 --- /dev/null +++ b/helper/src/main/java/work/xuye/helper/checker/DataDiffChecker.java @@ -0,0 +1,180 @@ +package work.xuye.helper.checker; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Service; +import org.springframework.util.ObjectUtils; +import work.xuye.common.db.service.TaskManager; +import work.xuye.helper.properties.HelperProperties; +import work.xuye.helper.service.JdbcTemplateManager; + +import java.util.*; + +/** + * @author xuye + * @since 2023/7/13 14:57 + **/ +@Slf4j +@Service +@RequiredArgsConstructor +public class DataDiffChecker { + + private final TaskManager taskManager; + private final JdbcTemplateManager jdbcTemplateManager; + private final HelperProperties helperProperties; + + + public Map> checkDifferentKeys(Map map1, Map map2) { + Map> map = new HashMap<>(); + for (String key : map1.keySet()) { + if (map2.containsKey(key)) { + Object value1 = map1.get(key); + Object value2 = map2.get(key); + + if (!ObjectUtils.nullSafeEquals(value1, value2)) { + if (!helperProperties.getEqualsIgnoreField().contains(key)) { + map.put(key, List.of(value1, value2)); + log.debug("@@@ key: {}, value1: {}, value2: {}", key, value1, value2); + } + } + } else { + log.debug("@@@ key: {}, value1: {}, value2: {}", key, map1.get(key), null); + ArrayList list = new ArrayList<>(); + list.add(map1.get(key)); + list.add(null); + map.put(key, list); + } + } + for (String key : map2.keySet()) { + if (!map1.containsKey(key)) { + log.debug("@@@ key: {}, value1: {}, value2: {}", key, null, map2.get(key)); + ArrayList list = new ArrayList<>(); + list.add(null); + list.add(map2.get(key)); + map.put(key, list); + } + } + return map; + } + + + public void check(List ids, Integer intervalDays) { + List newsIds = helperProperties.getTaskTypeIdMap().get(NewsTypeEnum.NEWS); + List videoIds = helperProperties.getTaskTypeIdMap().get(NewsTypeEnum.VIDEO); + if (ObjectUtils.isEmpty(ids)) { + handleNews(newsIds, intervalDays); +// handleVideo(videoIds); + } else { + for (Integer id : ids) { + if (newsIds.contains(id)) { + handleNews(List.of(id), intervalDays); + } else if (videoIds.contains(id)) { +// handleVideo(List.of(id)); + } + } + } + } + + public void handleNews(List ids, Integer intervalDays) { + + String source = helperProperties.getSourceDbName(); + String target = helperProperties.getTargetDbName(); + + String sql = "select url from all_news where spider = ? and createDate > CURDATE() - INTERVAL ? DAY order by createDate desc"; + for (Integer id : ids) { + String taskName = taskManager.getTaskInfoByTaskId(id).getTask().getName(); + log.info("-------------------------{}-------------------------", taskName); + List sourceUrl = jdbcTemplateManager.getTemplate(source).queryForList(sql, String.class, taskName, intervalDays); + List targetUrl = jdbcTemplateManager.getTemplate(target).queryForList(sql, String.class, taskName, intervalDays); + + // 判断交集数据 + List intersection = new ArrayList<>(sourceUrl); + intersection.retainAll(targetUrl); + log.info("@@@ [{}] 记录数: [{}], [{}] 记录数: [{}], 交集条数: [{}]", source, sourceUrl.size(), target, targetUrl.size(), intersection.size()); + + // 以 source 为基准,判断差集数据 + List diff = findDifference(sourceUrl, intersection); + if (diff.size() > 0) { + log.info("@@@ [{}] 有,但是 [{}] 没有的记录数: {}", source, target, diff.size()); + checkDiffReason(diff, jdbcTemplateManager.getTemplate(target)); + } + + + // 以 target 为基准,判断差集数据 + diff = findDifference(targetUrl, intersection); + if (diff.size() > 0) { + log.info("@@@ [{}] 有,但是 [{}] 没有的记录数: {}", target, source, diff.size()); + checkDiffReason(diff, jdbcTemplateManager.getTemplate(source)); + } + + + // 判断交集数据是否相同 + findFieldsDiff(intersection, jdbcTemplateManager.getTemplate(source), jdbcTemplateManager.getTemplate(target)); + } + } + + private void findFieldsDiff(List urls, JdbcTemplate jdbcTemplate, JdbcTemplate jdbcTemplate2) { + int count = 0; + Map>> result = new HashMap<>(); + String sql = "select * from all_news where url = ? limit 1"; + for (String s : urls) { + if (urls.size() > helperProperties.getThreshold() && Math.random() > helperProperties.getSampleRate()) { + continue; + } + count++; + Map map = jdbcTemplate.queryForMap(sql, s); + Map map2 = jdbcTemplate2.queryForMap(sql, s); + Map> differentKeys = checkDifferentKeys(map, map2); + if (differentKeys.size() > 0) { + result.put(s, differentKeys); + } + } + if (ObjectUtils.isEmpty(result)) { + log.info("@@@ 在 [{}] 条中抽检了 [{}] 条,没有字段差异", urls.size(), count); + } else { + + Set keys = new HashSet<>(); + for (Map> value : result.values()) { + keys.addAll(value.keySet()); + } + log.warn("@@@ 共在 [{}] 条中抽检了 [{}] 条,有 [{}] 条有字段差异, 差异字段: {}", urls.size(), count, result.size(), keys); + } + } + + public List findDifference(List source, List intersection) { + List diff = new ArrayList<>(source); + diff.removeAll(intersection); + return diff; + } + + public void checkDiffReason(List urls, JdbcTemplate jdbcTemplate) { + List occupied = new ArrayList<>(); + List notExist = new ArrayList<>(); + + String sql = "select spider from all_news where url = ?"; + for (String url : urls) { + List spiders = jdbcTemplate.queryForList(sql, String.class, url); + if (spiders.size() == 1) { + occupied.add(url); + } else { + notExist.add(url); + } + } + + if (!ObjectUtils.isEmpty(occupied)) { + log.info("@@ 已经被其他爬虫提前爬取的记录条数`: {}", occupied.size()); + } + if (!ObjectUtils.isEmpty(notExist)) { + if (notExist.size() > 5) { + log.error("@@ ⚠️不存在的记录条数: {}", notExist.size()); + } else { + log.warn("@@ 不存在的记录条数: {}, url: {}", notExist.size(), notExist); + } + } + + + } + + +} diff --git a/helper/src/main/java/work/xuye/helper/checker/NewsTypeEnum.java b/helper/src/main/java/work/xuye/helper/checker/NewsTypeEnum.java new file mode 100644 index 0000000..ac2fcce --- /dev/null +++ b/helper/src/main/java/work/xuye/helper/checker/NewsTypeEnum.java @@ -0,0 +1,14 @@ +package work.xuye.helper.checker; + + +/** + * @author xuye + * @since 2023/3/4 + **/ + +public enum NewsTypeEnum { + + NEWS, + VIDEO +} + diff --git a/helper/src/main/java/work/xuye/helper/controller/CheckController.java b/helper/src/main/java/work/xuye/helper/controller/CheckController.java new file mode 100644 index 0000000..8a8c5f5 --- /dev/null +++ b/helper/src/main/java/work/xuye/helper/controller/CheckController.java @@ -0,0 +1,35 @@ +package work.xuye.helper.controller; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import lombok.RequiredArgsConstructor; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; +import work.xuye.helper.checker.DataDiffChecker; +import work.xuye.helper.vo.DataCheckVO; + +import javax.validation.Valid; + +/** + * @author xuye + * @since 2023/7/13 18:13 + **/ +@Api(tags = "数据对比") +@Validated +@RestController +@RequestMapping("/check") +@RequiredArgsConstructor +public class CheckController { + + private final DataDiffChecker dataDiffChecker; + + @PostMapping + @ApiOperation("指定任务对比") + public void check(@RequestBody @Valid DataCheckVO dataCheckVO) { + dataDiffChecker.check(dataCheckVO.getIds(), dataCheckVO.getIntervalDays()); + } + +} diff --git a/helper/src/main/java/work/xuye/helper/properties/DatasourceConfigProperties.java b/helper/src/main/java/work/xuye/helper/properties/DatasourceConfigProperties.java new file mode 100644 index 0000000..4aa0014 --- /dev/null +++ b/helper/src/main/java/work/xuye/helper/properties/DatasourceConfigProperties.java @@ -0,0 +1,28 @@ +package work.xuye.helper.properties; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +import java.util.Map; + +/** + * @author xuye + * @since 2023/1/14 21:50 + **/ +@Data +@Component +@ConfigurationProperties(prefix = DatasourceConfigProperties.PROPERTIES_PREFIX) +public class DatasourceConfigProperties { + + public static final String PROPERTIES_PREFIX = "datasource"; + + private Map configMap; + + @Data + public static class DataSource { + private String url; + private String username; + private String password; + } +} diff --git a/helper/src/main/java/work/xuye/helper/properties/HelperProperties.java b/helper/src/main/java/work/xuye/helper/properties/HelperProperties.java index e91b50e..9ec8ae7 100644 --- a/helper/src/main/java/work/xuye/helper/properties/HelperProperties.java +++ b/helper/src/main/java/work/xuye/helper/properties/HelperProperties.java @@ -1,28 +1,46 @@ package work.xuye.helper.properties; import lombok.Data; +import lombok.extern.slf4j.Slf4j; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; +import work.xuye.helper.checker.NewsTypeEnum; +import java.util.List; import java.util.Map; +import java.util.Set; /** * @author xuye * @since 2023/1/14 21:50 **/ +@Slf4j @Data @Component @ConfigurationProperties(prefix = HelperProperties.PROPERTIES_PREFIX) public class HelperProperties { + public static final String PROPERTIES_PREFIX = "helper"; + private String sourceDbName; + private String targetDbName; + private Map> taskTypeIdMap; + + /** + * 对比字段时,忽略的字段 + */ + private Set equalsIgnoreField; + + + /** + * 触发抽检的阈值 + */ + private int threshold = 1000; + + /** + * 抽检的比例 + */ + private double sampleRate = 0.1; - private Map datasourceMap; - @Data - public static class DataSource { - private String url; - private String username; - private String password; - } } diff --git a/helper/src/main/java/work/xuye/helper/service/JdbcTemplateManager.java b/helper/src/main/java/work/xuye/helper/service/JdbcTemplateManager.java new file mode 100644 index 0000000..05913d3 --- /dev/null +++ b/helper/src/main/java/work/xuye/helper/service/JdbcTemplateManager.java @@ -0,0 +1,44 @@ +package work.xuye.helper.service; + +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Service; +import work.xuye.helper.properties.DatasourceConfigProperties; + +import javax.annotation.PostConstruct; +import java.util.HashMap; + +/** + * @author xuye + * @since 2023/7/13 14:04 + **/ +@Slf4j +@Service +@RequiredArgsConstructor +public class JdbcTemplateManager { + + + private final DatasourceConfigProperties datasourceConfigProperties; + + private final HashMap jdbcTemplateMap = new HashMap<>(); + + public JdbcTemplate getTemplate(String key) { + return jdbcTemplateMap.get(key); + } + + @PostConstruct + private void initJdbcTemplate() { + jdbcTemplateMap.clear(); + datasourceConfigProperties.getConfigMap().forEach((key, source) -> { + HikariConfig config = new HikariConfig(); + config.setJdbcUrl(source.getUrl()); + config.setUsername(source.getUsername()); + config.setPassword(source.getPassword()); + HikariDataSource dataSource = new HikariDataSource(config); + jdbcTemplateMap.put(key, new JdbcTemplate(dataSource)); + }); + } +} diff --git a/helper/src/main/java/work/xuye/helper/service/UpdateService.java b/helper/src/main/java/work/xuye/helper/service/UpdateService.java deleted file mode 100644 index 5f3b138..0000000 --- a/helper/src/main/java/work/xuye/helper/service/UpdateService.java +++ /dev/null @@ -1,171 +0,0 @@ -package work.xuye.helper.service; - -import cn.hutool.db.Db; -import cn.hutool.db.Entity; -import com.zaxxer.hikari.HikariConfig; -import com.zaxxer.hikari.HikariDataSource; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; -import org.springframework.stereotype.Service; -import org.springframework.util.ObjectUtils; -import work.xuye.helper.properties.HelperProperties; - -import javax.annotation.PostConstruct; -import java.sql.SQLException; -import java.util.*; - -/** - * @author xuye - * @since 2023/7/12 14:55 - **/ -@Slf4j -@Service -@RequiredArgsConstructor -public class UpdateService implements ApplicationRunner { - - private final HelperProperties helperProperties; - - private final HashMap dataSourceMap; - - @PostConstruct - public void init() { - - Map datasourceConfigMap = helperProperties.getDatasourceMap(); - for (String key : datasourceConfigMap.keySet()) { - HelperProperties.DataSource source = datasourceConfigMap.get(key); - HikariConfig config = new HikariConfig(); - config.setJdbcUrl(source.getUrl()); - config.setUsername(source.getUsername()); - config.setPassword(source.getPassword()); - HikariDataSource dataSource = new HikariDataSource(config); - dataSourceMap.put(key, dataSource); - - - } - } - - public void start() throws SQLException { - String sql = "select date(createDate) as date, spider, count(*) as count\n" + - "from all_news\n" + - "where JSON_EXTRACT(raw_data, '$.crawler') = 'snp'\n" + - " and createDate > curdate() - interval 1 day\n" + - "group by spider, date\n" + - "order by date desc, count desc;"; - - - Set keys = dataSourceMap.keySet(); - - HashMap> map = new HashMap<>(); - - for (String key : keys) { - HikariDataSource dataSource = dataSourceMap.get(key); - List query = Db.use(dataSourceMap.get(key)).query(sql); - map.put(key, query); - } - - // Compare List between different environments - for (String key1 : keys) { - for (String key2 : keys) { - if (!key1.equals(key2)) { - List list1 = map.get(key1); - List list2 = map.get(key2); - - for (Entity entity1 : list1) { - for (Entity entity2 : list2) { - String date1 = entity1.getStr("date"); - String spider1 = entity1.getStr("spider"); - int count1 = entity1.getInt("count"); - - String date2 = entity2.getStr("date"); - String spider2 = entity2.getStr("spider"); - int count2 = entity2.getInt("count"); - - if (date1.equals(date2) && spider1.equals(spider2) && count1 != count2) { - int diff = Math.abs(count1 - count2); - String more = count1 > count2 ? key1 : key2; - System.out.printf("@@@ [%s] [%s] %s: %d %s: %d, diff abs: %d, more: %s%n", date1, spider1, key1, count1, key2, count2, diff, more); - this.findDIff(spider1, date1); - } - } - } - } - } - // todo 此处比较多个的话有遗漏,后续优化下 - break; - } - } - - - public void findDIff(String spider, String date) throws SQLException { - HashMap> map = new HashMap<>(); - for (String key : dataSourceMap.keySet()) { - HikariDataSource dataSource = dataSourceMap.get(key); - - String sql = "SELECT url FROM all_news WHERE spider = '" + spider + "' AND date(createDate) = '" + date + "';"; - List resultList = Db.use(dataSource).query(sql); - ArrayList list = new ArrayList<>(); - for (Entity entity : resultList) { - String url = entity.getStr("url"); - list.add(url); - } - map.put(key, list); - } - - for (String s : map.keySet()) { - this.compareEnvironments(map, s); - } - - - } - - public void compareEnvironments(HashMap> map, String environment) throws SQLException { - - List environmentList = map.get(environment); - if (environmentList == null) { - return; - } - for (String key : map.keySet()) { - if (key.equals(environment)) { - continue; - } - List list = map.getOrDefault(key, new ArrayList<>()); - List intersection = new ArrayList<>(list); - intersection.retainAll(environmentList); - - List different = new ArrayList<>(list); - different.removeAll(intersection); - - if (!different.isEmpty()) { - System.out.println("@@ 环境 " + environment + " 中,List " + key + " 中独有的元素有" + different.size() + "个"); - for (String s : different) { - getStatusByUrl(s); - } - } - } - } - - public void getStatusByUrl(String url) throws SQLException { - String sql = "select spider from all_news where url='" + url + "';"; - for (String key : dataSourceMap.keySet()) { - HikariDataSource dataSource = dataSourceMap.get(key); - List resultList = Db.use(dataSource).query(sql); - if (ObjectUtils.isEmpty(resultList)) { - System.err.println("⚠️ url: " + url + " not found in " + key); - } - if (resultList.size() == 1) { - System.out.println("url [" + url + "] 被 [" + resultList.get(0).get("spider") + "] 提前抓取了"); - } - if (resultList.size() > 1) { - System.err.println("!!! url: " + url + " found in " + key + " " + resultList.size() + " times"); - } - - } - } - - @Override - public void run(ApplicationArguments args) throws Exception { - this.start(); - } -} diff --git a/helper/src/main/java/work/xuye/helper/vo/DataCheckVO.java b/helper/src/main/java/work/xuye/helper/vo/DataCheckVO.java new file mode 100644 index 0000000..e56ac0f --- /dev/null +++ b/helper/src/main/java/work/xuye/helper/vo/DataCheckVO.java @@ -0,0 +1,24 @@ +package work.xuye.helper.vo; + +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** + * @author xuye + * @since 2023/7/14 18:41 + **/ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class DataCheckVO { + + @ApiModelProperty("任务id") + private List ids; + + @ApiModelProperty("最近几天的数据") + private Integer intervalDays; +} diff --git a/helper/src/main/resources/application.yml b/helper/src/main/resources/application.yml index 30d5b4d..1210186 100644 --- a/helper/src/main/resources/application.yml +++ b/helper/src/main/resources/application.yml @@ -1,15 +1,48 @@ knife4j: enable: true server: - port: 9300 -helper: - datasourceMap: - sit: + port: 9301 +datasource: + configMap: + news-sit: url: jdbc:mysql://47.116.58.10:3306/pyspider_sit_resultdb?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2b8 username: pyspider_user password: ND0328qSywfre - prd: + news-prd: url: jdbc:mysql://47.103.55.230:3306/pyspider_resultdb?characterEncoding=utf8&autoReconnect=true&useUnicode=true&useSSL=false username: pyspider password: strzsJQWp%uw9oKB - + video-prd: + url: jdbc:mysql://47.116.61.180:3306/fhl_data_ingestion?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2b8 + username: fhl_owner + password: 5rDjEXN%bAq5Bf#9 +sq: + sentry: + enabled: false +helper: + equals-ignore-field: + - id + - createDate + - postDate + - update_time + task-type-id-map: + news: + - 1 + - 2 + - 3 + - 4 + - 5 + - 7 + - 8 + - 9 + - 14 + - 15 + - 16 + - 17 + video: + - 6 + - 10 + - 11 + - 12 + source-db-name: news-sit + target-db-name: news-prd \ No newline at end of file diff --git a/scheduler/src/main/java/work/xuye/scheduler/service/IssueService.java b/scheduler/src/main/java/work/xuye/scheduler/service/IssueService.java index 04e6c4b..0b66cdd 100644 --- a/scheduler/src/main/java/work/xuye/scheduler/service/IssueService.java +++ b/scheduler/src/main/java/work/xuye/scheduler/service/IssueService.java @@ -1,5 +1,6 @@ package work.xuye.scheduler.service; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.ApplicationArguments; @@ -73,7 +74,8 @@ public class IssueService implements ApplicationRunner { public ArrayList issueAllTask() { log.info("--------------------"); - List list = taskService.list(); +// List list = taskService.list(); + List list = taskService.list(new LambdaQueryWrapper().eq(Task::getIsRoot,true)); Collections.shuffle(list); ArrayList result = new ArrayList<>(); list.forEach(task -> { diff --git a/sink/src/main/java/work/xuye/sink/handler/ItemHandler.java b/sink/src/main/java/work/xuye/sink/handler/ItemHandler.java index d339a5d..7a0149f 100644 --- a/sink/src/main/java/work/xuye/sink/handler/ItemHandler.java +++ b/sink/src/main/java/work/xuye/sink/handler/ItemHandler.java @@ -10,6 +10,7 @@ import org.springframework.util.DigestUtils; import org.springframework.util.ObjectUtils; import work.xuye.common.constant.MessageConstants; import work.xuye.common.constant.StageConstants; +import work.xuye.common.db.entity.Mapping; import work.xuye.common.db.entity.Task; import work.xuye.common.db.entity.vo.SinkParams; import work.xuye.common.db.entity.vo.TableTemplate; @@ -22,10 +23,7 @@ import work.xuye.common.spel.SpringExpressionLanguageEvaluator; import work.xuye.common.utils.DebugUtil; import work.xuye.sink.service.JdbcService; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; /** * @author xuye @@ -64,8 +62,7 @@ public class ItemHandler { } private void cleanCache(Message message) { - String seedUrl = (String) message.getHeaders().get(MessageConstants.SEED_URL); - urlMD5Service.removeUrlMD5(seedUrl); + urlMD5Service.removeUrlCache(message); } @@ -87,7 +84,9 @@ public class ItemHandler { SinkParams.CheckDelete checkDeleteConfig = taskVO.getTask().getSinkParams().getCheckDelete(); // 根据消息体,和映射关系,使用SpEL表达式,得到计算结果 - HashMap expressionResultMap = evaluator.evaluate(taskVO.getTemplateMappingMap().get(templateName).getTableMapping(), message); + Map templateMappingMap = taskVO.getTemplateMappingMap(); + Mapping mapping = templateMappingMap.get(templateName); + HashMap expressionResultMap = evaluator.evaluate(mapping.getTableMapping(), message); // 如果不能通过校验,则抛出异常 this.validate(expressionResultMap, template); // 将计算结果合并到模板中,此刻的模板是拥有值的 @@ -319,8 +318,10 @@ public class ItemHandler { if (!ObjectUtils.isEmpty(checkUpdateConfig.getJsonPath())) { dbValue = cf.jsonExtract(dbValue.toString(), checkUpdateConfig.getJsonPath()); nowValue = cf.jsonExtract(nowValue.toString(), checkUpdateConfig.getJsonPath()); + } else if (!ObjectUtils.isEmpty(checkUpdateConfig.getDbObjectExpression())) { + dbValue = evaluator.evaluate(dbValue, checkUpdateConfig.getDbObjectExpression(), String.class); } else { - throw new RuntimeException("根据 [" + fieldName + "] 来判断数据是否需要更新,但是没有配置jsonPath"); + throw new RuntimeException("根据 [" + fieldName + "] 来判断数据是否需要更新,但是没有配置jsonPath与dbObjectExpression"); } if (dbValue.equals(nowValue)) { log.info("[{}][✅ 已是最新][{} {}] " + "{}: {}", diff --git a/sink/src/main/java/work/xuye/sink/handler/SqlGenerator.java b/sink/src/main/java/work/xuye/sink/handler/SqlGenerator.java index b801857..1cfac60 100644 --- a/sink/src/main/java/work/xuye/sink/handler/SqlGenerator.java +++ b/sink/src/main/java/work/xuye/sink/handler/SqlGenerator.java @@ -2,6 +2,7 @@ package work.xuye.sink.handler; import work.xuye.common.db.entity.vo.TableTemplate; import work.xuye.common.enums.DbFieldType; +import work.xuye.common.enums.UpdateStrategy; import java.util.ArrayList; import java.util.List; @@ -37,13 +38,17 @@ public class SqlGenerator { boolean isFirstField = true; for (TableTemplate.Field field : fields) { + String fieldValue = formatFieldValue(field); + if ("NULL".equals(fieldValue) && UpdateStrategy.not_null.equals(field.getUpdateStrategy())) { + continue; + } if (!isFirstField) { sqlBuilder.append(", "); } else { isFirstField = false; } sqlBuilder.append('`').append(field.getFieldName()).append('`') - .append('=').append(formatFieldValue(field)); + .append('=').append(fieldValue); } TableTemplate.Field updateTimeField = template.getUpdateTimeField(); diff --git a/source/src/main/java/work/xuye/source/handler/SourceHandler.java b/source/src/main/java/work/xuye/source/handler/SourceHandler.java index c53c17f..789ae80 100644 --- a/source/src/main/java/work/xuye/source/handler/SourceHandler.java +++ b/source/src/main/java/work/xuye/source/handler/SourceHandler.java @@ -1,8 +1,10 @@ package work.xuye.source.handler; +import com.google.gson.Gson; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.function.StreamBridge; +import org.springframework.expression.EvaluationContext; import org.springframework.expression.Expression; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.expression.spel.support.StandardEvaluationContext; @@ -13,6 +15,7 @@ import org.springframework.util.DigestUtils; import org.springframework.util.ObjectUtils; import work.xuye.common.constant.*; import work.xuye.common.db.entity.Task; +import work.xuye.common.db.entity.vo.Dispatcher; import work.xuye.common.db.entity.vo.TaskMiniVO; import work.xuye.common.db.service.TaskManager; import work.xuye.common.dto.HttpRequestParams; @@ -20,17 +23,21 @@ import work.xuye.common.dto.HttpRes; import work.xuye.common.enums.ProcessMode; import work.xuye.common.enums.RequestMode; import work.xuye.common.enums.ResourceStatus; +import work.xuye.common.enums.SignatureEnumGroup; import work.xuye.common.service.UrlMD5Service; import work.xuye.common.utils.DebugUtil; import work.xuye.common.utils.ID; +import work.xuye.common.utils.JsonPathUtil; +import work.xuye.common.utils.signature.AbstractSignatureService; +import work.xuye.common.utils.signature.SignatureFactory; import work.xuye.source.request.RequestClient; import work.xuye.source.spel.CustomRequest; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.nio.charset.StandardCharsets; +import java.security.GeneralSecurityException; +import java.util.*; import java.util.regex.Matcher; +import java.util.stream.Stream; /** * @author xuye @@ -47,6 +54,7 @@ public class SourceHandler { private final UrlMD5Service urlMD5Service; private final StreamBridge streamBridge; private final TaskManager taskManager; + public final Gson gson; public List> handle(Message message) { TaskMiniVO taskMiniVO = message.getPayload(); @@ -58,18 +66,33 @@ public class SourceHandler { return null; } HttpRequestParams req = task.getRequestParams(); + if (!ObjectUtils.isEmpty(taskMiniVO.getArgs())) { + req.getArgs().putAll(taskMiniVO.getArgs()); + } + + Set otherCacheKeySet = new HashSet<>(); + Set parentOtherCacheKeySet = message.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET, Set.class); + if (parentOtherCacheKeySet != null) { + otherCacheKeySet.addAll(parentOtherCacheKeySet); + } + + String taskName = task.getName(); HttpRes res = null; List result = null; - if (RequestMode.normal.equals(req.getMode())) { - res = this.request(req); - result = this.handleRes(task, res); - } else if (RequestMode.SpEL.equals(req.getMode())) { + try { + if (RequestMode.normal.equals(req.getMode())) { + res = this.request(req); + result = this.handleRes(task, res); + } else if (RequestMode.SpEL.equals(req.getMode())) { - StandardEvaluationContext context = new StandardEvaluationContext(); - Expression expression = parser.parseExpression(task.getRequestParams().getSpELConfig().getExpression()); - List value = expression.getValue(context, this, List.class); - result = this.handleRes(task, value); + StandardEvaluationContext context = new StandardEvaluationContext(); + Expression expression = parser.parseExpression(task.getRequestParams().getSpELConfig().getExpression()); + List value = expression.getValue(context, this, List.class); + result = this.handleRes(task, value); + } + } catch (Exception e) { + urlMD5Service.removeUrlCache(message); } if (ObjectUtils.isEmpty(result)) { return null; @@ -81,11 +104,12 @@ public class SourceHandler { .setHeader(MessageConstants.CURRENT_STAGE, StageConstants.SOURCE_WATCH) .setHeader(MessageConstants.TASK_TRACE_ID, message.getHeaders().get(MessageConstants.TASK_TRACE_ID)) .setHeader(MessageConstants.SOURCE_TRACE_ID, ID.generate()) - .setHeader(MessageConstants.TASK_ID, message.getHeaders().get(MessageConstants.TASK_ID)) - .setHeader(MessageConstants.TASK_NAME, message.getHeaders().get(MessageConstants.TASK_NAME)) + .setHeader(MessageConstants.TASK_ID, task.getId()) + .setHeader(MessageConstants.TASK_NAME, task.getName()) .setHeader(SnapshotConstants.URL, req.getUrl()) .setHeader(SnapshotConstants.RESOURCE_STATUS, r.getResourceStatus()) .setHeader(SnapshotConstants.STATUS, r.getStatus()) + .setHeader(MessageConstants.OTHER_CACHE_KEY_SET, otherCacheKeySet) .build(); boolean send = streamBridge.send(taskName + BindingConstants.SNAPSHOT_OUT_POSTFIX, watchMessage); @@ -96,6 +120,28 @@ public class SourceHandler { return null; } + if (Objects.nonNull(task.getDispatcher())) { + + Map nextNodeMessage = buildNextNodeMessage(result, task, message); + + + nextNodeMessage.forEach((k, v) -> { + if (!ObjectUtils.isEmpty(otherCacheKeySet)) { + Set set = Objects.requireNonNullElse(v.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET, Set.class), new HashSet()); + set.addAll(otherCacheKeySet); + v = MessageBuilder + .fromMessage(v) + .setHeader(MessageConstants.OTHER_CACHE_KEY_SET, set) + .build(); + } + boolean send = streamBridge.send(k, v); + if (!send) { + throw new RuntimeException("send message failed"); + } + }); + return null; + } + ArrayList> messageList = new ArrayList<>(); result.forEach( item -> { @@ -104,9 +150,10 @@ public class SourceHandler { .setHeader(MessageConstants.CURRENT_STAGE, StageConstants.SOURCE) .setHeader(MessageConstants.TASK_TRACE_ID, message.getHeaders().get(MessageConstants.TASK_TRACE_ID)) .setHeader(MessageConstants.SOURCE_TRACE_ID, ID.generate()) - .setHeader(MessageConstants.TASK_ID, message.getHeaders().get(MessageConstants.TASK_ID)) - .setHeader(MessageConstants.TASK_NAME, message.getHeaders().get(MessageConstants.TASK_NAME)) + .setHeader(MessageConstants.TASK_ID, task.getId()) + .setHeader(MessageConstants.TASK_NAME, task.getName()) .setHeader(MessageConstants.SEED_URL, item.getRequestParams().getUrl()) + .setHeader(MessageConstants.OTHER_CACHE_KEY_SET, otherCacheKeySet) .build()); } ); @@ -115,6 +162,93 @@ public class SourceHandler { } + private Map buildNextNodeMessage(List result, Task task, Message message) { + Dispatcher filter = task.getDispatcher(); + + Map resultMap = new HashMap<>(); + result.stream() + .map(item -> { + HttpRes httpRes = HttpRes.build() + .status(item.getStatus()) + .costTimeMillis(item.getCostTimeMillis()) + .headers(item.getHeaders()) + .requestParams(item.getRequestParams()); + httpRes.setResourceStatus(item.getResourceStatus()); + httpRes.setTemporary(JsonPathUtil.read(item.getBody(), task.getDispatcher().getFilterArgs())); + return httpRes; + }) + .flatMap(item -> { + if (item.getTemporary() instanceof Collection) { + return ((Collection) item.getTemporary()) + .stream() + .map(temporary -> { + HttpRes params = HttpRes.build() + .status(item.getStatus()) + .costTimeMillis(item.getCostTimeMillis()) + .headers(item.getHeaders()) + .requestParams(item.getRequestParams()); + params.setResourceStatus(item.getResourceStatus()); + params.setTemporary(temporary); + return params; + }); + } else { + return Stream.of(item); + } + }) + .forEach(httpRes -> { + Object temporary = httpRes.getTemporary(); + httpRes.setTemporary(null); + Map map = (Map) temporary; + // 是否更新判断 + String uniqueKey = map.get(filter.getUniqueKey()).toString(); + uniqueKey = httpRes.getRequestParams().getUrl() + "?uniqueKey=" + uniqueKey; + String md5Digest = DigestUtils.md5DigestAsHex(gson.toJson(map).getBytes(StandardCharsets.UTF_8)); + ResourceStatus resourceStatus = urlMD5Service.getResourceStatus(uniqueKey, md5Digest); + urlMD5Service.put(uniqueKey, md5Digest); + if (ResourceStatus.UNCHANGED.equals(resourceStatus)) { + return; + } + + // 节点命中 + List nextNodes = filter.getNextNodes(); + if (ObjectUtils.isEmpty(nextNodes)) { + return; + } + StandardEvaluationContext context = new StandardEvaluationContext(this); + context.setVariable("filterArgs", map); + context.setVariable("httpRes", httpRes); + + for (Dispatcher.NextNodeConfig nextNode : nextNodes) { + + if (!Boolean.TRUE.equals(getPlaceholderSpel(nextNode.getCondition(), filter.getPlaceholderExpressions(), context, Boolean.class))) { + continue; + } + Set otherCacheKeySet = new HashSet<>(); + otherCacheKeySet.add(uniqueKey); + // 命中 + String topic = getPlaceholderSpel(nextNode.getTopic(), filter.getPlaceholderExpressions(), context, String.class); + Object payload = getPlaceholderSpel(nextNode.getPayload(), filter.getPlaceholderExpressions(), context, Object.class); + Message nextTaskMessage = MessageBuilder + .withPayload(payload) + .setHeader(MessageConstants.CURRENT_STAGE, StageConstants.SOURCE) + .setHeader(MessageConstants.TASK_TRACE_ID, message.getHeaders().get(MessageConstants.TASK_TRACE_ID)) + .setHeader(MessageConstants.SOURCE_TRACE_ID, ID.generate()) + .setHeader(MessageConstants.TASK_ID, task.getId()) + .setHeader(MessageConstants.TASK_NAME, task.getName()) + .setHeader(MessageConstants.SEED_URL, httpRes.getRequestParams().getUrl()) + .setHeader(MessageConstants.OTHER_CACHE_KEY_SET, otherCacheKeySet) + .build(); + resultMap.put(topic, nextTaskMessage); + if ("break".equals(filter.getMode())) { + break; + } + } + } + ); + + return resultMap; + + } private void setResourceStatus(HttpRes res) { String md5Digest = DigestUtils.md5DigestAsHex(res.getBody().getBytes()); @@ -186,25 +320,87 @@ public class SourceHandler { public HttpRes request(HttpRequestParams req) { this.preHandleUrl(req); this.preHandleBody(req); + this.preHandleHeader(req); + this.preHandleSignature(req); return requestClient.execute(req); } - private void preHandleBody(HttpRequestParams request) { - Map body = request.getBody(); - if (ObjectUtils.isEmpty(body)) { + + private void preHandleSignature(HttpRequestParams requestParams) { + HttpRequestParams.RequestSignatureConfig signatureConfig = requestParams.getSignatureConfig(); + if (ObjectUtils.isEmpty(signatureConfig)) { return; } - for (String key : body.keySet()) { - Object value = body.get(key); - Matcher matcher = RegexConstants.PLACEHOLDER_PATTERN.matcher(value.toString()); + + String signatureContext = requestParams.getPlaceholderValue(signatureConfig.getSignatureContext(), String.class, parser, new StandardEvaluationContext(this)); + AbstractSignatureService signatureService = null; + try { + signatureService = SignatureFactory.getSignatureService(signatureConfig); + String signature = signatureService.signatureByPriKey(signatureContext); + // 优化 + if (SignatureEnumGroup.SignatureLocation.header.equals(signatureConfig.getSignatureLocation())) { + requestParams.getHeaders().put(signatureConfig.getSignatureField(), signature); + } + } catch (GeneralSecurityException e) { + throw new RuntimeException(e); + } + } + + private void preHandleHeader(HttpRequestParams request) { + Map headers = request.getHeaders(); + if (ObjectUtils.isEmpty(headers)) { + return; + } + request.setHeaders(calculationPlaceholderMap(request.getHeaders(), request)); + } + + + /** + * 根据占位符从spelMap中获取对应的spel表达式并计算结果 + * 或直接返回占位符(不满足占位符规则时,即无需计算) + */ + private T getPlaceholderSpel(Object placeholder, Map spelMap, EvaluationContext context, Class resultClazz) { + Matcher matcher = RegexConstants.PLACEHOLDER_PATTERN.matcher(placeholder.toString()); + if (matcher.find()) { + return parser.parseExpression(spelMap.get(matcher.group())).getValue(context, resultClazz); + } + return (T) placeholder; + } + + /** + * 计算占位符的值,并返回此map + */ + private Map calculationPlaceholderMap(Map placeholderMap, HttpRequestParams request) { + + for (String key : placeholderMap.keySet()) { + Object placeholderValue = placeholderMap.get(key); + Matcher matcher = RegexConstants.PLACEHOLDER_PATTERN.matcher(placeholderValue.toString()); if (matcher.find()) { String placeholder = matcher.group(); - String expr = request.getPlaceholderExpressions().get(placeholder); - Object result = parser.parseExpression(expr).getValue(); - body.put(key, result); + Object value = request.getPlaceholderValue(placeholder, Object.class, parser, new StandardEvaluationContext(this)); + placeholderMap.put(key, value); } } - request.setBody(body); + return placeholderMap; + } + + private void preHandleBody(HttpRequestParams request) { + Map body = request.getBody(); + if (ObjectUtils.isEmpty(body)) { + return; + } + request.setBody(calculationPlaceholderMap(request.getBody(), request)); +// for (String key : body.keySet()) { +// Object value = body.get(key); +// Matcher matcher = RegexConstants.PLACEHOLDER_PATTERN.matcher(value.toString()); +// if (matcher.find()) { +// String placeholder = matcher.group(); +// String expr = request.getPlaceholderExpressions().get(placeholder); +// Object result = parser.parseExpression(expr).getValue(); +// body.put(key, result); +// } +// } +// request.setBody(body); } private void preHandleUrl(HttpRequestParams request) { @@ -213,19 +409,23 @@ public class SourceHandler { Map patternMap = new HashMap<>(); while (matcher.find()) { String placeholder = matcher.group(); - patternMap.put(placeholder, null); + patternMap.put(placeholder, placeholder); } if (ObjectUtils.isEmpty(patternMap)) { return; } - Map placeholderExpressions = request.getPlaceholderExpressions(); - for (String key : patternMap.keySet()) { - String expr = placeholderExpressions.get(key); - Object value = parser.parseExpression(expr).getValue(); - patternMap.put(key, value); - } + + calculationPlaceholderMap(patternMap, request); +// Map placeholderExpressions = request.getPlaceholderExpressions(); + + +// for (String key : patternMap.keySet()) { +// String expr = placeholderExpressions.get(key); +// Object value = parser.parseExpression(expr).getValue(); +// patternMap.put(key, value); +// } for (String key : patternMap.keySet()) { url = url.replace(key, patternMap.get(key).toString()); } diff --git a/source/src/main/java/work/xuye/source/properties/SourceProperties.java b/source/src/main/java/work/xuye/source/properties/SourceProperties.java index 9490088..721e1e0 100644 --- a/source/src/main/java/work/xuye/source/properties/SourceProperties.java +++ b/source/src/main/java/work/xuye/source/properties/SourceProperties.java @@ -19,7 +19,9 @@ public class SourceProperties { public static final String PROPERTIES_PREFIX = "request"; private Pool pool; - private Integer timeoutSeconds = 30; + private Integer connectTimeoutSeconds = 20; + private Integer readTimeoutSeconds = 20; + @Data @NoArgsConstructor diff --git a/source/src/main/java/work/xuye/source/request/OkHttpRequestClient.java b/source/src/main/java/work/xuye/source/request/OkHttpRequestClient.java index 52ab1ed..810be6d 100644 --- a/source/src/main/java/work/xuye/source/request/OkHttpRequestClient.java +++ b/source/src/main/java/work/xuye/source/request/OkHttpRequestClient.java @@ -38,15 +38,18 @@ public class OkHttpRequestClient implements RequestClient { this.gson = gson; SourceProperties.Pool poolConfig = sourceProperties.getPool(); this.client = new OkHttpClient.Builder() - .connectTimeout(sourceProperties.getTimeoutSeconds(), TimeUnit.SECONDS) + .connectTimeout(sourceProperties.getConnectTimeoutSeconds(), TimeUnit.SECONDS) + .readTimeout(sourceProperties.getReadTimeoutSeconds(), TimeUnit.SECONDS) .connectionPool(new ConnectionPool( poolConfig.getMaxIdleConnections(), poolConfig.getKeepAliveSeconds(), TimeUnit.SECONDS)) .followRedirects(true) .build(); - log.info("init OkHttpRequestClient success, timeoutSeconds: {}, maxIdleConnections: {}, keepAliveSeconds: {}", - sourceProperties.getTimeoutSeconds(), + + log.info("init OkHttpRequestClient success, connectTimeoutSeconds: {}, readTimeoutSeconds: {}, maxIdleConnections: {}, keepAliveSeconds: {}", + sourceProperties.getConnectTimeoutSeconds(), + sourceProperties.getReadTimeoutSeconds(), poolConfig.getMaxIdleConnections(), poolConfig.getKeepAliveSeconds()); } @@ -62,7 +65,10 @@ public class OkHttpRequestClient implements RequestClient { // request header if (!ObjectUtils.isEmpty(request.getHeaders())) { - requestBuilder.headers(Headers.Companion.of(request.getHeaders())); + request.getHeaders() + .forEach((k, v) -> requestBuilder.addHeader(k, v.toString())); + +// requestBuilder.headers(Headers.Companion.of()); } // request body @@ -98,8 +104,10 @@ public class OkHttpRequestClient implements RequestClient { // res HttpRes httpRes = new HttpRes(); Request httpRequest = requestBuilder.build(); + httpRes.requestParams(request); try (Response response = client.newCall(httpRequest).execute()) { - httpRes.requestParams(request).headers(response.headers().toMultimap()).status(HttpStatus.valueOf(response.code())); + httpRes.headers(response.headers().toMultimap()) + .status(HttpStatus.valueOf(response.code())); if (!ObjectUtils.isEmpty(response.body())) { BufferedSource source = response.body().source(); Charset charset = CharsetUtil.getCharsetByName(request.getCharset()); diff --git a/transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java b/transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java index b299858..59ebbac 100644 --- a/transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java +++ b/transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java @@ -51,7 +51,7 @@ public class TransformHandler { try { return this.doHandle(message); } catch (Exception e) { - urlMD5Service.removeUrlMD5(this.getSeedUrl(message)); + urlMD5Service.removeUrlCache(message); throw new RuntimeException(e); } } @@ -158,6 +158,7 @@ public class TransformHandler { .setHeader(MessageConstants.SOURCE_TRACE_ID, message.getHeaders().get(MessageConstants.SOURCE_TRACE_ID)) .setHeader(MessageConstants.CURRENT_STAGE, StageConstants.TRANSFORMER) .setHeader(MessageConstants.TRANSFORMER_TRACE_ID, ID.generate()) + .setHeader(MessageConstants.OTHER_CACHE_KEY_SET, message.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET)) .build(); boolean send = streamBridge.send(BindingConstants.TRANSFORMER_KEYS_OUT, keysMsg); if (!send) { @@ -192,6 +193,7 @@ public class TransformHandler { .setHeader(MessageConstants.TASK_ID, message.getHeaders().get(MessageConstants.TASK_ID)) .setHeader(MessageConstants.TASK_NAME, message.getHeaders().get(MessageConstants.TASK_NAME)) .setHeader(MessageConstants.SEED_URL, message.getHeaders().get(MessageConstants.SEED_URL)) + .setHeader(MessageConstants.OTHER_CACHE_KEY_SET, message.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET)) .build(); results.add(itemMessage); diff --git a/transformer/src/main/java/work/xuye/transformer/transformer/JsonDataTransformer.java b/transformer/src/main/java/work/xuye/transformer/transformer/JsonDataTransformer.java new file mode 100644 index 0000000..d5fb1b3 --- /dev/null +++ b/transformer/src/main/java/work/xuye/transformer/transformer/JsonDataTransformer.java @@ -0,0 +1,26 @@ +package work.xuye.transformer.transformer; + +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * @author yechuan + * @since 2023/8/11 11:55 + **/ +@Slf4j +@Component("jsonData") +public class JsonDataTransformer implements MessageTransformer{ + @Override + public String transform(String message, String seedUrl) { + JsonObject res = JsonParser.parseString(message).getAsJsonObject(); + boolean hasData = res.has("data"); + if (hasData) { + return res.get("data").toString(); + } else { + log.warn("resData transform failed, res not has data, res: {}", res); + } + return null; + } +} diff --git a/transformer/src/main/java/work/xuye/transformer/transformer/XmlToJsonTransformer.java b/transformer/src/main/java/work/xuye/transformer/transformer/XmlToJsonTransformer.java index 2200a64..eb08226 100644 --- a/transformer/src/main/java/work/xuye/transformer/transformer/XmlToJsonTransformer.java +++ b/transformer/src/main/java/work/xuye/transformer/transformer/XmlToJsonTransformer.java @@ -14,7 +14,7 @@ import org.springframework.util.ObjectUtils; @Slf4j @Component("xml2json") public class XmlToJsonTransformer implements MessageTransformer { - + @Override public String transform(String xml, String seedUrl) { if (ObjectUtils.isEmpty(xml)) { @@ -24,7 +24,7 @@ public class XmlToJsonTransformer implements MessageTransformer { try { jsonObject = XML.toJSONObject(xml); } catch (JSONException e) { - log.info("xml2json transform error, xml:{}, seedUrl:{}, exception:{}", xml, seedUrl, e); + log.warn("xml2json transform error,xml:{},seedUrl:{},error:{}", xml, seedUrl, e.getMessage()); throw new RuntimeException("xml2json transform error, seedUrl:" + seedUrl); } return jsonObject.toString();