diff --git a/common/src/main/java/work/xuye/common/alert/ExceptionAspect.java b/common/src/main/java/work/xuye/common/alert/ExceptionAspect.java index 9f81df3..30e3820 100644 --- a/common/src/main/java/work/xuye/common/alert/ExceptionAspect.java +++ b/common/src/main/java/work/xuye/common/alert/ExceptionAspect.java @@ -23,10 +23,10 @@ public class ExceptionAspect { private final MessageService messageService; - @AfterThrowing(pointcut = "execution(* work.xuye..*(..))", throwing = "ex") + @AfterThrowing(pointcut = "execution(* work.xuye..*(..))&& !execution(* work.xuye.common.service.MessageService.*(..))", throwing = "ex") public void handleGlobalException(Exception ex) { - - Sentry.capture(ex); + + List stackTraceElements = new java.util.ArrayList<>(List.of(ex.getStackTrace())); stackTraceElements.removeIf(stackTraceElement -> !stackTraceElement.getClassName().contains("work.xuye")); @@ -38,14 +38,32 @@ public class ExceptionAspect { .append(stackTraceElement.getClassName()).append(".") .append(stackTraceElement.getMethodName()).append("()").append(" (line ") .append(stackTraceElement.getLineNumber()).append(")\n")); + String traceStringString = traceString.toString(); + + if (traceStringString.contains("work.xuye.common.service.MessageService")){ + return; + } + + Sentry.capture(ex); + String subTitle = ex.getMessage(); String title = ex.getClass().getName(); if (subTitle == null) { subTitle = title; } - messageService.sendMessage(MessageType.exception, - title, - subTitle, - traceString.toString()); + + + for (int i = 1; i <= 3; i++) { + try { + messageService.sendMessage(MessageType.exception, + title, + subTitle, + traceStringString); + return; + } catch (Exception e) { + log.error("send error message error! now {} times ,try retry send again e:", i, e); + } + } + } } diff --git a/common/src/main/java/work/xuye/common/constant/BindingConstants.java b/common/src/main/java/work/xuye/common/constant/BindingConstants.java index 63d8fb6..86ffdca 100644 --- a/common/src/main/java/work/xuye/common/constant/BindingConstants.java +++ b/common/src/main/java/work/xuye/common/constant/BindingConstants.java @@ -8,6 +8,8 @@ public class BindingConstants { public static final String TASK_OUT = "task-out"; + public static final String SOURCE_OUT = "source-out"; + public static final String SNAPSHOT_OUT_POSTFIX = "-snapshot-out"; public static final String TRANSFORMER_KEYS_OUT = "transformKeys-out"; diff --git a/common/src/main/java/work/xuye/common/constant/MessageConstants.java b/common/src/main/java/work/xuye/common/constant/MessageConstants.java index 0ec13c9..da6ffb0 100644 --- a/common/src/main/java/work/xuye/common/constant/MessageConstants.java +++ b/common/src/main/java/work/xuye/common/constant/MessageConstants.java @@ -20,6 +20,7 @@ public class MessageConstants { public static final String TASK_NAME = "taskName"; public static final String SEED_URL = "seedUrl"; + public static final String OTHER_CACHE_KEY_SET = "otherCacheKeySet"; } diff --git a/common/src/main/java/work/xuye/common/db/entity/Task.java b/common/src/main/java/work/xuye/common/db/entity/Task.java index a9d37b0..9173594 100644 --- a/common/src/main/java/work/xuye/common/db/entity/Task.java +++ b/common/src/main/java/work/xuye/common/db/entity/Task.java @@ -9,6 +9,7 @@ import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer; import lombok.Getter; import lombok.Setter; import lombok.experimental.Accessors; +import work.xuye.common.db.entity.vo.NextTaskFilter; import work.xuye.common.db.entity.vo.ParseParams; import work.xuye.common.db.entity.vo.SinkParams; import work.xuye.common.db.entity.vo.TransformParams; @@ -58,6 +59,9 @@ public class Task implements Serializable { @TableField(value = "sink_params", typeHandler = JacksonTypeHandler.class) private SinkParams sinkParams; + @TableField(value = "next_filter", typeHandler = JacksonTypeHandler.class) + private NextTaskFilter nextFilter; + @TableField(value = "create_time", fill = FieldFill.INSERT) @JsonDeserialize(using = LocalDateTimeDeserializer.class) @JsonSerialize(using = LocalDateTimeSerializer.class) @@ -68,6 +72,9 @@ public class Task implements Serializable { @JsonSerialize(using = LocalDateTimeSerializer.class) private LocalDateTime updateTime; + @TableField(value = "is_root") + private Boolean isRoot; + @TableField("version") @Version private Integer version; diff --git a/common/src/main/java/work/xuye/common/db/entity/vo/NextTaskFilter.java b/common/src/main/java/work/xuye/common/db/entity/vo/NextTaskFilter.java new file mode 100644 index 0000000..f7da759 --- /dev/null +++ b/common/src/main/java/work/xuye/common/db/entity/vo/NextTaskFilter.java @@ -0,0 +1,32 @@ +package work.xuye.common.db.entity.vo; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; +import java.util.Map; + +/** + * @author yechuan + * @since 2023/8/8 19:09 + **/ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class NextTaskFilter { + private String filterArgs; + private String mode; + private String uniqueKey; + private List nextNodes; + private Map placeholderExpressions = Map.of(); + + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class NextNodeConfig { + private String topic; + private String condition; + private String payload; + } +} diff --git a/common/src/main/java/work/xuye/common/db/entity/vo/SinkParams.java b/common/src/main/java/work/xuye/common/db/entity/vo/SinkParams.java index 86482d3..5b213b8 100644 --- a/common/src/main/java/work/xuye/common/db/entity/vo/SinkParams.java +++ b/common/src/main/java/work/xuye/common/db/entity/vo/SinkParams.java @@ -50,6 +50,7 @@ public class SinkParams { private Status status; private String fieldName; private String jsonPath; + private String dbObjectExpression; } @Data diff --git a/common/src/main/java/work/xuye/common/db/entity/vo/TableTemplate.java b/common/src/main/java/work/xuye/common/db/entity/vo/TableTemplate.java index 2601fe1..b6d8141 100644 --- a/common/src/main/java/work/xuye/common/db/entity/vo/TableTemplate.java +++ b/common/src/main/java/work/xuye/common/db/entity/vo/TableTemplate.java @@ -5,6 +5,7 @@ import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; import work.xuye.common.enums.DbFieldType; +import work.xuye.common.enums.UpdateStrategy; import java.util.List; @@ -80,6 +81,11 @@ public class TableTemplate { * 字段校验表达式规则 */ private List rules; + + /** + * 修改时的策略 + */ + private UpdateStrategy updateStrategy = UpdateStrategy.ignored; } @Data diff --git a/common/src/main/java/work/xuye/common/db/entity/vo/TaskMiniVO.java b/common/src/main/java/work/xuye/common/db/entity/vo/TaskMiniVO.java index 50e2b26..b9952d3 100644 --- a/common/src/main/java/work/xuye/common/db/entity/vo/TaskMiniVO.java +++ b/common/src/main/java/work/xuye/common/db/entity/vo/TaskMiniVO.java @@ -5,6 +5,8 @@ import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; +import java.util.Map; + /** * @author xuye * @since 2023/7/7 21:58 @@ -18,4 +20,6 @@ public class TaskMiniVO { private Integer id; private String name; + + private Map args; } diff --git a/common/src/main/java/work/xuye/common/dto/HttpRequestParams.java b/common/src/main/java/work/xuye/common/dto/HttpRequestParams.java index bfc66ff..67f65e4 100644 --- a/common/src/main/java/work/xuye/common/dto/HttpRequestParams.java +++ b/common/src/main/java/work/xuye/common/dto/HttpRequestParams.java @@ -1,13 +1,22 @@ package work.xuye.common.dto; +import cn.hutool.core.collection.CollUtil; +import com.baomidou.mybatisplus.annotation.TableField; import lombok.AllArgsConstructor; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; +import org.springframework.expression.EvaluationContext; +import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; import work.xuye.common.enums.RequestMode; +import work.xuye.common.enums.SignatureEnumGroup; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; +import java.util.Set; /** * @author xuye @@ -24,10 +33,63 @@ public class HttpRequestParams { private String url; private Map body = Map.of(); private String charset; - private Map headers = Map.of(); + private Map headers = Map.of(); private Map placeholderExpressions = Map.of(); private String mediaType = MediaType.APPLICATION_JSON_VALUE; + private RequestSignatureConfig signature; + + /** + * 仅计算一次的表达式(同次请求后续使用上次计算值) + */ + private Set onceCompileExpressions = Collections.emptySet(); + + /** + * 缓存的表达式值 + */ + @TableField(exist = false) + private Map placeholderExpressionsValueCache = new HashMap<>(); + + + /** + * 参数(由父任务传递,参与计算与组装) + */ + @TableField(exist = false) + private Map args = new HashMap<>(); + + + /** + * 根据占位符获取值 + */ + public T getPlaceholderValue(String placeholder, Class t, SpelExpressionParser parser, EvaluationContext context) { + if (CollUtil.isEmpty(placeholderExpressions)) { + return null; + } + if (!placeholderExpressions.containsKey(placeholder)) { + return null; + } + boolean cache = false; + + if (onceCompileExpressions.contains(placeholder)) { + // 一次请求中 仅计算一次 + if (placeholderExpressionsValueCache.containsKey(placeholder)) { + return t.cast(placeholderExpressionsValueCache.get(placeholder)); + } + cache = true; + } + context.setVariable("requestParams", this); + String expr = placeholderExpressions.get(placeholder); + T result = parser.parseExpression(expr).getValue(context, t); + + if (cache) { + placeholderExpressionsValueCache.put(placeholder, result); + } + + return result; + + + } + @Data @NoArgsConstructor @AllArgsConstructor @@ -36,4 +98,13 @@ public class HttpRequestParams { } + @Data + @NoArgsConstructor + @AllArgsConstructor + @EqualsAndHashCode(callSuper = true) + public static class RequestSignatureConfig extends SignatureConfig { + private String signatureFile; + private String signatureContext; + private SignatureEnumGroup.SignatureLocation signatureLocation; + } } diff --git a/common/src/main/java/work/xuye/common/dto/HttpRes.java b/common/src/main/java/work/xuye/common/dto/HttpRes.java index e2f8c1d..16f6785 100644 --- a/common/src/main/java/work/xuye/common/dto/HttpRes.java +++ b/common/src/main/java/work/xuye/common/dto/HttpRes.java @@ -1,5 +1,6 @@ package work.xuye.common.dto; +import com.fasterxml.jackson.annotation.JsonInclude; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -25,6 +26,12 @@ public class HttpRes { private Map> headers = Map.of(); private Long costTimeMillis; + /** + * 零时变量 + */ + @JsonInclude + private Object temporary; + public static HttpRes build() { return new HttpRes(); } diff --git a/common/src/main/java/work/xuye/common/dto/SignatureConfig.java b/common/src/main/java/work/xuye/common/dto/SignatureConfig.java new file mode 100644 index 0000000..a60081e --- /dev/null +++ b/common/src/main/java/work/xuye/common/dto/SignatureConfig.java @@ -0,0 +1,36 @@ +package work.xuye.common.dto; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * 加解密配置 + * + * @author yechuan + * @since 2023/8/8 14:44 + **/ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class SignatureConfig { + /** + * key 算法 + */ + private String keyAlgorithm; + + /** + * 公私钥类型 + */ + private String keyType; + + /** + * 加密算法 + */ + private String signatureAlgorithm; + + /** + * key + */ + private String key; +} diff --git a/common/src/main/java/work/xuye/common/dto/SignatureContext.java b/common/src/main/java/work/xuye/common/dto/SignatureContext.java new file mode 100644 index 0000000..8b917a2 --- /dev/null +++ b/common/src/main/java/work/xuye/common/dto/SignatureContext.java @@ -0,0 +1,18 @@ +package work.xuye.common.dto; + +import lombok.Data; + +import java.security.PrivateKey; +import java.security.PublicKey; +import java.security.Signature; + +/** + * @author yechuan + * @since 2023/8/9 17:53 + **/ +@Data +public class SignatureContext { + private PrivateKey priKey; + private PublicKey pubKey; + private Signature signature; +} diff --git a/common/src/main/java/work/xuye/common/enums/SignatureEnumGroup.java b/common/src/main/java/work/xuye/common/enums/SignatureEnumGroup.java new file mode 100644 index 0000000..8de5f94 --- /dev/null +++ b/common/src/main/java/work/xuye/common/enums/SignatureEnumGroup.java @@ -0,0 +1,36 @@ +package work.xuye.common.enums; + +/** + * @author yechuan + * @since 2023/8/8 14:52 + **/ +public class SignatureEnumGroup { + + /** + * 加密字符串的位置 + */ + public static enum SignatureLocation{ + header + } + + public static enum KeyAlgorithm{ + /** + * RSA + */ + RSA + } + + public static enum KeyType{ + /** + * 私钥 + */ + privateKey, + /** + * 公钥 + */ + publicKey, + } + + + +} diff --git a/common/src/main/java/work/xuye/common/enums/UpdateStrategy.java b/common/src/main/java/work/xuye/common/enums/UpdateStrategy.java new file mode 100644 index 0000000..e10d026 --- /dev/null +++ b/common/src/main/java/work/xuye/common/enums/UpdateStrategy.java @@ -0,0 +1,18 @@ +package work.xuye.common.enums; + +/** + * 修改时的策略 + * + * @author yechuan + * @since 2023/8/10 13:37 + **/ +public enum UpdateStrategy { + /** + * 忽略判断 + */ + ignored, + /** + * 非NULL时加入sql + */ + not_null +} diff --git a/common/src/main/java/work/xuye/common/service/UrlMD5Service.java b/common/src/main/java/work/xuye/common/service/UrlMD5Service.java index b82a98c..388c380 100644 --- a/common/src/main/java/work/xuye/common/service/UrlMD5Service.java +++ b/common/src/main/java/work/xuye/common/service/UrlMD5Service.java @@ -2,10 +2,16 @@ package work.xuye.common.service; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.messaging.Message; import org.springframework.stereotype.Service; +import org.springframework.util.ObjectUtils; +import org.springframework.util.StringUtils; +import work.xuye.common.constant.MessageConstants; import work.xuye.common.enums.ResourceStatus; import work.xuye.common.store.UrlMD5MapStore; +import java.util.Set; + /** * @author xuye * @since 2023/3/6 21:58 @@ -37,4 +43,15 @@ public class UrlMD5Service { Long aLong = urlMD5MapStore.removeKey(seedUrl); log.info("删除url对应的md5,url:{}, 删除条数:{}", seedUrl, aLong); } + + public void removeUrlCache(Message message) { + Set parentOtherCacheKeySet = message.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET, Set.class); + if (!ObjectUtils.isEmpty(parentOtherCacheKeySet)) { + parentOtherCacheKeySet.forEach(this::removeUrlMD5); + } + String sendUrl = message.getHeaders().get(MessageConstants.SEED_URL, String.class); + if (!StringUtils.hasText(sendUrl)) { + this.removeUrlMD5(sendUrl); + } + } } diff --git a/common/src/main/java/work/xuye/common/spel/CustomFunction.java b/common/src/main/java/work/xuye/common/spel/CustomFunction.java index 43e09f0..5653fee 100644 --- a/common/src/main/java/work/xuye/common/spel/CustomFunction.java +++ b/common/src/main/java/work/xuye/common/spel/CustomFunction.java @@ -4,6 +4,8 @@ import com.google.gson.JsonArray; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonPrimitive; +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.PathNotFoundException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.expression.Expression; @@ -41,6 +43,13 @@ public class CustomFunction { private final NsKVMapStore mapStore; + public String localDateTimeFormat(LocalDateTime time, String pattern) { + if (time == null) { + return ""; + } + return time.format(DateTimeFormatter.ofPattern(pattern)); + } + public String jsonExtract(String json, String path) { String result = null; try { @@ -53,6 +62,20 @@ public class CustomFunction { return result; } + public String jsonExtractOrPathNotFound(String json, String path) { + String result = null; + try { + result = JsonPath.read(json, path).toString().trim(); + } catch (PathNotFoundException e) { + log.warn("jsonPath not find, json: [{}], path: [{}]", json, path); + } catch (Exception e) { + log.error("jsonExtra handler error, json: [{}], path: [{}]", json, path); + throw new RuntimeException(e); + } + log.debug("jsonExtra handler, json: [{}], path: [{}], result: [{}]", json, path, result); + return result; + } + public String jsonExtractReplaceAll(String json, String jsonPath, String regex, String replacement) { return this.jsonExtract(json, jsonPath).replaceAll(regex, replacement).trim(); } diff --git a/common/src/main/java/work/xuye/common/utils/SignatureUtil.java b/common/src/main/java/work/xuye/common/utils/SignatureUtil.java new file mode 100644 index 0000000..751b183 --- /dev/null +++ b/common/src/main/java/work/xuye/common/utils/SignatureUtil.java @@ -0,0 +1,59 @@ +package work.xuye.common.utils; + +import org.apache.commons.codec.binary.Base64; +import org.springframework.stereotype.Component; +import org.springframework.util.DigestUtils; +import work.xuye.common.dto.HttpRequestParams; + +import java.nio.charset.StandardCharsets; +import java.security.KeyFactory; +import java.security.PrivateKey; +import java.security.Signature; +import java.security.spec.PKCS8EncodedKeySpec; + +/** + * @author yechuan + * @since 2023/8/7 15:08 + **/ +@Component +public class SignatureUtil { + + private final static String privateKey = "MIICdwIBADANBgkqhkiG9w0BAQEFAASCAmEwggJdAgEAAoGBANTyWsgvVOW6lNH4\n" + + "koggbORNIdM0mw+klkTfAZJ8hM9SvxaPWgo55dU1zp46nUs15ZStf/A4EmOeh4jA\n" + + "SLqHf3Zd2WJc5izjEm70pnwHXRCjOA8i4nR6ia0fpzZPf+FNqElmvsfrvqk185cf\n" + + "kAXk2RoXSAykUi+2UMK6TlTaHmqXAgMBAAECgYAmCFcQc+us0CMuUUASkgAA0ond\n" + + "CAM9yv6PtGi6egTaZoP8ioPhWa/j4aVSe1OGkEy9vjMge1NFeZXpZbZXokWwUdmi\n" + + "xxOcCYgYKebJ0Fmssvj/GSRL93B1JlXs88MdedAGlef1b1IyURUSDbkkbOZmHnON\n" + + "OgkoSafcB+JFx6Ea8QJBAPTaiAZSsco9e23EMXfhy+h7CjYYY8LJ+gBrnEZV2jns\n" + + "O7i2W/2c9sEzexG8C/W1oEbEDsfkmCKvf/mQ+dJb0d8CQQDeo/yuk5Z0W+yBos5U\n" + + "h0vjlAJzVL812QNv02bogmGaS7y3Ao1+/cVeTWSrlEjjM2tcbnpFtOgFtLBl1lFX\n" + + "9q5JAkB6gskimMe6UC7sygiSWhjjdoSycluf/90lzrH/gz9QUgHDtwKqD5prKq3+\n" + + "Pp+hTkImhjx7CcaRPEyE+2P0O9rzAkEAq2XJgEhkmn2uDHrepxplZPUsEcebUIQZ\n" + + "7jvsTHEbXKKTzLwtXCdXi2q/ZovItQh/zW/Lt+A2gzYAWtXsV3Cz6QJBAJUTb8kM\n" + + "5iUYlWupmS6kOSfYFOqrVvI3w9kZUocsxv338dpzuKxikkIxAfiBTtbVXawEkF48\n" + + "tTSQsDJmnXj2Hhw="; + + + + public String signature(String content) throws Exception { + + byte[] decode = Base64.decodeBase64(privateKey); + PKCS8EncodedKeySpec priPKCS8 = new PKCS8EncodedKeySpec(decode); + PrivateKey priKey = KeyFactory.getInstance("RSA").generatePrivate(priPKCS8); + String md5 = DigestUtils.md5DigestAsHex(content.getBytes(StandardCharsets.UTF_8)).toUpperCase(); + Signature signature = Signature.getInstance("SHA1WithRSA"); + signature.initSign(priKey); + signature.update(md5.getBytes(StandardCharsets.UTF_8)); + return Base64.encodeBase64String(signature.sign()).trim(); + } + + public String packageContent(HttpRequestParams params, String timestamp, String body) { + return "ENCRYPT-ALGORITHM" + "=" + "RSA" + + "&" + + "PARAMS" + "=" + body + + "&" + + "TIMESTAMP" + "=" + timestamp; + + } + +} diff --git a/common/src/main/java/work/xuye/common/utils/signature/AbstractSignatureService.java b/common/src/main/java/work/xuye/common/utils/signature/AbstractSignatureService.java new file mode 100644 index 0000000..1b2101d --- /dev/null +++ b/common/src/main/java/work/xuye/common/utils/signature/AbstractSignatureService.java @@ -0,0 +1,32 @@ +package work.xuye.common.utils.signature; + +import work.xuye.common.dto.SignatureConfig; +import work.xuye.common.dto.SignatureContext; + +import java.security.GeneralSecurityException; +import java.security.Signature; + +/** + * @author yechuan + * @since 2023/8/8 14:42 + **/ +public abstract class AbstractSignatureService { + + protected SignatureContext signatureContext; + + /** + * 使用私钥加密 + * + * @param context + * @return + * @throws GeneralSecurityException + */ + public abstract String signatureByPriKey(String context) throws GeneralSecurityException; + + + void init(SignatureConfig signatureConfig) throws GeneralSecurityException { + signatureContext = new SignatureContext(); + signatureContext.setSignature(Signature.getInstance(signatureConfig.getSignatureAlgorithm())); + } + +} diff --git a/common/src/main/java/work/xuye/common/utils/signature/RSASignatureService.java b/common/src/main/java/work/xuye/common/utils/signature/RSASignatureService.java new file mode 100644 index 0000000..54df1e5 --- /dev/null +++ b/common/src/main/java/work/xuye/common/utils/signature/RSASignatureService.java @@ -0,0 +1,58 @@ +package work.xuye.common.utils.signature; + +import org.apache.commons.codec.binary.Base64; +import org.springframework.util.DigestUtils; +import work.xuye.common.dto.SignatureConfig; +import work.xuye.common.enums.SignatureEnumGroup; + +import java.nio.charset.StandardCharsets; +import java.security.GeneralSecurityException; +import java.security.KeyFactory; +import java.security.PrivateKey; +import java.security.Signature; +import java.security.spec.PKCS8EncodedKeySpec; +import java.security.spec.X509EncodedKeySpec; + +/** + * RSA + * + * @author yechuan + * @since 2023/8/8 14:47 + **/ + +class RSASignatureService extends AbstractSignatureService { + + @Override + public String signatureByPriKey(String context) throws GeneralSecurityException { + if (signatureContext == null) { + throw new RuntimeException("未初始化公私钥信息"); + } + Signature signature = signatureContext.getSignature(); + PrivateKey priKey = signatureContext.getPriKey(); + signature.initSign(priKey); + context = DigestUtils.md5DigestAsHex(context.getBytes(StandardCharsets.UTF_8)).toUpperCase(); + signature.update(context.getBytes(StandardCharsets.UTF_8)); + return Base64.encodeBase64String(signature.sign()); + } + + @Override + void init(SignatureConfig signatureConfig) throws GeneralSecurityException { + super.init(signatureConfig); + + String keyTypeName = signatureConfig.getKeyType(); + SignatureEnumGroup.KeyType keyType = SignatureEnumGroup.KeyType.valueOf(keyTypeName); + + KeyFactory keyFactory = KeyFactory.getInstance(SignatureEnumGroup.KeyAlgorithm.RSA.name()); + + byte[] encodedKey = Base64.decodeBase64(signatureConfig.getKey()); + + if (SignatureEnumGroup.KeyType.privateKey.equals(keyType)) { + signatureContext.setPriKey(keyFactory.generatePrivate(new PKCS8EncodedKeySpec(encodedKey))); + } else if (SignatureEnumGroup.KeyType.publicKey.equals(keyType)) { + signatureContext.setPubKey(keyFactory.generatePublic(new X509EncodedKeySpec(encodedKey))); + } + + } + + +} diff --git a/common/src/main/java/work/xuye/common/utils/signature/SignatureFactory.java b/common/src/main/java/work/xuye/common/utils/signature/SignatureFactory.java new file mode 100644 index 0000000..9c8100b --- /dev/null +++ b/common/src/main/java/work/xuye/common/utils/signature/SignatureFactory.java @@ -0,0 +1,34 @@ +package work.xuye.common.utils.signature; + +import work.xuye.common.dto.SignatureConfig; +import work.xuye.common.enums.SignatureEnumGroup; + +import java.security.GeneralSecurityException; + +/** + * @author yechuan + * @since 2023/8/8 15:20 + **/ +public class SignatureFactory { + + /** + * 获取加解密实现 + */ + public static AbstractSignatureService getSignatureService(SignatureConfig signatureConfig) throws GeneralSecurityException { + + AbstractSignatureService result = null; + + switch (SignatureEnumGroup.KeyAlgorithm.valueOf(signatureConfig.getKeyAlgorithm())) { + case RSA: + result = new RSASignatureService(); + break; + } + if (result == null) { + throw new RuntimeException(String.format("不支持的签名方式,signatureConfig:%s", signatureConfig)); + } + result.init(signatureConfig); + return result; + + } + +} diff --git a/scheduler/src/main/java/work/xuye/scheduler/service/IssueService.java b/scheduler/src/main/java/work/xuye/scheduler/service/IssueService.java index 04e6c4b..0b66cdd 100644 --- a/scheduler/src/main/java/work/xuye/scheduler/service/IssueService.java +++ b/scheduler/src/main/java/work/xuye/scheduler/service/IssueService.java @@ -1,5 +1,6 @@ package work.xuye.scheduler.service; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.ApplicationArguments; @@ -73,7 +74,8 @@ public class IssueService implements ApplicationRunner { public ArrayList issueAllTask() { log.info("--------------------"); - List list = taskService.list(); +// List list = taskService.list(); + List list = taskService.list(new LambdaQueryWrapper().eq(Task::getIsRoot,true)); Collections.shuffle(list); ArrayList result = new ArrayList<>(); list.forEach(task -> { diff --git a/sink/src/main/java/work/xuye/sink/handler/ItemHandler.java b/sink/src/main/java/work/xuye/sink/handler/ItemHandler.java index d339a5d..d74a0b1 100644 --- a/sink/src/main/java/work/xuye/sink/handler/ItemHandler.java +++ b/sink/src/main/java/work/xuye/sink/handler/ItemHandler.java @@ -22,10 +22,7 @@ import work.xuye.common.spel.SpringExpressionLanguageEvaluator; import work.xuye.common.utils.DebugUtil; import work.xuye.sink.service.JdbcService; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; /** * @author xuye @@ -64,8 +61,7 @@ public class ItemHandler { } private void cleanCache(Message message) { - String seedUrl = (String) message.getHeaders().get(MessageConstants.SEED_URL); - urlMD5Service.removeUrlMD5(seedUrl); + urlMD5Service.removeUrlCache(message); } @@ -319,8 +315,10 @@ public class ItemHandler { if (!ObjectUtils.isEmpty(checkUpdateConfig.getJsonPath())) { dbValue = cf.jsonExtract(dbValue.toString(), checkUpdateConfig.getJsonPath()); nowValue = cf.jsonExtract(nowValue.toString(), checkUpdateConfig.getJsonPath()); + } else if (!ObjectUtils.isEmpty(checkUpdateConfig.getDbObjectExpression())) { + dbValue = evaluator.evaluate(dbValue, checkUpdateConfig.getDbObjectExpression(), String.class); } else { - throw new RuntimeException("根据 [" + fieldName + "] 来判断数据是否需要更新,但是没有配置jsonPath"); + throw new RuntimeException("根据 [" + fieldName + "] 来判断数据是否需要更新,但是没有配置jsonPath与dbObjectExpression"); } if (dbValue.equals(nowValue)) { log.info("[{}][✅ 已是最新][{} {}] " + "{}: {}", diff --git a/sink/src/main/java/work/xuye/sink/handler/SqlGenerator.java b/sink/src/main/java/work/xuye/sink/handler/SqlGenerator.java index b801857..1cfac60 100644 --- a/sink/src/main/java/work/xuye/sink/handler/SqlGenerator.java +++ b/sink/src/main/java/work/xuye/sink/handler/SqlGenerator.java @@ -2,6 +2,7 @@ package work.xuye.sink.handler; import work.xuye.common.db.entity.vo.TableTemplate; import work.xuye.common.enums.DbFieldType; +import work.xuye.common.enums.UpdateStrategy; import java.util.ArrayList; import java.util.List; @@ -37,13 +38,17 @@ public class SqlGenerator { boolean isFirstField = true; for (TableTemplate.Field field : fields) { + String fieldValue = formatFieldValue(field); + if ("NULL".equals(fieldValue) && UpdateStrategy.not_null.equals(field.getUpdateStrategy())) { + continue; + } if (!isFirstField) { sqlBuilder.append(", "); } else { isFirstField = false; } sqlBuilder.append('`').append(field.getFieldName()).append('`') - .append('=').append(formatFieldValue(field)); + .append('=').append(fieldValue); } TableTemplate.Field updateTimeField = template.getUpdateTimeField(); diff --git a/source/src/main/java/work/xuye/source/handler/SourceHandler.java b/source/src/main/java/work/xuye/source/handler/SourceHandler.java index c53c17f..66ca1a6 100644 --- a/source/src/main/java/work/xuye/source/handler/SourceHandler.java +++ b/source/src/main/java/work/xuye/source/handler/SourceHandler.java @@ -1,8 +1,10 @@ package work.xuye.source.handler; +import com.google.gson.Gson; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.function.StreamBridge; +import org.springframework.expression.EvaluationContext; import org.springframework.expression.Expression; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.expression.spel.support.StandardEvaluationContext; @@ -13,6 +15,7 @@ import org.springframework.util.DigestUtils; import org.springframework.util.ObjectUtils; import work.xuye.common.constant.*; import work.xuye.common.db.entity.Task; +import work.xuye.common.db.entity.vo.NextTaskFilter; import work.xuye.common.db.entity.vo.TaskMiniVO; import work.xuye.common.db.service.TaskManager; import work.xuye.common.dto.HttpRequestParams; @@ -20,17 +23,21 @@ import work.xuye.common.dto.HttpRes; import work.xuye.common.enums.ProcessMode; import work.xuye.common.enums.RequestMode; import work.xuye.common.enums.ResourceStatus; +import work.xuye.common.enums.SignatureEnumGroup; import work.xuye.common.service.UrlMD5Service; import work.xuye.common.utils.DebugUtil; import work.xuye.common.utils.ID; +import work.xuye.common.utils.JsonPathUtil; +import work.xuye.common.utils.signature.AbstractSignatureService; +import work.xuye.common.utils.signature.SignatureFactory; import work.xuye.source.request.RequestClient; import work.xuye.source.spel.CustomRequest; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.nio.charset.StandardCharsets; +import java.security.GeneralSecurityException; +import java.util.*; import java.util.regex.Matcher; +import java.util.stream.Stream; /** * @author xuye @@ -47,6 +54,7 @@ public class SourceHandler { private final UrlMD5Service urlMD5Service; private final StreamBridge streamBridge; private final TaskManager taskManager; + public final Gson gson; public List> handle(Message message) { TaskMiniVO taskMiniVO = message.getPayload(); @@ -58,18 +66,33 @@ public class SourceHandler { return null; } HttpRequestParams req = task.getRequestParams(); + if (!ObjectUtils.isEmpty(taskMiniVO.getArgs())) { + req.getArgs().putAll(taskMiniVO.getArgs()); + } + + Set otherCacheKeySet = new HashSet<>(); + Set parentOtherCacheKeySet = message.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET, Set.class); + if (parentOtherCacheKeySet != null) { + otherCacheKeySet.addAll(parentOtherCacheKeySet); + } + + String taskName = task.getName(); HttpRes res = null; List result = null; - if (RequestMode.normal.equals(req.getMode())) { - res = this.request(req); - result = this.handleRes(task, res); - } else if (RequestMode.SpEL.equals(req.getMode())) { + try { + if (RequestMode.normal.equals(req.getMode())) { + res = this.request(req); + result = this.handleRes(task, res); + } else if (RequestMode.SpEL.equals(req.getMode())) { - StandardEvaluationContext context = new StandardEvaluationContext(); - Expression expression = parser.parseExpression(task.getRequestParams().getSpELConfig().getExpression()); - List value = expression.getValue(context, this, List.class); - result = this.handleRes(task, value); + StandardEvaluationContext context = new StandardEvaluationContext(); + Expression expression = parser.parseExpression(task.getRequestParams().getSpELConfig().getExpression()); + List value = expression.getValue(context, this, List.class); + result = this.handleRes(task, value); + } + } catch (Exception e) { + urlMD5Service.removeUrlCache(message); } if (ObjectUtils.isEmpty(result)) { return null; @@ -81,11 +104,12 @@ public class SourceHandler { .setHeader(MessageConstants.CURRENT_STAGE, StageConstants.SOURCE_WATCH) .setHeader(MessageConstants.TASK_TRACE_ID, message.getHeaders().get(MessageConstants.TASK_TRACE_ID)) .setHeader(MessageConstants.SOURCE_TRACE_ID, ID.generate()) - .setHeader(MessageConstants.TASK_ID, message.getHeaders().get(MessageConstants.TASK_ID)) - .setHeader(MessageConstants.TASK_NAME, message.getHeaders().get(MessageConstants.TASK_NAME)) + .setHeader(MessageConstants.TASK_ID, task.getId()) + .setHeader(MessageConstants.TASK_NAME, task.getName()) .setHeader(SnapshotConstants.URL, req.getUrl()) .setHeader(SnapshotConstants.RESOURCE_STATUS, r.getResourceStatus()) .setHeader(SnapshotConstants.STATUS, r.getStatus()) + .setHeader(MessageConstants.OTHER_CACHE_KEY_SET, otherCacheKeySet) .build(); boolean send = streamBridge.send(taskName + BindingConstants.SNAPSHOT_OUT_POSTFIX, watchMessage); @@ -96,6 +120,26 @@ public class SourceHandler { return null; } + if (Objects.nonNull(task.getNextFilter())) { + + Map nextNodeMessage = buildNextNodeMessage(result, task, message); + + if (!ObjectUtils.isEmpty(nextNodeMessage)) { + nextNodeMessage.forEach((k, v) -> { + Set set = v.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET, Set.class); + if (!ObjectUtils.isEmpty(set) && !ObjectUtils.isEmpty(otherCacheKeySet)) { + set.addAll(otherCacheKeySet); + v.getHeaders().put(MessageConstants.OTHER_CACHE_KEY_SET, set); + } + boolean send = streamBridge.send(k, v); + if (!send) { + throw new RuntimeException("send message failed"); + } + }); + } + return null; + } + ArrayList> messageList = new ArrayList<>(); result.forEach( item -> { @@ -104,9 +148,10 @@ public class SourceHandler { .setHeader(MessageConstants.CURRENT_STAGE, StageConstants.SOURCE) .setHeader(MessageConstants.TASK_TRACE_ID, message.getHeaders().get(MessageConstants.TASK_TRACE_ID)) .setHeader(MessageConstants.SOURCE_TRACE_ID, ID.generate()) - .setHeader(MessageConstants.TASK_ID, message.getHeaders().get(MessageConstants.TASK_ID)) - .setHeader(MessageConstants.TASK_NAME, message.getHeaders().get(MessageConstants.TASK_NAME)) + .setHeader(MessageConstants.TASK_ID, task.getId()) + .setHeader(MessageConstants.TASK_NAME, task.getName()) .setHeader(MessageConstants.SEED_URL, item.getRequestParams().getUrl()) + .setHeader(MessageConstants.OTHER_CACHE_KEY_SET, otherCacheKeySet) .build()); } ); @@ -115,6 +160,93 @@ public class SourceHandler { } + private Map buildNextNodeMessage(List result, Task task, Message message) { + NextTaskFilter filter = task.getNextFilter(); + Set otherCacheKeySet = new HashSet<>(); + Map resultMap = new HashMap<>(); + result.stream() + .map(item -> { + HttpRes httpRes = HttpRes.build() + .status(item.getStatus()) + .costTimeMillis(item.getCostTimeMillis()) + .headers(item.getHeaders()) + .requestParams(item.getRequestParams()); + httpRes.setResourceStatus(item.getResourceStatus()); + httpRes.setTemporary(JsonPathUtil.read(item.getBody(), task.getNextFilter().getFilterArgs())); + return httpRes; + }) + .flatMap(item -> { + if (item.getTemporary() instanceof Collection) { + return ((Collection) item.getTemporary()) + .stream() + .map(temporary -> { + HttpRes params = HttpRes.build() + .status(item.getStatus()) + .costTimeMillis(item.getCostTimeMillis()) + .headers(item.getHeaders()) + .requestParams(item.getRequestParams()); + params.setResourceStatus(item.getResourceStatus()); + params.setTemporary(temporary); + return params; + }); + } else { + return Stream.of(item); + } + }) + .forEach(httpRes -> { + Object temporary = httpRes.getTemporary(); + httpRes.setTemporary(null); + Map map = (Map) temporary; + // 是否更新判断 + String uniqueKey = map.get(filter.getUniqueKey()).toString(); + uniqueKey = httpRes.getRequestParams().getUrl() + "?uniqueKey=" + uniqueKey; + String md5Digest = DigestUtils.md5DigestAsHex(gson.toJson(map).getBytes(StandardCharsets.UTF_8)); + ResourceStatus resourceStatus = urlMD5Service.getResourceStatus(uniqueKey, md5Digest); + urlMD5Service.put(uniqueKey, md5Digest); + if (ResourceStatus.UNCHANGED.equals(resourceStatus)) { + return; + } + + // 节点命中 + List nextNodes = filter.getNextNodes(); + if (ObjectUtils.isEmpty(nextNodes)) { + return; + } + StandardEvaluationContext context = new StandardEvaluationContext(this); + context.setVariable("filterArgs", map); + context.setVariable("httpRes", httpRes); + + for (NextTaskFilter.NextNodeConfig nextNode : nextNodes) { + + if (!Boolean.TRUE.equals(getPlaceholderSpel(nextNode.getCondition(), filter.getPlaceholderExpressions(), context, Boolean.class))) { + continue; + } + otherCacheKeySet.add(uniqueKey); + // 命中 + String topic = getPlaceholderSpel(nextNode.getTopic(), filter.getPlaceholderExpressions(), context, String.class); + Object payload = getPlaceholderSpel(nextNode.getPayload(), filter.getPlaceholderExpressions(), context, Object.class); + Message nextTaskMessage = MessageBuilder + .withPayload(payload) + .setHeader(MessageConstants.CURRENT_STAGE, StageConstants.SOURCE) + .setHeader(MessageConstants.TASK_TRACE_ID, message.getHeaders().get(MessageConstants.TASK_TRACE_ID)) + .setHeader(MessageConstants.SOURCE_TRACE_ID, ID.generate()) + .setHeader(MessageConstants.TASK_ID, task.getId()) + .setHeader(MessageConstants.TASK_NAME, task.getName()) + .setHeader(MessageConstants.SEED_URL, httpRes.getRequestParams().getUrl()) + .setHeader(MessageConstants.SEED_URL, httpRes.getRequestParams().getUrl()) + .setHeader(MessageConstants.OTHER_CACHE_KEY_SET, otherCacheKeySet) + .build(); + resultMap.put(topic, nextTaskMessage); + if ("break".equals(filter.getMode())) { + break; + } + } + } + ); + + return resultMap; + + } private void setResourceStatus(HttpRes res) { String md5Digest = DigestUtils.md5DigestAsHex(res.getBody().getBytes()); @@ -186,25 +318,87 @@ public class SourceHandler { public HttpRes request(HttpRequestParams req) { this.preHandleUrl(req); this.preHandleBody(req); + this.preHandleHeader(req); + this.preHandleSignature(req); return requestClient.execute(req); } - private void preHandleBody(HttpRequestParams request) { - Map body = request.getBody(); - if (ObjectUtils.isEmpty(body)) { + + private void preHandleSignature(HttpRequestParams requestParams) { + HttpRequestParams.RequestSignatureConfig signatureConfig = requestParams.getSignature(); + if (ObjectUtils.isEmpty(signatureConfig)) { + return; + } + + String signatureContext = requestParams.getPlaceholderValue(signatureConfig.getSignatureContext(), String.class, parser, new StandardEvaluationContext(this)); + AbstractSignatureService signatureService = null; + try { + signatureService = SignatureFactory.getSignatureService(signatureConfig); + String signature = signatureService.signatureByPriKey(signatureContext); + // 优化 + if (SignatureEnumGroup.SignatureLocation.header.equals(signatureConfig.getSignatureLocation())) { + requestParams.getHeaders().put(signatureConfig.getSignatureFile(), signature); + } + } catch (GeneralSecurityException e) { + throw new RuntimeException(e); + } + } + + private void preHandleHeader(HttpRequestParams request) { + Map headers = request.getHeaders(); + if (ObjectUtils.isEmpty(headers)) { return; } - for (String key : body.keySet()) { - Object value = body.get(key); - Matcher matcher = RegexConstants.PLACEHOLDER_PATTERN.matcher(value.toString()); + request.setHeaders(aclculationPlaceholderMap(request.getHeaders(), request)); + } + + + /** + * 根据占位符从spelMap中获取对应的spel表达式并计算结果 + * 或直接返回占位符(不满足占位符规则时,即无需计算) + */ + private T getPlaceholderSpel(Object placeholder, Map spelMap, EvaluationContext context, Class resultClazz) { + Matcher matcher = RegexConstants.PLACEHOLDER_PATTERN.matcher(placeholder.toString()); + if (matcher.find()) { + return parser.parseExpression(spelMap.get(matcher.group())).getValue(context, resultClazz); + } + return (T) placeholder; + } + + /** + * 计算占位符的值,并返回此map + */ + private Map aclculationPlaceholderMap(Map placeholderMap, HttpRequestParams request) { + + for (String key : placeholderMap.keySet()) { + Object placeholderValue = placeholderMap.get(key); + Matcher matcher = RegexConstants.PLACEHOLDER_PATTERN.matcher(placeholderValue.toString()); if (matcher.find()) { String placeholder = matcher.group(); - String expr = request.getPlaceholderExpressions().get(placeholder); - Object result = parser.parseExpression(expr).getValue(); - body.put(key, result); + Object value = request.getPlaceholderValue(placeholder, Object.class, parser, new StandardEvaluationContext(this)); + placeholderMap.put(key, value); } } - request.setBody(body); + return placeholderMap; + } + + private void preHandleBody(HttpRequestParams request) { + Map body = request.getBody(); + if (ObjectUtils.isEmpty(body)) { + return; + } + request.setBody(aclculationPlaceholderMap(request.getBody(), request)); +// for (String key : body.keySet()) { +// Object value = body.get(key); +// Matcher matcher = RegexConstants.PLACEHOLDER_PATTERN.matcher(value.toString()); +// if (matcher.find()) { +// String placeholder = matcher.group(); +// String expr = request.getPlaceholderExpressions().get(placeholder); +// Object result = parser.parseExpression(expr).getValue(); +// body.put(key, result); +// } +// } +// request.setBody(body); } private void preHandleUrl(HttpRequestParams request) { @@ -213,19 +407,23 @@ public class SourceHandler { Map patternMap = new HashMap<>(); while (matcher.find()) { String placeholder = matcher.group(); - patternMap.put(placeholder, null); + patternMap.put(placeholder, placeholder); } if (ObjectUtils.isEmpty(patternMap)) { return; } - Map placeholderExpressions = request.getPlaceholderExpressions(); - for (String key : patternMap.keySet()) { - String expr = placeholderExpressions.get(key); - Object value = parser.parseExpression(expr).getValue(); - patternMap.put(key, value); - } + + aclculationPlaceholderMap(patternMap, request); +// Map placeholderExpressions = request.getPlaceholderExpressions(); + + +// for (String key : patternMap.keySet()) { +// String expr = placeholderExpressions.get(key); +// Object value = parser.parseExpression(expr).getValue(); +// patternMap.put(key, value); +// } for (String key : patternMap.keySet()) { url = url.replace(key, patternMap.get(key).toString()); } diff --git a/source/src/main/java/work/xuye/source/request/OkHttpRequestClient.java b/source/src/main/java/work/xuye/source/request/OkHttpRequestClient.java index 9e5c3f2..810be6d 100644 --- a/source/src/main/java/work/xuye/source/request/OkHttpRequestClient.java +++ b/source/src/main/java/work/xuye/source/request/OkHttpRequestClient.java @@ -65,7 +65,10 @@ public class OkHttpRequestClient implements RequestClient { // request header if (!ObjectUtils.isEmpty(request.getHeaders())) { - requestBuilder.headers(Headers.Companion.of(request.getHeaders())); + request.getHeaders() + .forEach((k, v) -> requestBuilder.addHeader(k, v.toString())); + +// requestBuilder.headers(Headers.Companion.of()); } // request body diff --git a/transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java b/transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java index b299858..59ebbac 100644 --- a/transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java +++ b/transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java @@ -51,7 +51,7 @@ public class TransformHandler { try { return this.doHandle(message); } catch (Exception e) { - urlMD5Service.removeUrlMD5(this.getSeedUrl(message)); + urlMD5Service.removeUrlCache(message); throw new RuntimeException(e); } } @@ -158,6 +158,7 @@ public class TransformHandler { .setHeader(MessageConstants.SOURCE_TRACE_ID, message.getHeaders().get(MessageConstants.SOURCE_TRACE_ID)) .setHeader(MessageConstants.CURRENT_STAGE, StageConstants.TRANSFORMER) .setHeader(MessageConstants.TRANSFORMER_TRACE_ID, ID.generate()) + .setHeader(MessageConstants.OTHER_CACHE_KEY_SET, message.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET)) .build(); boolean send = streamBridge.send(BindingConstants.TRANSFORMER_KEYS_OUT, keysMsg); if (!send) { @@ -192,6 +193,7 @@ public class TransformHandler { .setHeader(MessageConstants.TASK_ID, message.getHeaders().get(MessageConstants.TASK_ID)) .setHeader(MessageConstants.TASK_NAME, message.getHeaders().get(MessageConstants.TASK_NAME)) .setHeader(MessageConstants.SEED_URL, message.getHeaders().get(MessageConstants.SEED_URL)) + .setHeader(MessageConstants.OTHER_CACHE_KEY_SET, message.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET)) .build(); results.add(itemMessage); diff --git a/transformer/src/main/java/work/xuye/transformer/transformer/ResDataTransformer.java b/transformer/src/main/java/work/xuye/transformer/transformer/ResDataTransformer.java index 4b342c5..69e04ac 100644 --- a/transformer/src/main/java/work/xuye/transformer/transformer/ResDataTransformer.java +++ b/transformer/src/main/java/work/xuye/transformer/transformer/ResDataTransformer.java @@ -20,7 +20,7 @@ public class ResDataTransformer implements MessageTransformer { JsonObject res = JsonParser.parseString(json).getAsJsonObject(); boolean hasData = res.has("data"); if (hasData) { - return res.get("data").getAsString(); + return res.get("data").toString(); } else { log.warn("resData transform failed, res not has data, res: {}", res); }