Ver código fonte

新增第一财经视频爬虫

master
yechuan 1 ano atrás
pai
commit
314dcb1d8d
27 arquivos alterados com 741 adições e 53 exclusões
  1. +25
    -7
      common/src/main/java/work/xuye/common/alert/ExceptionAspect.java
  2. +2
    -0
      common/src/main/java/work/xuye/common/constant/BindingConstants.java
  3. +1
    -0
      common/src/main/java/work/xuye/common/constant/MessageConstants.java
  4. +7
    -0
      common/src/main/java/work/xuye/common/db/entity/Task.java
  5. +32
    -0
      common/src/main/java/work/xuye/common/db/entity/vo/NextTaskFilter.java
  6. +1
    -0
      common/src/main/java/work/xuye/common/db/entity/vo/SinkParams.java
  7. +6
    -0
      common/src/main/java/work/xuye/common/db/entity/vo/TableTemplate.java
  8. +4
    -0
      common/src/main/java/work/xuye/common/db/entity/vo/TaskMiniVO.java
  9. +72
    -1
      common/src/main/java/work/xuye/common/dto/HttpRequestParams.java
  10. +7
    -0
      common/src/main/java/work/xuye/common/dto/HttpRes.java
  11. +36
    -0
      common/src/main/java/work/xuye/common/dto/SignatureConfig.java
  12. +18
    -0
      common/src/main/java/work/xuye/common/dto/SignatureContext.java
  13. +36
    -0
      common/src/main/java/work/xuye/common/enums/SignatureEnumGroup.java
  14. +18
    -0
      common/src/main/java/work/xuye/common/enums/UpdateStrategy.java
  15. +17
    -0
      common/src/main/java/work/xuye/common/service/UrlMD5Service.java
  16. +23
    -0
      common/src/main/java/work/xuye/common/spel/CustomFunction.java
  17. +59
    -0
      common/src/main/java/work/xuye/common/utils/SignatureUtil.java
  18. +32
    -0
      common/src/main/java/work/xuye/common/utils/signature/AbstractSignatureService.java
  19. +58
    -0
      common/src/main/java/work/xuye/common/utils/signature/RSASignatureService.java
  20. +34
    -0
      common/src/main/java/work/xuye/common/utils/signature/SignatureFactory.java
  21. +3
    -1
      scheduler/src/main/java/work/xuye/scheduler/service/IssueService.java
  22. +5
    -7
      sink/src/main/java/work/xuye/sink/handler/ItemHandler.java
  23. +6
    -1
      sink/src/main/java/work/xuye/sink/handler/SqlGenerator.java
  24. +231
    -33
      source/src/main/java/work/xuye/source/handler/SourceHandler.java
  25. +4
    -1
      source/src/main/java/work/xuye/source/request/OkHttpRequestClient.java
  26. +3
    -1
      transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java
  27. +1
    -1
      transformer/src/main/java/work/xuye/transformer/transformer/ResDataTransformer.java

+ 25
- 7
common/src/main/java/work/xuye/common/alert/ExceptionAspect.java Ver arquivo

@@ -23,10 +23,10 @@ public class ExceptionAspect {

private final MessageService messageService;

@AfterThrowing(pointcut = "execution(* work.xuye..*(..))", throwing = "ex")
@AfterThrowing(pointcut = "execution(* work.xuye..*(..))&& !execution(* work.xuye.common.service.MessageService.*(..))", throwing = "ex")
public void handleGlobalException(Exception ex) {
Sentry.capture(ex);

List<StackTraceElement> stackTraceElements = new java.util.ArrayList<>(List.of(ex.getStackTrace()));
stackTraceElements.removeIf(stackTraceElement -> !stackTraceElement.getClassName().contains("work.xuye"));
@@ -38,14 +38,32 @@ public class ExceptionAspect {
.append(stackTraceElement.getClassName()).append(".")
.append(stackTraceElement.getMethodName()).append("()").append(" (line ")
.append(stackTraceElement.getLineNumber()).append(")\n"));
String traceStringString = traceString.toString();

if (traceStringString.contains("work.xuye.common.service.MessageService")){
return;
}

Sentry.capture(ex);

String subTitle = ex.getMessage();
String title = ex.getClass().getName();
if (subTitle == null) {
subTitle = title;
}
messageService.sendMessage(MessageType.exception,
title,
subTitle,
traceString.toString());


for (int i = 1; i <= 3; i++) {
try {
messageService.sendMessage(MessageType.exception,
title,
subTitle,
traceStringString);
return;
} catch (Exception e) {
log.error("send error message error! now {} times ,try retry send again e:", i, e);
}
}

}
}

+ 2
- 0
common/src/main/java/work/xuye/common/constant/BindingConstants.java Ver arquivo

@@ -8,6 +8,8 @@ public class BindingConstants {

public static final String TASK_OUT = "task-out";

public static final String SOURCE_OUT = "source-out";

public static final String SNAPSHOT_OUT_POSTFIX = "-snapshot-out";

public static final String TRANSFORMER_KEYS_OUT = "transformKeys-out";


+ 1
- 0
common/src/main/java/work/xuye/common/constant/MessageConstants.java Ver arquivo

@@ -20,6 +20,7 @@ public class MessageConstants {
public static final String TASK_NAME = "taskName";

public static final String SEED_URL = "seedUrl";
public static final String OTHER_CACHE_KEY_SET = "otherCacheKeySet";


}

+ 7
- 0
common/src/main/java/work/xuye/common/db/entity/Task.java Ver arquivo

@@ -9,6 +9,7 @@ import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import work.xuye.common.db.entity.vo.NextTaskFilter;
import work.xuye.common.db.entity.vo.ParseParams;
import work.xuye.common.db.entity.vo.SinkParams;
import work.xuye.common.db.entity.vo.TransformParams;
@@ -58,6 +59,9 @@ public class Task implements Serializable {
@TableField(value = "sink_params", typeHandler = JacksonTypeHandler.class)
private SinkParams sinkParams;

@TableField(value = "next_filter", typeHandler = JacksonTypeHandler.class)
private NextTaskFilter nextFilter;

@TableField(value = "create_time", fill = FieldFill.INSERT)
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
@JsonSerialize(using = LocalDateTimeSerializer.class)
@@ -68,6 +72,9 @@ public class Task implements Serializable {
@JsonSerialize(using = LocalDateTimeSerializer.class)
private LocalDateTime updateTime;

@TableField(value = "is_root")
private Boolean isRoot;

@TableField("version")
@Version
private Integer version;


+ 32
- 0
common/src/main/java/work/xuye/common/db/entity/vo/NextTaskFilter.java Ver arquivo

@@ -0,0 +1,32 @@
package work.xuye.common.db.entity.vo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;
import java.util.Map;

/**
* @author yechuan
* @since 2023/8/8 19:09
**/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class NextTaskFilter {
private String filterArgs;
private String mode;
private String uniqueKey;
private List<NextNodeConfig> nextNodes;
private Map<String, String> placeholderExpressions = Map.of();

@Data
@NoArgsConstructor
@AllArgsConstructor
public static class NextNodeConfig {
private String topic;
private String condition;
private String payload;
}
}

+ 1
- 0
common/src/main/java/work/xuye/common/db/entity/vo/SinkParams.java Ver arquivo

@@ -50,6 +50,7 @@ public class SinkParams {
private Status status;
private String fieldName;
private String jsonPath;
private String dbObjectExpression;
}

@Data


+ 6
- 0
common/src/main/java/work/xuye/common/db/entity/vo/TableTemplate.java Ver arquivo

@@ -5,6 +5,7 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import work.xuye.common.enums.DbFieldType;
import work.xuye.common.enums.UpdateStrategy;

import java.util.List;

@@ -80,6 +81,11 @@ public class TableTemplate {
* 字段校验表达式规则
*/
private List<String> rules;

/**
* 修改时的策略
*/
private UpdateStrategy updateStrategy = UpdateStrategy.ignored;
}

@Data


+ 4
- 0
common/src/main/java/work/xuye/common/db/entity/vo/TaskMiniVO.java Ver arquivo

@@ -5,6 +5,8 @@ import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Map;

/**
* @author xuye
* @since 2023/7/7 21:58
@@ -18,4 +20,6 @@ public class TaskMiniVO {
private Integer id;

private String name;

private Map<String, String> args;
}

+ 72
- 1
common/src/main/java/work/xuye/common/dto/HttpRequestParams.java Ver arquivo

@@ -1,13 +1,22 @@
package work.xuye.common.dto;

import cn.hutool.core.collection.CollUtil;
import com.baomidou.mybatisplus.annotation.TableField;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import work.xuye.common.enums.RequestMode;
import work.xuye.common.enums.SignatureEnumGroup;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
* @author xuye
@@ -24,10 +33,63 @@ public class HttpRequestParams {
private String url;
private Map<String, Object> body = Map.of();
private String charset;
private Map<String, String> headers = Map.of();
private Map<String, Object> headers = Map.of();
private Map<String, String> placeholderExpressions = Map.of();
private String mediaType = MediaType.APPLICATION_JSON_VALUE;

private RequestSignatureConfig signature;

/**
* 仅计算一次的表达式(同次请求后续使用上次计算值)
*/
private Set<String> onceCompileExpressions = Collections.emptySet();

/**
* 缓存的表达式值
*/
@TableField(exist = false)
private Map<String, Object> placeholderExpressionsValueCache = new HashMap<>();


/**
* 参数(由父任务传递,参与计算与组装)
*/
@TableField(exist = false)
private Map<String, String> args = new HashMap<>();


/**
* 根据占位符获取值
*/
public <T> T getPlaceholderValue(String placeholder, Class<T> t, SpelExpressionParser parser, EvaluationContext context) {
if (CollUtil.isEmpty(placeholderExpressions)) {
return null;
}
if (!placeholderExpressions.containsKey(placeholder)) {
return null;
}
boolean cache = false;

if (onceCompileExpressions.contains(placeholder)) {
// 一次请求中 仅计算一次
if (placeholderExpressionsValueCache.containsKey(placeholder)) {
return t.cast(placeholderExpressionsValueCache.get(placeholder));
}
cache = true;
}
context.setVariable("requestParams", this);
String expr = placeholderExpressions.get(placeholder);
T result = parser.parseExpression(expr).getValue(context, t);

if (cache) {
placeholderExpressionsValueCache.put(placeholder, result);
}

return result;


}

@Data
@NoArgsConstructor
@AllArgsConstructor
@@ -36,4 +98,13 @@ public class HttpRequestParams {

}

@Data
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = true)
public static class RequestSignatureConfig extends SignatureConfig {
private String signatureFile;
private String signatureContext;
private SignatureEnumGroup.SignatureLocation signatureLocation;
}
}

+ 7
- 0
common/src/main/java/work/xuye/common/dto/HttpRes.java Ver arquivo

@@ -1,5 +1,6 @@
package work.xuye.common.dto;

import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -25,6 +26,12 @@ public class HttpRes {
private Map<String, List<String>> headers = Map.of();
private Long costTimeMillis;

/**
* 零时变量
*/
@JsonInclude
private Object temporary;

public static HttpRes build() {
return new HttpRes();
}


+ 36
- 0
common/src/main/java/work/xuye/common/dto/SignatureConfig.java Ver arquivo

@@ -0,0 +1,36 @@
package work.xuye.common.dto;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* 加解密配置
*
* @author yechuan
* @since 2023/8/8 14:44
**/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class SignatureConfig {
/**
* key 算法
*/
private String keyAlgorithm;

/**
* 公私钥类型
*/
private String keyType;

/**
* 加密算法
*/
private String signatureAlgorithm;

/**
* key
*/
private String key;
}

+ 18
- 0
common/src/main/java/work/xuye/common/dto/SignatureContext.java Ver arquivo

@@ -0,0 +1,18 @@
package work.xuye.common.dto;

import lombok.Data;

import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.Signature;

/**
* @author yechuan
* @since 2023/8/9 17:53
**/
@Data
public class SignatureContext {
private PrivateKey priKey;
private PublicKey pubKey;
private Signature signature;
}

+ 36
- 0
common/src/main/java/work/xuye/common/enums/SignatureEnumGroup.java Ver arquivo

@@ -0,0 +1,36 @@
package work.xuye.common.enums;

/**
* @author yechuan
* @since 2023/8/8 14:52
**/
public class SignatureEnumGroup {

/**
* 加密字符串的位置
*/
public static enum SignatureLocation{
header
}

public static enum KeyAlgorithm{
/**
* RSA
*/
RSA
}

public static enum KeyType{
/**
* 私钥
*/
privateKey,
/**
* 公钥
*/
publicKey,
}



}

+ 18
- 0
common/src/main/java/work/xuye/common/enums/UpdateStrategy.java Ver arquivo

@@ -0,0 +1,18 @@
package work.xuye.common.enums;

/**
* 修改时的策略
*
* @author yechuan
* @since 2023/8/10 13:37
**/
public enum UpdateStrategy {
/**
* 忽略判断
*/
ignored,
/**
* 非NULL时加入sql
*/
not_null
}

+ 17
- 0
common/src/main/java/work/xuye/common/service/UrlMD5Service.java Ver arquivo

@@ -2,10 +2,16 @@ package work.xuye.common.service;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import work.xuye.common.constant.MessageConstants;
import work.xuye.common.enums.ResourceStatus;
import work.xuye.common.store.UrlMD5MapStore;

import java.util.Set;

/**
* @author xuye
* @since 2023/3/6 21:58
@@ -37,4 +43,15 @@ public class UrlMD5Service {
Long aLong = urlMD5MapStore.removeKey(seedUrl);
log.info("删除url对应的md5,url:{}, 删除条数:{}", seedUrl, aLong);
}

public void removeUrlCache(Message message) {
Set<String> parentOtherCacheKeySet = message.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET, Set.class);
if (!ObjectUtils.isEmpty(parentOtherCacheKeySet)) {
parentOtherCacheKeySet.forEach(this::removeUrlMD5);
}
String sendUrl = message.getHeaders().get(MessageConstants.SEED_URL, String.class);
if (!StringUtils.hasText(sendUrl)) {
this.removeUrlMD5(sendUrl);
}
}
}

+ 23
- 0
common/src/main/java/work/xuye/common/spel/CustomFunction.java Ver arquivo

@@ -4,6 +4,8 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.expression.Expression;
@@ -41,6 +43,13 @@ public class CustomFunction {

private final NsKVMapStore mapStore;

public String localDateTimeFormat(LocalDateTime time, String pattern) {
if (time == null) {
return "";
}
return time.format(DateTimeFormatter.ofPattern(pattern));
}

public String jsonExtract(String json, String path) {
String result = null;
try {
@@ -53,6 +62,20 @@ public class CustomFunction {
return result;
}

public String jsonExtractOrPathNotFound(String json, String path) {
String result = null;
try {
result = JsonPath.read(json, path).toString().trim();
} catch (PathNotFoundException e) {
log.warn("jsonPath not find, json: [{}], path: [{}]", json, path);
} catch (Exception e) {
log.error("jsonExtra handler error, json: [{}], path: [{}]", json, path);
throw new RuntimeException(e);
}
log.debug("jsonExtra handler, json: [{}], path: [{}], result: [{}]", json, path, result);
return result;
}

public String jsonExtractReplaceAll(String json, String jsonPath, String regex, String replacement) {
return this.jsonExtract(json, jsonPath).replaceAll(regex, replacement).trim();
}


+ 59
- 0
common/src/main/java/work/xuye/common/utils/SignatureUtil.java Ver arquivo

@@ -0,0 +1,59 @@
package work.xuye.common.utils;

import org.apache.commons.codec.binary.Base64;
import org.springframework.stereotype.Component;
import org.springframework.util.DigestUtils;
import work.xuye.common.dto.HttpRequestParams;

import java.nio.charset.StandardCharsets;
import java.security.KeyFactory;
import java.security.PrivateKey;
import java.security.Signature;
import java.security.spec.PKCS8EncodedKeySpec;

/**
* @author yechuan
* @since 2023/8/7 15:08
**/
@Component
public class SignatureUtil {

private final static String privateKey = "MIICdwIBADANBgkqhkiG9w0BAQEFAASCAmEwggJdAgEAAoGBANTyWsgvVOW6lNH4\n" +
"koggbORNIdM0mw+klkTfAZJ8hM9SvxaPWgo55dU1zp46nUs15ZStf/A4EmOeh4jA\n" +
"SLqHf3Zd2WJc5izjEm70pnwHXRCjOA8i4nR6ia0fpzZPf+FNqElmvsfrvqk185cf\n" +
"kAXk2RoXSAykUi+2UMK6TlTaHmqXAgMBAAECgYAmCFcQc+us0CMuUUASkgAA0ond\n" +
"CAM9yv6PtGi6egTaZoP8ioPhWa/j4aVSe1OGkEy9vjMge1NFeZXpZbZXokWwUdmi\n" +
"xxOcCYgYKebJ0Fmssvj/GSRL93B1JlXs88MdedAGlef1b1IyURUSDbkkbOZmHnON\n" +
"OgkoSafcB+JFx6Ea8QJBAPTaiAZSsco9e23EMXfhy+h7CjYYY8LJ+gBrnEZV2jns\n" +
"O7i2W/2c9sEzexG8C/W1oEbEDsfkmCKvf/mQ+dJb0d8CQQDeo/yuk5Z0W+yBos5U\n" +
"h0vjlAJzVL812QNv02bogmGaS7y3Ao1+/cVeTWSrlEjjM2tcbnpFtOgFtLBl1lFX\n" +
"9q5JAkB6gskimMe6UC7sygiSWhjjdoSycluf/90lzrH/gz9QUgHDtwKqD5prKq3+\n" +
"Pp+hTkImhjx7CcaRPEyE+2P0O9rzAkEAq2XJgEhkmn2uDHrepxplZPUsEcebUIQZ\n" +
"7jvsTHEbXKKTzLwtXCdXi2q/ZovItQh/zW/Lt+A2gzYAWtXsV3Cz6QJBAJUTb8kM\n" +
"5iUYlWupmS6kOSfYFOqrVvI3w9kZUocsxv338dpzuKxikkIxAfiBTtbVXawEkF48\n" +
"tTSQsDJmnXj2Hhw=";



public String signature(String content) throws Exception {

byte[] decode = Base64.decodeBase64(privateKey);
PKCS8EncodedKeySpec priPKCS8 = new PKCS8EncodedKeySpec(decode);
PrivateKey priKey = KeyFactory.getInstance("RSA").generatePrivate(priPKCS8);
String md5 = DigestUtils.md5DigestAsHex(content.getBytes(StandardCharsets.UTF_8)).toUpperCase();
Signature signature = Signature.getInstance("SHA1WithRSA");
signature.initSign(priKey);
signature.update(md5.getBytes(StandardCharsets.UTF_8));
return Base64.encodeBase64String(signature.sign()).trim();
}

public String packageContent(HttpRequestParams params, String timestamp, String body) {
return "ENCRYPT-ALGORITHM" + "=" + "RSA" +
"&" +
"PARAMS" + "=" + body +
"&" +
"TIMESTAMP" + "=" + timestamp;

}

}

+ 32
- 0
common/src/main/java/work/xuye/common/utils/signature/AbstractSignatureService.java Ver arquivo

@@ -0,0 +1,32 @@
package work.xuye.common.utils.signature;

import work.xuye.common.dto.SignatureConfig;
import work.xuye.common.dto.SignatureContext;

import java.security.GeneralSecurityException;
import java.security.Signature;

/**
* @author yechuan
* @since 2023/8/8 14:42
**/
public abstract class AbstractSignatureService {

protected SignatureContext signatureContext;

/**
* 使用私钥加密
*
* @param context
* @return
* @throws GeneralSecurityException
*/
public abstract String signatureByPriKey(String context) throws GeneralSecurityException;


void init(SignatureConfig signatureConfig) throws GeneralSecurityException {
signatureContext = new SignatureContext();
signatureContext.setSignature(Signature.getInstance(signatureConfig.getSignatureAlgorithm()));
}

}

+ 58
- 0
common/src/main/java/work/xuye/common/utils/signature/RSASignatureService.java Ver arquivo

@@ -0,0 +1,58 @@
package work.xuye.common.utils.signature;

import org.apache.commons.codec.binary.Base64;
import org.springframework.util.DigestUtils;
import work.xuye.common.dto.SignatureConfig;
import work.xuye.common.enums.SignatureEnumGroup;

import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyFactory;
import java.security.PrivateKey;
import java.security.Signature;
import java.security.spec.PKCS8EncodedKeySpec;
import java.security.spec.X509EncodedKeySpec;

/**
* RSA
*
* @author yechuan
* @since 2023/8/8 14:47
**/

class RSASignatureService extends AbstractSignatureService {

@Override
public String signatureByPriKey(String context) throws GeneralSecurityException {
if (signatureContext == null) {
throw new RuntimeException("未初始化公私钥信息");
}
Signature signature = signatureContext.getSignature();
PrivateKey priKey = signatureContext.getPriKey();
signature.initSign(priKey);
context = DigestUtils.md5DigestAsHex(context.getBytes(StandardCharsets.UTF_8)).toUpperCase();
signature.update(context.getBytes(StandardCharsets.UTF_8));
return Base64.encodeBase64String(signature.sign());
}

@Override
void init(SignatureConfig signatureConfig) throws GeneralSecurityException {
super.init(signatureConfig);

String keyTypeName = signatureConfig.getKeyType();
SignatureEnumGroup.KeyType keyType = SignatureEnumGroup.KeyType.valueOf(keyTypeName);

KeyFactory keyFactory = KeyFactory.getInstance(SignatureEnumGroup.KeyAlgorithm.RSA.name());

byte[] encodedKey = Base64.decodeBase64(signatureConfig.getKey());

if (SignatureEnumGroup.KeyType.privateKey.equals(keyType)) {
signatureContext.setPriKey(keyFactory.generatePrivate(new PKCS8EncodedKeySpec(encodedKey)));
} else if (SignatureEnumGroup.KeyType.publicKey.equals(keyType)) {
signatureContext.setPubKey(keyFactory.generatePublic(new X509EncodedKeySpec(encodedKey)));
}

}


}

+ 34
- 0
common/src/main/java/work/xuye/common/utils/signature/SignatureFactory.java Ver arquivo

@@ -0,0 +1,34 @@
package work.xuye.common.utils.signature;

import work.xuye.common.dto.SignatureConfig;
import work.xuye.common.enums.SignatureEnumGroup;

import java.security.GeneralSecurityException;

/**
* @author yechuan
* @since 2023/8/8 15:20
**/
public class SignatureFactory {

/**
* 获取加解密实现
*/
public static AbstractSignatureService getSignatureService(SignatureConfig signatureConfig) throws GeneralSecurityException {

AbstractSignatureService result = null;

switch (SignatureEnumGroup.KeyAlgorithm.valueOf(signatureConfig.getKeyAlgorithm())) {
case RSA:
result = new RSASignatureService();
break;
}
if (result == null) {
throw new RuntimeException(String.format("不支持的签名方式,signatureConfig:%s", signatureConfig));
}
result.init(signatureConfig);
return result;

}

}

+ 3
- 1
scheduler/src/main/java/work/xuye/scheduler/service/IssueService.java Ver arquivo

@@ -1,5 +1,6 @@
package work.xuye.scheduler.service;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
@@ -73,7 +74,8 @@ public class IssueService implements ApplicationRunner {

public ArrayList<TaskVO> issueAllTask() {
log.info("--------------------");
List<Task> list = taskService.list();
// List<Task> list = taskService.list();
List<Task> list = taskService.list(new LambdaQueryWrapper<Task>().eq(Task::getIsRoot,true));
Collections.shuffle(list);
ArrayList<TaskVO> result = new ArrayList<>();
list.forEach(task -> {


+ 5
- 7
sink/src/main/java/work/xuye/sink/handler/ItemHandler.java Ver arquivo

@@ -22,10 +22,7 @@ import work.xuye.common.spel.SpringExpressionLanguageEvaluator;
import work.xuye.common.utils.DebugUtil;
import work.xuye.sink.service.JdbcService;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;

/**
* @author xuye
@@ -64,8 +61,7 @@ public class ItemHandler {
}

private void cleanCache(Message<String> message) {
String seedUrl = (String) message.getHeaders().get(MessageConstants.SEED_URL);
urlMD5Service.removeUrlMD5(seedUrl);
urlMD5Service.removeUrlCache(message);
}


@@ -319,8 +315,10 @@ public class ItemHandler {
if (!ObjectUtils.isEmpty(checkUpdateConfig.getJsonPath())) {
dbValue = cf.jsonExtract(dbValue.toString(), checkUpdateConfig.getJsonPath());
nowValue = cf.jsonExtract(nowValue.toString(), checkUpdateConfig.getJsonPath());
} else if (!ObjectUtils.isEmpty(checkUpdateConfig.getDbObjectExpression())) {
dbValue = evaluator.evaluate(dbValue, checkUpdateConfig.getDbObjectExpression(), String.class);
} else {
throw new RuntimeException("根据 [" + fieldName + "] 来判断数据是否需要更新,但是没有配置jsonPath");
throw new RuntimeException("根据 [" + fieldName + "] 来判断数据是否需要更新,但是没有配置jsonPath与dbObjectExpression");
}
if (dbValue.equals(nowValue)) {
log.info("[{}][✅ 已是最新][{} {}] " + "{}: {}",


+ 6
- 1
sink/src/main/java/work/xuye/sink/handler/SqlGenerator.java Ver arquivo

@@ -2,6 +2,7 @@ package work.xuye.sink.handler;

import work.xuye.common.db.entity.vo.TableTemplate;
import work.xuye.common.enums.DbFieldType;
import work.xuye.common.enums.UpdateStrategy;

import java.util.ArrayList;
import java.util.List;
@@ -37,13 +38,17 @@ public class SqlGenerator {
boolean isFirstField = true;

for (TableTemplate.Field field : fields) {
String fieldValue = formatFieldValue(field);
if ("NULL".equals(fieldValue) && UpdateStrategy.not_null.equals(field.getUpdateStrategy())) {
continue;
}
if (!isFirstField) {
sqlBuilder.append(", ");
} else {
isFirstField = false;
}
sqlBuilder.append('`').append(field.getFieldName()).append('`')
.append('=').append(formatFieldValue(field));
.append('=').append(fieldValue);
}

TableTemplate.Field updateTimeField = template.getUpdateTimeField();


+ 231
- 33
source/src/main/java/work/xuye/source/handler/SourceHandler.java Ver arquivo

@@ -1,8 +1,10 @@
package work.xuye.source.handler;

import com.google.gson.Gson;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
@@ -13,6 +15,7 @@ import org.springframework.util.DigestUtils;
import org.springframework.util.ObjectUtils;
import work.xuye.common.constant.*;
import work.xuye.common.db.entity.Task;
import work.xuye.common.db.entity.vo.NextTaskFilter;
import work.xuye.common.db.entity.vo.TaskMiniVO;
import work.xuye.common.db.service.TaskManager;
import work.xuye.common.dto.HttpRequestParams;
@@ -20,17 +23,21 @@ import work.xuye.common.dto.HttpRes;
import work.xuye.common.enums.ProcessMode;
import work.xuye.common.enums.RequestMode;
import work.xuye.common.enums.ResourceStatus;
import work.xuye.common.enums.SignatureEnumGroup;
import work.xuye.common.service.UrlMD5Service;
import work.xuye.common.utils.DebugUtil;
import work.xuye.common.utils.ID;
import work.xuye.common.utils.JsonPathUtil;
import work.xuye.common.utils.signature.AbstractSignatureService;
import work.xuye.common.utils.signature.SignatureFactory;
import work.xuye.source.request.RequestClient;
import work.xuye.source.spel.CustomRequest;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.*;
import java.util.regex.Matcher;
import java.util.stream.Stream;

/**
* @author xuye
@@ -47,6 +54,7 @@ public class SourceHandler {
private final UrlMD5Service urlMD5Service;
private final StreamBridge streamBridge;
private final TaskManager taskManager;
public final Gson gson;

public List<Message<HttpRes>> handle(Message<TaskMiniVO> message) {
TaskMiniVO taskMiniVO = message.getPayload();
@@ -58,18 +66,33 @@ public class SourceHandler {
return null;
}
HttpRequestParams req = task.getRequestParams();
if (!ObjectUtils.isEmpty(taskMiniVO.getArgs())) {
req.getArgs().putAll(taskMiniVO.getArgs());
}

Set<String> otherCacheKeySet = new HashSet<>();
Set parentOtherCacheKeySet = message.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET, Set.class);
if (parentOtherCacheKeySet != null) {
otherCacheKeySet.addAll(parentOtherCacheKeySet);
}


String taskName = task.getName();
HttpRes res = null;
List<HttpRes> result = null;
if (RequestMode.normal.equals(req.getMode())) {
res = this.request(req);
result = this.handleRes(task, res);
} else if (RequestMode.SpEL.equals(req.getMode())) {
try {
if (RequestMode.normal.equals(req.getMode())) {
res = this.request(req);
result = this.handleRes(task, res);
} else if (RequestMode.SpEL.equals(req.getMode())) {

StandardEvaluationContext context = new StandardEvaluationContext();
Expression expression = parser.parseExpression(task.getRequestParams().getSpELConfig().getExpression());
List<HttpRes> value = expression.getValue(context, this, List.class);
result = this.handleRes(task, value);
StandardEvaluationContext context = new StandardEvaluationContext();
Expression expression = parser.parseExpression(task.getRequestParams().getSpELConfig().getExpression());
List<HttpRes> value = expression.getValue(context, this, List.class);
result = this.handleRes(task, value);
}
} catch (Exception e) {
urlMD5Service.removeUrlCache(message);
}
if (ObjectUtils.isEmpty(result)) {
return null;
@@ -81,11 +104,12 @@ public class SourceHandler {
.setHeader(MessageConstants.CURRENT_STAGE, StageConstants.SOURCE_WATCH)
.setHeader(MessageConstants.TASK_TRACE_ID, message.getHeaders().get(MessageConstants.TASK_TRACE_ID))
.setHeader(MessageConstants.SOURCE_TRACE_ID, ID.generate())
.setHeader(MessageConstants.TASK_ID, message.getHeaders().get(MessageConstants.TASK_ID))
.setHeader(MessageConstants.TASK_NAME, message.getHeaders().get(MessageConstants.TASK_NAME))
.setHeader(MessageConstants.TASK_ID, task.getId())
.setHeader(MessageConstants.TASK_NAME, task.getName())
.setHeader(SnapshotConstants.URL, req.getUrl())
.setHeader(SnapshotConstants.RESOURCE_STATUS, r.getResourceStatus())
.setHeader(SnapshotConstants.STATUS, r.getStatus())
.setHeader(MessageConstants.OTHER_CACHE_KEY_SET, otherCacheKeySet)
.build();

boolean send = streamBridge.send(taskName + BindingConstants.SNAPSHOT_OUT_POSTFIX, watchMessage);
@@ -96,6 +120,26 @@ public class SourceHandler {
return null;
}

if (Objects.nonNull(task.getNextFilter())) {

Map<String, Message> nextNodeMessage = buildNextNodeMessage(result, task, message);

if (!ObjectUtils.isEmpty(nextNodeMessage)) {
nextNodeMessage.forEach((k, v) -> {
Set set = v.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET, Set.class);
if (!ObjectUtils.isEmpty(set) && !ObjectUtils.isEmpty(otherCacheKeySet)) {
set.addAll(otherCacheKeySet);
v.getHeaders().put(MessageConstants.OTHER_CACHE_KEY_SET, set);
}
boolean send = streamBridge.send(k, v);
if (!send) {
throw new RuntimeException("send message failed");
}
});
}
return null;
}

ArrayList<Message<HttpRes>> messageList = new ArrayList<>();
result.forEach(
item -> {
@@ -104,9 +148,10 @@ public class SourceHandler {
.setHeader(MessageConstants.CURRENT_STAGE, StageConstants.SOURCE)
.setHeader(MessageConstants.TASK_TRACE_ID, message.getHeaders().get(MessageConstants.TASK_TRACE_ID))
.setHeader(MessageConstants.SOURCE_TRACE_ID, ID.generate())
.setHeader(MessageConstants.TASK_ID, message.getHeaders().get(MessageConstants.TASK_ID))
.setHeader(MessageConstants.TASK_NAME, message.getHeaders().get(MessageConstants.TASK_NAME))
.setHeader(MessageConstants.TASK_ID, task.getId())
.setHeader(MessageConstants.TASK_NAME, task.getName())
.setHeader(MessageConstants.SEED_URL, item.getRequestParams().getUrl())
.setHeader(MessageConstants.OTHER_CACHE_KEY_SET, otherCacheKeySet)
.build());
}
);
@@ -115,6 +160,93 @@ public class SourceHandler {

}

private Map<String, Message> buildNextNodeMessage(List<HttpRes> result, Task task, Message<TaskMiniVO> message) {
NextTaskFilter filter = task.getNextFilter();
Set<String> otherCacheKeySet = new HashSet<>();
Map<String, Message> resultMap = new HashMap<>();
result.stream()
.map(item -> {
HttpRes httpRes = HttpRes.build()
.status(item.getStatus())
.costTimeMillis(item.getCostTimeMillis())
.headers(item.getHeaders())
.requestParams(item.getRequestParams());
httpRes.setResourceStatus(item.getResourceStatus());
httpRes.setTemporary(JsonPathUtil.read(item.getBody(), task.getNextFilter().getFilterArgs()));
return httpRes;
})
.flatMap(item -> {
if (item.getTemporary() instanceof Collection) {
return ((Collection<?>) item.getTemporary())
.stream()
.map(temporary -> {
HttpRes params = HttpRes.build()
.status(item.getStatus())
.costTimeMillis(item.getCostTimeMillis())
.headers(item.getHeaders())
.requestParams(item.getRequestParams());
params.setResourceStatus(item.getResourceStatus());
params.setTemporary(temporary);
return params;
});
} else {
return Stream.of(item);
}
})
.forEach(httpRes -> {
Object temporary = httpRes.getTemporary();
httpRes.setTemporary(null);
Map map = (Map) temporary;
// 是否更新判断
String uniqueKey = map.get(filter.getUniqueKey()).toString();
uniqueKey = httpRes.getRequestParams().getUrl() + "?uniqueKey=" + uniqueKey;
String md5Digest = DigestUtils.md5DigestAsHex(gson.toJson(map).getBytes(StandardCharsets.UTF_8));
ResourceStatus resourceStatus = urlMD5Service.getResourceStatus(uniqueKey, md5Digest);
urlMD5Service.put(uniqueKey, md5Digest);
if (ResourceStatus.UNCHANGED.equals(resourceStatus)) {
return;
}

// 节点命中
List<NextTaskFilter.NextNodeConfig> nextNodes = filter.getNextNodes();
if (ObjectUtils.isEmpty(nextNodes)) {
return;
}
StandardEvaluationContext context = new StandardEvaluationContext(this);
context.setVariable("filterArgs", map);
context.setVariable("httpRes", httpRes);

for (NextTaskFilter.NextNodeConfig nextNode : nextNodes) {

if (!Boolean.TRUE.equals(getPlaceholderSpel(nextNode.getCondition(), filter.getPlaceholderExpressions(), context, Boolean.class))) {
continue;
}
otherCacheKeySet.add(uniqueKey);
// 命中
String topic = getPlaceholderSpel(nextNode.getTopic(), filter.getPlaceholderExpressions(), context, String.class);
Object payload = getPlaceholderSpel(nextNode.getPayload(), filter.getPlaceholderExpressions(), context, Object.class);
Message<Object> nextTaskMessage = MessageBuilder
.withPayload(payload)
.setHeader(MessageConstants.CURRENT_STAGE, StageConstants.SOURCE)
.setHeader(MessageConstants.TASK_TRACE_ID, message.getHeaders().get(MessageConstants.TASK_TRACE_ID))
.setHeader(MessageConstants.SOURCE_TRACE_ID, ID.generate())
.setHeader(MessageConstants.TASK_ID, task.getId())
.setHeader(MessageConstants.TASK_NAME, task.getName())
.setHeader(MessageConstants.SEED_URL, httpRes.getRequestParams().getUrl())
.setHeader(MessageConstants.SEED_URL, httpRes.getRequestParams().getUrl())
.setHeader(MessageConstants.OTHER_CACHE_KEY_SET, otherCacheKeySet)
.build();
resultMap.put(topic, nextTaskMessage);
if ("break".equals(filter.getMode())) {
break;
}
}
}
);

return resultMap;

}

private void setResourceStatus(HttpRes res) {
String md5Digest = DigestUtils.md5DigestAsHex(res.getBody().getBytes());
@@ -186,25 +318,87 @@ public class SourceHandler {
public HttpRes request(HttpRequestParams req) {
this.preHandleUrl(req);
this.preHandleBody(req);
this.preHandleHeader(req);
this.preHandleSignature(req);
return requestClient.execute(req);
}

private void preHandleBody(HttpRequestParams request) {
Map<String, Object> body = request.getBody();
if (ObjectUtils.isEmpty(body)) {

private void preHandleSignature(HttpRequestParams requestParams) {
HttpRequestParams.RequestSignatureConfig signatureConfig = requestParams.getSignature();
if (ObjectUtils.isEmpty(signatureConfig)) {
return;
}

String signatureContext = requestParams.getPlaceholderValue(signatureConfig.getSignatureContext(), String.class, parser, new StandardEvaluationContext(this));
AbstractSignatureService signatureService = null;
try {
signatureService = SignatureFactory.getSignatureService(signatureConfig);
String signature = signatureService.signatureByPriKey(signatureContext);
// 优化
if (SignatureEnumGroup.SignatureLocation.header.equals(signatureConfig.getSignatureLocation())) {
requestParams.getHeaders().put(signatureConfig.getSignatureFile(), signature);
}
} catch (GeneralSecurityException e) {
throw new RuntimeException(e);
}
}

private void preHandleHeader(HttpRequestParams request) {
Map<String, Object> headers = request.getHeaders();
if (ObjectUtils.isEmpty(headers)) {
return;
}
for (String key : body.keySet()) {
Object value = body.get(key);
Matcher matcher = RegexConstants.PLACEHOLDER_PATTERN.matcher(value.toString());
request.setHeaders(aclculationPlaceholderMap(request.getHeaders(), request));
}


/**
* 根据占位符从spelMap中获取对应的spel表达式并计算结果
* 或直接返回占位符(不满足占位符规则时,即无需计算)
*/
private <T> T getPlaceholderSpel(Object placeholder, Map<String, String> spelMap, EvaluationContext context, Class<T> resultClazz) {
Matcher matcher = RegexConstants.PLACEHOLDER_PATTERN.matcher(placeholder.toString());
if (matcher.find()) {
return parser.parseExpression(spelMap.get(matcher.group())).getValue(context, resultClazz);
}
return (T) placeholder;
}

/**
* 计算占位符的值,并返回此map
*/
private Map<String, Object> aclculationPlaceholderMap(Map<String, Object> placeholderMap, HttpRequestParams request) {

for (String key : placeholderMap.keySet()) {
Object placeholderValue = placeholderMap.get(key);
Matcher matcher = RegexConstants.PLACEHOLDER_PATTERN.matcher(placeholderValue.toString());
if (matcher.find()) {
String placeholder = matcher.group();
String expr = request.getPlaceholderExpressions().get(placeholder);
Object result = parser.parseExpression(expr).getValue();
body.put(key, result);
Object value = request.getPlaceholderValue(placeholder, Object.class, parser, new StandardEvaluationContext(this));
placeholderMap.put(key, value);
}
}
request.setBody(body);
return placeholderMap;
}

private void preHandleBody(HttpRequestParams request) {
Map<String, Object> body = request.getBody();
if (ObjectUtils.isEmpty(body)) {
return;
}
request.setBody(aclculationPlaceholderMap(request.getBody(), request));
// for (String key : body.keySet()) {
// Object value = body.get(key);
// Matcher matcher = RegexConstants.PLACEHOLDER_PATTERN.matcher(value.toString());
// if (matcher.find()) {
// String placeholder = matcher.group();
// String expr = request.getPlaceholderExpressions().get(placeholder);
// Object result = parser.parseExpression(expr).getValue();
// body.put(key, result);
// }
// }
// request.setBody(body);
}

private void preHandleUrl(HttpRequestParams request) {
@@ -213,19 +407,23 @@ public class SourceHandler {
Map<String, Object> patternMap = new HashMap<>();
while (matcher.find()) {
String placeholder = matcher.group();
patternMap.put(placeholder, null);
patternMap.put(placeholder, placeholder);
}

if (ObjectUtils.isEmpty(patternMap)) {
return;
}
Map<String, String> placeholderExpressions = request.getPlaceholderExpressions();

for (String key : patternMap.keySet()) {
String expr = placeholderExpressions.get(key);
Object value = parser.parseExpression(expr).getValue();
patternMap.put(key, value);
}

aclculationPlaceholderMap(patternMap, request);
// Map<String, String> placeholderExpressions = request.getPlaceholderExpressions();


// for (String key : patternMap.keySet()) {
// String expr = placeholderExpressions.get(key);
// Object value = parser.parseExpression(expr).getValue();
// patternMap.put(key, value);
// }
for (String key : patternMap.keySet()) {
url = url.replace(key, patternMap.get(key).toString());
}


+ 4
- 1
source/src/main/java/work/xuye/source/request/OkHttpRequestClient.java Ver arquivo

@@ -65,7 +65,10 @@ public class OkHttpRequestClient implements RequestClient {

// request header
if (!ObjectUtils.isEmpty(request.getHeaders())) {
requestBuilder.headers(Headers.Companion.of(request.getHeaders()));
request.getHeaders()
.forEach((k, v) -> requestBuilder.addHeader(k, v.toString()));

// requestBuilder.headers(Headers.Companion.of());
}

// request body


+ 3
- 1
transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java Ver arquivo

@@ -51,7 +51,7 @@ public class TransformHandler {
try {
return this.doHandle(message);
} catch (Exception e) {
urlMD5Service.removeUrlMD5(this.getSeedUrl(message));
urlMD5Service.removeUrlCache(message);
throw new RuntimeException(e);
}
}
@@ -158,6 +158,7 @@ public class TransformHandler {
.setHeader(MessageConstants.SOURCE_TRACE_ID, message.getHeaders().get(MessageConstants.SOURCE_TRACE_ID))
.setHeader(MessageConstants.CURRENT_STAGE, StageConstants.TRANSFORMER)
.setHeader(MessageConstants.TRANSFORMER_TRACE_ID, ID.generate())
.setHeader(MessageConstants.OTHER_CACHE_KEY_SET, message.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET))
.build();
boolean send = streamBridge.send(BindingConstants.TRANSFORMER_KEYS_OUT, keysMsg);
if (!send) {
@@ -192,6 +193,7 @@ public class TransformHandler {
.setHeader(MessageConstants.TASK_ID, message.getHeaders().get(MessageConstants.TASK_ID))
.setHeader(MessageConstants.TASK_NAME, message.getHeaders().get(MessageConstants.TASK_NAME))
.setHeader(MessageConstants.SEED_URL, message.getHeaders().get(MessageConstants.SEED_URL))
.setHeader(MessageConstants.OTHER_CACHE_KEY_SET, message.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET))

.build();
results.add(itemMessage);


+ 1
- 1
transformer/src/main/java/work/xuye/transformer/transformer/ResDataTransformer.java Ver arquivo

@@ -20,7 +20,7 @@ public class ResDataTransformer implements MessageTransformer {
JsonObject res = JsonParser.parseString(json).getAsJsonObject();
boolean hasData = res.has("data");
if (hasData) {
return res.get("data").getAsString();
return res.get("data").toString();
} else {
log.warn("resData transform failed, res not has data, res: {}", res);
}


Carregando…
Cancelar
Salvar