Browse Source

Merge branch 'fea_dev_2023_08_07_yechuan' into 'master'

第一财经数据爬取

See merge request fhl/std-news-process!6
master
Arthur Qin 1 year ago
parent
commit
e6b9b43568
40 changed files with 1178 additions and 247 deletions
  1. +6
    -1
      common/pom.xml
  2. +25
    -7
      common/src/main/java/work/xuye/common/alert/ExceptionAspect.java
  3. +2
    -0
      common/src/main/java/work/xuye/common/constant/BindingConstants.java
  4. +1
    -0
      common/src/main/java/work/xuye/common/constant/MessageConstants.java
  5. +7
    -0
      common/src/main/java/work/xuye/common/db/entity/Task.java
  6. +63
    -0
      common/src/main/java/work/xuye/common/db/entity/vo/Dispatcher.java
  7. +4
    -0
      common/src/main/java/work/xuye/common/db/entity/vo/SinkParams.java
  8. +6
    -0
      common/src/main/java/work/xuye/common/db/entity/vo/TableTemplate.java
  9. +7
    -0
      common/src/main/java/work/xuye/common/db/entity/vo/TaskMiniVO.java
  10. +72
    -1
      common/src/main/java/work/xuye/common/dto/HttpRequestParams.java
  11. +7
    -0
      common/src/main/java/work/xuye/common/dto/HttpRes.java
  12. +36
    -0
      common/src/main/java/work/xuye/common/dto/SignatureConfig.java
  13. +18
    -0
      common/src/main/java/work/xuye/common/dto/SignatureContext.java
  14. +36
    -0
      common/src/main/java/work/xuye/common/enums/SignatureEnumGroup.java
  15. +18
    -0
      common/src/main/java/work/xuye/common/enums/UpdateStrategy.java
  16. +1
    -1
      common/src/main/java/work/xuye/common/service/MessageService.java
  17. +17
    -0
      common/src/main/java/work/xuye/common/service/UrlMD5Service.java
  18. +23
    -0
      common/src/main/java/work/xuye/common/spel/CustomFunction.java
  19. +32
    -0
      common/src/main/java/work/xuye/common/utils/signature/AbstractSignatureService.java
  20. +70
    -0
      common/src/main/java/work/xuye/common/utils/signature/RSASignatureService.java
  21. +33
    -0
      common/src/main/java/work/xuye/common/utils/signature/SignatureFactory.java
  22. +7
    -1
      common/src/main/resources/markdown/task.md
  23. +180
    -0
      helper/src/main/java/work/xuye/helper/checker/DataDiffChecker.java
  24. +14
    -0
      helper/src/main/java/work/xuye/helper/checker/NewsTypeEnum.java
  25. +35
    -0
      helper/src/main/java/work/xuye/helper/controller/CheckController.java
  26. +28
    -0
      helper/src/main/java/work/xuye/helper/properties/DatasourceConfigProperties.java
  27. +25
    -7
      helper/src/main/java/work/xuye/helper/properties/HelperProperties.java
  28. +44
    -0
      helper/src/main/java/work/xuye/helper/service/JdbcTemplateManager.java
  29. +0
    -171
      helper/src/main/java/work/xuye/helper/service/UpdateService.java
  30. +24
    -0
      helper/src/main/java/work/xuye/helper/vo/DataCheckVO.java
  31. +39
    -6
      helper/src/main/resources/application.yml
  32. +3
    -1
      scheduler/src/main/java/work/xuye/scheduler/service/IssueService.java
  33. +9
    -8
      sink/src/main/java/work/xuye/sink/handler/ItemHandler.java
  34. +6
    -1
      sink/src/main/java/work/xuye/sink/handler/SqlGenerator.java
  35. +233
    -33
      source/src/main/java/work/xuye/source/handler/SourceHandler.java
  36. +3
    -1
      source/src/main/java/work/xuye/source/properties/SourceProperties.java
  37. +13
    -5
      source/src/main/java/work/xuye/source/request/OkHttpRequestClient.java
  38. +3
    -1
      transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java
  39. +26
    -0
      transformer/src/main/java/work/xuye/transformer/transformer/JsonDataTransformer.java
  40. +2
    -2
      transformer/src/main/java/work/xuye/transformer/transformer/XmlToJsonTransformer.java

+ 6
- 1
common/pom.xml View File

@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>work.xuye</groupId>
@@ -120,6 +121,10 @@
<artifactId>sq-sentry</artifactId>
<version>1.1.8</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>

</dependencies>



+ 25
- 7
common/src/main/java/work/xuye/common/alert/ExceptionAspect.java View File

@@ -23,10 +23,10 @@ public class ExceptionAspect {

private final MessageService messageService;

@AfterThrowing(pointcut = "execution(* work.xuye..*(..))", throwing = "ex")
@AfterThrowing(pointcut = "execution(* work.xuye..*(..))&& !execution(* work.xuye.common.service.MessageService.*(..))", throwing = "ex")
public void handleGlobalException(Exception ex) {
Sentry.capture(ex);

List<StackTraceElement> stackTraceElements = new java.util.ArrayList<>(List.of(ex.getStackTrace()));
stackTraceElements.removeIf(stackTraceElement -> !stackTraceElement.getClassName().contains("work.xuye"));
@@ -38,14 +38,32 @@ public class ExceptionAspect {
.append(stackTraceElement.getClassName()).append(".")
.append(stackTraceElement.getMethodName()).append("()").append(" (line ")
.append(stackTraceElement.getLineNumber()).append(")\n"));
String traceStringString = traceString.toString();

if (traceStringString.contains("work.xuye.common.service.MessageService")){
return;
}

Sentry.capture(ex);

String subTitle = ex.getMessage();
String title = ex.getClass().getName();
if (subTitle == null) {
subTitle = title;
}
messageService.sendMessage(MessageType.exception,
title,
subTitle,
traceString.toString());


for (int i = 1; i <= 3; i++) {
try {
messageService.sendMessage(MessageType.exception,
title,
subTitle,
traceStringString);
return;
} catch (Exception e) {
log.error("send error message error! now {} times ,try retry send again e:", i, e);
}
}

}
}

+ 2
- 0
common/src/main/java/work/xuye/common/constant/BindingConstants.java View File

@@ -8,6 +8,8 @@ public class BindingConstants {

public static final String TASK_OUT = "task-out";

public static final String SOURCE_OUT = "source-out";

public static final String SNAPSHOT_OUT_POSTFIX = "-snapshot-out";

public static final String TRANSFORMER_KEYS_OUT = "transformKeys-out";


+ 1
- 0
common/src/main/java/work/xuye/common/constant/MessageConstants.java View File

@@ -20,6 +20,7 @@ public class MessageConstants {
public static final String TASK_NAME = "taskName";

public static final String SEED_URL = "seedUrl";
public static final String OTHER_CACHE_KEY_SET = "otherCacheKeySet";


}

+ 7
- 0
common/src/main/java/work/xuye/common/db/entity/Task.java View File

@@ -9,6 +9,7 @@ import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import work.xuye.common.db.entity.vo.Dispatcher;
import work.xuye.common.db.entity.vo.ParseParams;
import work.xuye.common.db.entity.vo.SinkParams;
import work.xuye.common.db.entity.vo.TransformParams;
@@ -58,6 +59,9 @@ public class Task implements Serializable {
@TableField(value = "sink_params", typeHandler = JacksonTypeHandler.class)
private SinkParams sinkParams;

@TableField(value = "dispatcher", typeHandler = JacksonTypeHandler.class)
private Dispatcher dispatcher;

@TableField(value = "create_time", fill = FieldFill.INSERT)
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
@JsonSerialize(using = LocalDateTimeSerializer.class)
@@ -68,6 +72,9 @@ public class Task implements Serializable {
@JsonSerialize(using = LocalDateTimeSerializer.class)
private LocalDateTime updateTime;

@TableField(value = "is_root")
private Boolean isRoot;

@TableField("version")
@Version
private Integer version;


+ 63
- 0
common/src/main/java/work/xuye/common/db/entity/vo/Dispatcher.java View File

@@ -0,0 +1,63 @@
package work.xuye.common.db.entity.vo;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;
import java.util.Map;

/**
* 分流设置
* @author yechuan
* @since 2023/8/8 19:09
**/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Dispatcher {
/**
* 参与计算的jsonPath
*/
private String filterArgs;

/**
* break 表示命中后不再继续尝试匹配其他的节点
*/
private String mode;

/**
* 当前的唯一key,用于缓存判断是否变更
*/
private String uniqueKey;

/**
* 后继节点配置
*/
private List<NextNodeConfig> nextNodes;

/**
* spel表达式
*/
private Map<String, String> placeholderExpressions = Map.of();

@Data
@NoArgsConstructor
@AllArgsConstructor
public static class NextNodeConfig {
/**
* 消息往哪里个topic发 以通知后继任务执行
*/
private String topic;

/**
* 需要满足的条件
*/
private String condition;

/**
* 发送的对象
*/
private String payload;
}
}

+ 4
- 0
common/src/main/java/work/xuye/common/db/entity/vo/SinkParams.java View File

@@ -50,6 +50,10 @@ public class SinkParams {
private Status status;
private String fieldName;
private String jsonPath;
/**
* 数据库对象转化为String的spel表达式
*/
private String dbObjectExpression;
}

@Data


+ 6
- 0
common/src/main/java/work/xuye/common/db/entity/vo/TableTemplate.java View File

@@ -5,6 +5,7 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import work.xuye.common.enums.DbFieldType;
import work.xuye.common.enums.UpdateStrategy;

import java.util.List;

@@ -80,6 +81,11 @@ public class TableTemplate {
* 字段校验表达式规则
*/
private List<String> rules;

/**
* 修改时的策略
*/
private UpdateStrategy updateStrategy = UpdateStrategy.ignored;
}

@Data


+ 7
- 0
common/src/main/java/work/xuye/common/db/entity/vo/TaskMiniVO.java View File

@@ -5,6 +5,8 @@ import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Map;

/**
* @author xuye
* @since 2023/7/7 21:58
@@ -18,4 +20,9 @@ public class TaskMiniVO {
private Integer id;

private String name;

/**
* 用于存储父task节点发送的参数 子节点可能需要此参数进行计算
*/
private Map<String, String> args;
}

+ 72
- 1
common/src/main/java/work/xuye/common/dto/HttpRequestParams.java View File

@@ -1,13 +1,22 @@
package work.xuye.common.dto;

import cn.hutool.core.collection.CollUtil;
import com.baomidou.mybatisplus.annotation.TableField;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import work.xuye.common.enums.RequestMode;
import work.xuye.common.enums.SignatureEnumGroup;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
* @author xuye
@@ -24,10 +33,63 @@ public class HttpRequestParams {
private String url;
private Map<String, Object> body = Map.of();
private String charset;
private Map<String, String> headers = Map.of();
private Map<String, Object> headers = Map.of();
private Map<String, String> placeholderExpressions = Map.of();
private String mediaType = MediaType.APPLICATION_JSON_VALUE;

private RequestSignatureConfig signatureConfig;

/**
* 仅计算一次的表达式(同次请求后续使用上次计算值)
*/
private Set<String> onceCompileExpressions = Collections.emptySet();

/**
* 缓存的表达式值
*/
@TableField(exist = false)
private Map<String, Object> placeholderExpressionsValueCache = new HashMap<>();


/**
* 参数(由父任务传递,参与计算与组装)
*/
@TableField(exist = false)
private Map<String, String> args = new HashMap<>();


/**
* 根据占位符获取值
*/
public <T> T getPlaceholderValue(String placeholder, Class<T> t, SpelExpressionParser parser, EvaluationContext context) {
if (CollUtil.isEmpty(placeholderExpressions)) {
return null;
}
if (!placeholderExpressions.containsKey(placeholder)) {
return null;
}
boolean cache = false;

if (onceCompileExpressions.contains(placeholder)) {
// 一次请求中 仅计算一次
if (placeholderExpressionsValueCache.containsKey(placeholder)) {
return t.cast(placeholderExpressionsValueCache.get(placeholder));
}
cache = true;
}
context.setVariable("requestParams", this);
String expr = placeholderExpressions.get(placeholder);
T result = parser.parseExpression(expr).getValue(context, t);

if (cache) {
placeholderExpressionsValueCache.put(placeholder, result);
}

return result;


}

@Data
@NoArgsConstructor
@AllArgsConstructor
@@ -36,4 +98,13 @@ public class HttpRequestParams {

}

@Data
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = true)
public static class RequestSignatureConfig extends SignatureConfig {
private String signatureField;
private String signatureContext;
private SignatureEnumGroup.SignatureLocation signatureLocation;
}
}

+ 7
- 0
common/src/main/java/work/xuye/common/dto/HttpRes.java View File

@@ -1,5 +1,6 @@
package work.xuye.common.dto;

import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -25,6 +26,12 @@ public class HttpRes {
private Map<String, List<String>> headers = Map.of();
private Long costTimeMillis;

/**
* 临时变量
*/
@JsonInclude
private Object temporary;

public static HttpRes build() {
return new HttpRes();
}


+ 36
- 0
common/src/main/java/work/xuye/common/dto/SignatureConfig.java View File

@@ -0,0 +1,36 @@
package work.xuye.common.dto;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* 加解密配置
*
* @author yechuan
* @since 2023/8/8 14:44
**/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class SignatureConfig {
/**
* key 算法
*/
private String keyAlgorithm;

/**
* 公私钥类型
*/
private String keyType;

/**
* 加密算法
*/
private String signatureAlgorithm;

/**
* key
*/
private String key;
}

+ 18
- 0
common/src/main/java/work/xuye/common/dto/SignatureContext.java View File

@@ -0,0 +1,18 @@
package work.xuye.common.dto;

import lombok.Data;

import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.Signature;

/**
* @author yechuan
* @since 2023/8/9 17:53
**/
@Data
public class SignatureContext {
private PrivateKey priKey;
private PublicKey pubKey;
private Signature signature;
}

+ 36
- 0
common/src/main/java/work/xuye/common/enums/SignatureEnumGroup.java View File

@@ -0,0 +1,36 @@
package work.xuye.common.enums;

/**
* @author yechuan
* @since 2023/8/8 14:52
**/
public class SignatureEnumGroup {

/**
* 加密字符串的位置
*/
public static enum SignatureLocation{
header
}

public static enum KeyAlgorithm{
/**
* RSA
*/
RSA
}

public static enum KeyType{
/**
* 私钥
*/
privateKey,
/**
* 公钥
*/
publicKey,
}



}

+ 18
- 0
common/src/main/java/work/xuye/common/enums/UpdateStrategy.java View File

@@ -0,0 +1,18 @@
package work.xuye.common.enums;

/**
* 修改时的策略
*
* @author yechuan
* @since 2023/8/10 13:37
**/
public enum UpdateStrategy {
/**
* 忽略判断
*/
ignored,
/**
* 非NULL时加入sql
*/
not_null
}

+ 1
- 1
common/src/main/java/work/xuye/common/service/MessageService.java View File

@@ -58,7 +58,7 @@ public class MessageService {
.send();
messageMD5Store.save(result.toString());
} else {
log.debug("短期内已经发送过相同的异常信息,不再发送: [{}]", result);
log.info("短期内已经发送过相同的异常信息,不再发送: [{}]", result);
}
}
}

+ 17
- 0
common/src/main/java/work/xuye/common/service/UrlMD5Service.java View File

@@ -2,10 +2,16 @@ package work.xuye.common.service;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import work.xuye.common.constant.MessageConstants;
import work.xuye.common.enums.ResourceStatus;
import work.xuye.common.store.UrlMD5MapStore;

import java.util.Set;

/**
* @author xuye
* @since 2023/3/6 21:58
@@ -37,4 +43,15 @@ public class UrlMD5Service {
Long aLong = urlMD5MapStore.removeKey(seedUrl);
log.info("删除url对应的md5,url:{}, 删除条数:{}", seedUrl, aLong);
}

public void removeUrlCache(Message message) {
Set<String> parentOtherCacheKeySet = message.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET, Set.class);
if (!ObjectUtils.isEmpty(parentOtherCacheKeySet)) {
parentOtherCacheKeySet.forEach(this::removeUrlMD5);
}
String sendUrl = message.getHeaders().get(MessageConstants.SEED_URL, String.class);
if (StringUtils.hasText(sendUrl)) {
this.removeUrlMD5(sendUrl);
}
}
}

+ 23
- 0
common/src/main/java/work/xuye/common/spel/CustomFunction.java View File

@@ -4,6 +4,8 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.expression.Expression;
@@ -41,6 +43,13 @@ public class CustomFunction {

private final NsKVMapStore mapStore;

public String localDateTimeFormat(LocalDateTime time, String pattern) {
if (time == null) {
return "";
}
return time.format(DateTimeFormatter.ofPattern(pattern));
}

public String jsonExtract(String json, String path) {
String result = null;
try {
@@ -53,6 +62,20 @@ public class CustomFunction {
return result;
}

public String jsonExtractOrPathNotFound(String json, String path) {
String result = null;
try {
result = JsonPath.read(json, path).toString().trim();
} catch (PathNotFoundException e) {
log.warn("jsonPath not find, json: [{}], path: [{}]", json, path);
} catch (Exception e) {
log.error("jsonExtra handler error, json: [{}], path: [{}]", json, path);
throw new RuntimeException(e);
}
log.debug("jsonExtra handler, json: [{}], path: [{}], result: [{}]", json, path, result);
return result;
}

public String jsonExtractReplaceAll(String json, String jsonPath, String regex, String replacement) {
return this.jsonExtract(json, jsonPath).replaceAll(regex, replacement).trim();
}


+ 32
- 0
common/src/main/java/work/xuye/common/utils/signature/AbstractSignatureService.java View File

@@ -0,0 +1,32 @@
package work.xuye.common.utils.signature;

import work.xuye.common.dto.SignatureConfig;
import work.xuye.common.dto.SignatureContext;

import java.security.GeneralSecurityException;
import java.security.Signature;

/**
* @author yechuan
* @since 2023/8/8 14:42
**/
public abstract class AbstractSignatureService {

protected SignatureContext signatureContext;

/**
* 使用私钥加密
*
* @param content
* @return
* @throws GeneralSecurityException
*/
public abstract String signatureByPriKey(String content) throws GeneralSecurityException;


void init(SignatureConfig signatureConfig) throws GeneralSecurityException {
signatureContext = new SignatureContext();
signatureContext.setSignature(Signature.getInstance(signatureConfig.getSignatureAlgorithm()));
}

}

+ 70
- 0
common/src/main/java/work/xuye/common/utils/signature/RSASignatureService.java View File

@@ -0,0 +1,70 @@
package work.xuye.common.utils.signature;

import org.apache.commons.codec.binary.Base64;
import org.springframework.util.DigestUtils;
import work.xuye.common.dto.SignatureConfig;
import work.xuye.common.enums.SignatureEnumGroup;

import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyFactory;
import java.security.Signature;
import java.security.spec.PKCS8EncodedKeySpec;
import java.security.spec.X509EncodedKeySpec;

/**
* RSA
*
* @author yechuan
* @since 2023/8/8 14:47
**/

class RSASignatureService extends AbstractSignatureService {

private RSASignatureService() {

}

public static AbstractSignatureService getInstance(SignatureConfig signatureConfig) throws GeneralSecurityException {
RSASignatureService result = new RSASignatureService();
result.init(signatureConfig);
return result;
}

@Override
public String signatureByPriKey(String content) throws GeneralSecurityException {
if (signatureContext == null) {
throw new RuntimeException("未初始化公私钥对信息");
}
if (signatureContext.getSignature() == null) {
throw new RuntimeException("未初始化私钥信息");
}
Signature signature = signatureContext.getSignature();
content = DigestUtils.md5DigestAsHex(content.getBytes(StandardCharsets.UTF_8)).toUpperCase();
signature.update(content.getBytes(StandardCharsets.UTF_8));
return Base64.encodeBase64String(signature.sign());
}

@Override
void init(SignatureConfig signatureConfig) throws GeneralSecurityException {
super.init(signatureConfig);

String keyTypeName = signatureConfig.getKeyType();
SignatureEnumGroup.KeyType keyType = SignatureEnumGroup.KeyType.valueOf(keyTypeName);

KeyFactory keyFactory = KeyFactory.getInstance(SignatureEnumGroup.KeyAlgorithm.RSA.name());

byte[] encodedKey = Base64.decodeBase64(signatureConfig.getKey());

if (SignatureEnumGroup.KeyType.privateKey.equals(keyType)) {
signatureContext.setPriKey(keyFactory.generatePrivate(new PKCS8EncodedKeySpec(encodedKey)));
signatureContext.getSignature().initSign(signatureContext.getPriKey());
} else if (SignatureEnumGroup.KeyType.publicKey.equals(keyType)) {
signatureContext.setPubKey(keyFactory.generatePublic(new X509EncodedKeySpec(encodedKey)));
}


}


}

+ 33
- 0
common/src/main/java/work/xuye/common/utils/signature/SignatureFactory.java View File

@@ -0,0 +1,33 @@
package work.xuye.common.utils.signature;

import work.xuye.common.dto.SignatureConfig;
import work.xuye.common.enums.SignatureEnumGroup;

import java.security.GeneralSecurityException;

/**
* @author yechuan
* @since 2023/8/8 15:20
**/
public class SignatureFactory {

/**
* 获取加解密实现
*/
public static AbstractSignatureService getSignatureService(SignatureConfig signatureConfig) throws GeneralSecurityException {

AbstractSignatureService result = null;

switch (SignatureEnumGroup.KeyAlgorithm.valueOf(signatureConfig.getKeyAlgorithm())) {
case RSA:
result = RSASignatureService.getInstance(signatureConfig);
break;
}
if (result == null) {
throw new RuntimeException(String.format("不支持的签名方式,signatureConfig:%s", signatureConfig));
}
return result;

}

}

+ 7
- 1
common/src/main/resources/markdown/task.md View File

@@ -37,4 +37,10 @@
## 基金宝图文、视频RSS

1. 对方给的接口数据格式比较乱,让对方改了多次才勉强能用
2. 视频暂时只有一条数据,找媒体方沟通多次依旧只能提供一条
2. 视频暂时只有一条数据,找媒体方沟通多次依旧只能提供一条

## 第一财经视频RSS

1. 请求需要进行加密 新增加密相关配置
2. 列表接口存在上下架状态,对于下架的视频 无法请求详情接口,需要伪造详情数据以便执行后续流程
3. 接口请求存在顺序关系,新增配置,让task允许存在多个后继task且可配置制定发送topic

+ 180
- 0
helper/src/main/java/work/xuye/helper/checker/DataDiffChecker.java View File

@@ -0,0 +1,180 @@
package work.xuye.helper.checker;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import work.xuye.common.db.service.TaskManager;
import work.xuye.helper.properties.HelperProperties;
import work.xuye.helper.service.JdbcTemplateManager;

import java.util.*;

/**
* @author xuye
* @since 2023/7/13 14:57
**/
@Slf4j
@Service
@RequiredArgsConstructor
public class DataDiffChecker {

private final TaskManager taskManager;
private final JdbcTemplateManager jdbcTemplateManager;
private final HelperProperties helperProperties;


public Map<String, List<Object>> checkDifferentKeys(Map<String, Object> map1, Map<String, Object> map2) {
Map<String, List<Object>> map = new HashMap<>();
for (String key : map1.keySet()) {
if (map2.containsKey(key)) {
Object value1 = map1.get(key);
Object value2 = map2.get(key);

if (!ObjectUtils.nullSafeEquals(value1, value2)) {
if (!helperProperties.getEqualsIgnoreField().contains(key)) {
map.put(key, List.of(value1, value2));
log.debug("@@@ key: {}, value1: {}, value2: {}", key, value1, value2);
}
}
} else {
log.debug("@@@ key: {}, value1: {}, value2: {}", key, map1.get(key), null);
ArrayList<Object> list = new ArrayList<>();
list.add(map1.get(key));
list.add(null);
map.put(key, list);
}
}
for (String key : map2.keySet()) {
if (!map1.containsKey(key)) {
log.debug("@@@ key: {}, value1: {}, value2: {}", key, null, map2.get(key));
ArrayList<Object> list = new ArrayList<>();
list.add(null);
list.add(map2.get(key));
map.put(key, list);
}
}
return map;
}


public void check(List<Integer> ids, Integer intervalDays) {
List<Integer> newsIds = helperProperties.getTaskTypeIdMap().get(NewsTypeEnum.NEWS);
List<Integer> videoIds = helperProperties.getTaskTypeIdMap().get(NewsTypeEnum.VIDEO);
if (ObjectUtils.isEmpty(ids)) {
handleNews(newsIds, intervalDays);
// handleVideo(videoIds);
} else {
for (Integer id : ids) {
if (newsIds.contains(id)) {
handleNews(List.of(id), intervalDays);
} else if (videoIds.contains(id)) {
// handleVideo(List.of(id));
}
}
}
}

public void handleNews(List<Integer> ids, Integer intervalDays) {

String source = helperProperties.getSourceDbName();
String target = helperProperties.getTargetDbName();

String sql = "select url from all_news where spider = ? and createDate > CURDATE() - INTERVAL ? DAY order by createDate desc";
for (Integer id : ids) {
String taskName = taskManager.getTaskInfoByTaskId(id).getTask().getName();
log.info("-------------------------{}-------------------------", taskName);
List<String> sourceUrl = jdbcTemplateManager.getTemplate(source).queryForList(sql, String.class, taskName, intervalDays);
List<String> targetUrl = jdbcTemplateManager.getTemplate(target).queryForList(sql, String.class, taskName, intervalDays);

// 判断交集数据
List<String> intersection = new ArrayList<>(sourceUrl);
intersection.retainAll(targetUrl);
log.info("@@@ [{}] 记录数: [{}], [{}] 记录数: [{}], 交集条数: [{}]", source, sourceUrl.size(), target, targetUrl.size(), intersection.size());

// 以 source 为基准,判断差集数据
List<String> diff = findDifference(sourceUrl, intersection);
if (diff.size() > 0) {
log.info("@@@ [{}] 有,但是 [{}] 没有的记录数: {}", source, target, diff.size());
checkDiffReason(diff, jdbcTemplateManager.getTemplate(target));
}


// 以 target 为基准,判断差集数据
diff = findDifference(targetUrl, intersection);
if (diff.size() > 0) {
log.info("@@@ [{}] 有,但是 [{}] 没有的记录数: {}", target, source, diff.size());
checkDiffReason(diff, jdbcTemplateManager.getTemplate(source));
}


// 判断交集数据是否相同
findFieldsDiff(intersection, jdbcTemplateManager.getTemplate(source), jdbcTemplateManager.getTemplate(target));
}
}

private void findFieldsDiff(List<String> urls, JdbcTemplate jdbcTemplate, JdbcTemplate jdbcTemplate2) {
int count = 0;
Map<String, Map<String, List<Object>>> result = new HashMap<>();
String sql = "select * from all_news where url = ? limit 1";
for (String s : urls) {
if (urls.size() > helperProperties.getThreshold() && Math.random() > helperProperties.getSampleRate()) {
continue;
}
count++;
Map<String, Object> map = jdbcTemplate.queryForMap(sql, s);
Map<String, Object> map2 = jdbcTemplate2.queryForMap(sql, s);
Map<String, List<Object>> differentKeys = checkDifferentKeys(map, map2);
if (differentKeys.size() > 0) {
result.put(s, differentKeys);
}
}
if (ObjectUtils.isEmpty(result)) {
log.info("@@@ 在 [{}] 条中抽检了 [{}] 条,没有字段差异", urls.size(), count);
} else {

Set<String> keys = new HashSet<>();
for (Map<String, List<Object>> value : result.values()) {
keys.addAll(value.keySet());
}
log.warn("@@@ 共在 [{}] 条中抽检了 [{}] 条,有 [{}] 条有字段差异, 差异字段: {}", urls.size(), count, result.size(), keys);
}
}

public List<String> findDifference(List<String> source, List<String> intersection) {
List<String> diff = new ArrayList<>(source);
diff.removeAll(intersection);
return diff;
}

public void checkDiffReason(List<String> urls, JdbcTemplate jdbcTemplate) {
List<String> occupied = new ArrayList<>();
List<String> notExist = new ArrayList<>();

String sql = "select spider from all_news where url = ?";
for (String url : urls) {
List<String> spiders = jdbcTemplate.queryForList(sql, String.class, url);
if (spiders.size() == 1) {
occupied.add(url);
} else {
notExist.add(url);
}
}

if (!ObjectUtils.isEmpty(occupied)) {
log.info("@@ 已经被其他爬虫提前爬取的记录条数`: {}", occupied.size());
}
if (!ObjectUtils.isEmpty(notExist)) {
if (notExist.size() > 5) {
log.error("@@ ⚠️不存在的记录条数: {}", notExist.size());
} else {
log.warn("@@ 不存在的记录条数: {}, url: {}", notExist.size(), notExist);
}
}


}


}

+ 14
- 0
helper/src/main/java/work/xuye/helper/checker/NewsTypeEnum.java View File

@@ -0,0 +1,14 @@
package work.xuye.helper.checker;


/**
* @author xuye
* @since 2023/3/4
**/

public enum NewsTypeEnum {

NEWS,
VIDEO
}


+ 35
- 0
helper/src/main/java/work/xuye/helper/controller/CheckController.java View File

@@ -0,0 +1,35 @@
package work.xuye.helper.controller;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import work.xuye.helper.checker.DataDiffChecker;
import work.xuye.helper.vo.DataCheckVO;

import javax.validation.Valid;

/**
* @author xuye
* @since 2023/7/13 18:13
**/
@Api(tags = "数据对比")
@Validated
@RestController
@RequestMapping("/check")
@RequiredArgsConstructor
public class CheckController {

private final DataDiffChecker dataDiffChecker;

@PostMapping
@ApiOperation("指定任务对比")
public void check(@RequestBody @Valid DataCheckVO dataCheckVO) {
dataDiffChecker.check(dataCheckVO.getIds(), dataCheckVO.getIntervalDays());
}

}

+ 28
- 0
helper/src/main/java/work/xuye/helper/properties/DatasourceConfigProperties.java View File

@@ -0,0 +1,28 @@
package work.xuye.helper.properties;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
* @author xuye
* @since 2023/1/14 21:50
**/
@Data
@Component
@ConfigurationProperties(prefix = DatasourceConfigProperties.PROPERTIES_PREFIX)
public class DatasourceConfigProperties {

public static final String PROPERTIES_PREFIX = "datasource";

private Map<String, DataSource> configMap;

@Data
public static class DataSource {
private String url;
private String username;
private String password;
}
}

+ 25
- 7
helper/src/main/java/work/xuye/helper/properties/HelperProperties.java View File

@@ -1,28 +1,46 @@
package work.xuye.helper.properties;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import work.xuye.helper.checker.NewsTypeEnum;

import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* @author xuye
* @since 2023/1/14 21:50
**/
@Slf4j
@Data
@Component
@ConfigurationProperties(prefix = HelperProperties.PROPERTIES_PREFIX)
public class HelperProperties {


public static final String PROPERTIES_PREFIX = "helper";
private String sourceDbName;
private String targetDbName;
private Map<NewsTypeEnum, List<Integer>> taskTypeIdMap;

/**
* 对比字段时,忽略的字段
*/
private Set<String> equalsIgnoreField;


/**
* 触发抽检的阈值
*/
private int threshold = 1000;

/**
* 抽检的比例
*/
private double sampleRate = 0.1;

private Map<String, DataSource> datasourceMap;

@Data
public static class DataSource {
private String url;
private String username;
private String password;
}
}

+ 44
- 0
helper/src/main/java/work/xuye/helper/service/JdbcTemplateManager.java View File

@@ -0,0 +1,44 @@
package work.xuye.helper.service;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import work.xuye.helper.properties.DatasourceConfigProperties;

import javax.annotation.PostConstruct;
import java.util.HashMap;

/**
* @author xuye
* @since 2023/7/13 14:04
**/
@Slf4j
@Service
@RequiredArgsConstructor
public class JdbcTemplateManager {


private final DatasourceConfigProperties datasourceConfigProperties;

private final HashMap<String, JdbcTemplate> jdbcTemplateMap = new HashMap<>();

public JdbcTemplate getTemplate(String key) {
return jdbcTemplateMap.get(key);
}

@PostConstruct
private void initJdbcTemplate() {
jdbcTemplateMap.clear();
datasourceConfigProperties.getConfigMap().forEach((key, source) -> {
HikariConfig config = new HikariConfig();
config.setJdbcUrl(source.getUrl());
config.setUsername(source.getUsername());
config.setPassword(source.getPassword());
HikariDataSource dataSource = new HikariDataSource(config);
jdbcTemplateMap.put(key, new JdbcTemplate(dataSource));
});
}
}

+ 0
- 171
helper/src/main/java/work/xuye/helper/service/UpdateService.java View File

@@ -1,171 +0,0 @@
package work.xuye.helper.service;

import cn.hutool.db.Db;
import cn.hutool.db.Entity;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import work.xuye.helper.properties.HelperProperties;

import javax.annotation.PostConstruct;
import java.sql.SQLException;
import java.util.*;

/**
* @author xuye
* @since 2023/7/12 14:55
**/
@Slf4j
@Service
@RequiredArgsConstructor
public class UpdateService implements ApplicationRunner {

private final HelperProperties helperProperties;

private final HashMap<String, HikariDataSource> dataSourceMap;

@PostConstruct
public void init() {

Map<String, HelperProperties.DataSource> datasourceConfigMap = helperProperties.getDatasourceMap();
for (String key : datasourceConfigMap.keySet()) {
HelperProperties.DataSource source = datasourceConfigMap.get(key);
HikariConfig config = new HikariConfig();
config.setJdbcUrl(source.getUrl());
config.setUsername(source.getUsername());
config.setPassword(source.getPassword());
HikariDataSource dataSource = new HikariDataSource(config);
dataSourceMap.put(key, dataSource);


}
}

public void start() throws SQLException {
String sql = "select date(createDate) as date, spider, count(*) as count\n" +
"from all_news\n" +
"where JSON_EXTRACT(raw_data, '$.crawler') = 'snp'\n" +
" and createDate > curdate() - interval 1 day\n" +
"group by spider, date\n" +
"order by date desc, count desc;";


Set<String> keys = dataSourceMap.keySet();

HashMap<String, List<Entity>> map = new HashMap<>();

for (String key : keys) {
HikariDataSource dataSource = dataSourceMap.get(key);
List<Entity> query = Db.use(dataSourceMap.get(key)).query(sql);
map.put(key, query);
}

// Compare List<Entity> between different environments
for (String key1 : keys) {
for (String key2 : keys) {
if (!key1.equals(key2)) {
List<Entity> list1 = map.get(key1);
List<Entity> list2 = map.get(key2);

for (Entity entity1 : list1) {
for (Entity entity2 : list2) {
String date1 = entity1.getStr("date");
String spider1 = entity1.getStr("spider");
int count1 = entity1.getInt("count");

String date2 = entity2.getStr("date");
String spider2 = entity2.getStr("spider");
int count2 = entity2.getInt("count");

if (date1.equals(date2) && spider1.equals(spider2) && count1 != count2) {
int diff = Math.abs(count1 - count2);
String more = count1 > count2 ? key1 : key2;
System.out.printf("@@@ [%s] [%s] %s: %d %s: %d, diff abs: %d, more: %s%n", date1, spider1, key1, count1, key2, count2, diff, more);
this.findDIff(spider1, date1);
}
}
}
}
}
// todo 此处比较多个的话有遗漏,后续优化下
break;
}
}


public void findDIff(String spider, String date) throws SQLException {
HashMap<String, List<String>> map = new HashMap<>();
for (String key : dataSourceMap.keySet()) {
HikariDataSource dataSource = dataSourceMap.get(key);

String sql = "SELECT url FROM all_news WHERE spider = '" + spider + "' AND date(createDate) = '" + date + "';";
List<Entity> resultList = Db.use(dataSource).query(sql);
ArrayList<String> list = new ArrayList<>();
for (Entity entity : resultList) {
String url = entity.getStr("url");
list.add(url);
}
map.put(key, list);
}

for (String s : map.keySet()) {
this.compareEnvironments(map, s);
}


}

public void compareEnvironments(HashMap<String, List<String>> map, String environment) throws SQLException {

List<String> environmentList = map.get(environment);
if (environmentList == null) {
return;
}
for (String key : map.keySet()) {
if (key.equals(environment)) {
continue;
}
List<String> list = map.getOrDefault(key, new ArrayList<>());
List<String> intersection = new ArrayList<>(list);
intersection.retainAll(environmentList);

List<String> different = new ArrayList<>(list);
different.removeAll(intersection);

if (!different.isEmpty()) {
System.out.println("@@ 环境 " + environment + " 中,List " + key + " 中独有的元素有" + different.size() + "个");
for (String s : different) {
getStatusByUrl(s);
}
}
}
}

public void getStatusByUrl(String url) throws SQLException {
String sql = "select spider from all_news where url='" + url + "';";
for (String key : dataSourceMap.keySet()) {
HikariDataSource dataSource = dataSourceMap.get(key);
List<Entity> resultList = Db.use(dataSource).query(sql);
if (ObjectUtils.isEmpty(resultList)) {
System.err.println("⚠️ url: " + url + " not found in " + key);
}
if (resultList.size() == 1) {
System.out.println("url [" + url + "] 被 [" + resultList.get(0).get("spider") + "] 提前抓取了");
}
if (resultList.size() > 1) {
System.err.println("!!! url: " + url + " found in " + key + " " + resultList.size() + " times");
}

}
}

@Override
public void run(ApplicationArguments args) throws Exception {
this.start();
}
}

+ 24
- 0
helper/src/main/java/work/xuye/helper/vo/DataCheckVO.java View File

@@ -0,0 +1,24 @@
package work.xuye.helper.vo;

import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;

/**
* @author xuye
* @since 2023/7/14 18:41
**/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class DataCheckVO {

@ApiModelProperty("任务id")
private List<Integer> ids;

@ApiModelProperty("最近几天的数据")
private Integer intervalDays;
}

+ 39
- 6
helper/src/main/resources/application.yml View File

@@ -1,15 +1,48 @@
knife4j:
enable: true
server:
port: 9300
helper:
datasourceMap:
sit:
port: 9301
datasource:
configMap:
news-sit:
url: jdbc:mysql://47.116.58.10:3306/pyspider_sit_resultdb?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2b8
username: pyspider_user
password: ND0328qSywfre
prd:
news-prd:
url: jdbc:mysql://47.103.55.230:3306/pyspider_resultdb?characterEncoding=utf8&autoReconnect=true&useUnicode=true&useSSL=false
username: pyspider
password: strzsJQWp%uw9oKB

video-prd:
url: jdbc:mysql://47.116.61.180:3306/fhl_data_ingestion?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2b8
username: fhl_owner
password: 5rDjEXN%bAq5Bf#9
sq:
sentry:
enabled: false
helper:
equals-ignore-field:
- id
- createDate
- postDate
- update_time
task-type-id-map:
news:
- 1
- 2
- 3
- 4
- 5
- 7
- 8
- 9
- 14
- 15
- 16
- 17
video:
- 6
- 10
- 11
- 12
source-db-name: news-sit
target-db-name: news-prd

+ 3
- 1
scheduler/src/main/java/work/xuye/scheduler/service/IssueService.java View File

@@ -1,5 +1,6 @@
package work.xuye.scheduler.service;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
@@ -73,7 +74,8 @@ public class IssueService implements ApplicationRunner {

public ArrayList<TaskVO> issueAllTask() {
log.info("--------------------");
List<Task> list = taskService.list();
// List<Task> list = taskService.list();
List<Task> list = taskService.list(new LambdaQueryWrapper<Task>().eq(Task::getIsRoot,true));
Collections.shuffle(list);
ArrayList<TaskVO> result = new ArrayList<>();
list.forEach(task -> {


+ 9
- 8
sink/src/main/java/work/xuye/sink/handler/ItemHandler.java View File

@@ -10,6 +10,7 @@ import org.springframework.util.DigestUtils;
import org.springframework.util.ObjectUtils;
import work.xuye.common.constant.MessageConstants;
import work.xuye.common.constant.StageConstants;
import work.xuye.common.db.entity.Mapping;
import work.xuye.common.db.entity.Task;
import work.xuye.common.db.entity.vo.SinkParams;
import work.xuye.common.db.entity.vo.TableTemplate;
@@ -22,10 +23,7 @@ import work.xuye.common.spel.SpringExpressionLanguageEvaluator;
import work.xuye.common.utils.DebugUtil;
import work.xuye.sink.service.JdbcService;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;

/**
* @author xuye
@@ -64,8 +62,7 @@ public class ItemHandler {
}

private void cleanCache(Message<String> message) {
String seedUrl = (String) message.getHeaders().get(MessageConstants.SEED_URL);
urlMD5Service.removeUrlMD5(seedUrl);
urlMD5Service.removeUrlCache(message);
}


@@ -87,7 +84,9 @@ public class ItemHandler {

SinkParams.CheckDelete checkDeleteConfig = taskVO.getTask().getSinkParams().getCheckDelete();
// 根据消息体,和映射关系,使用SpEL表达式,得到计算结果
HashMap<String, Object> expressionResultMap = evaluator.evaluate(taskVO.getTemplateMappingMap().get(templateName).getTableMapping(), message);
Map<String, Mapping> templateMappingMap = taskVO.getTemplateMappingMap();
Mapping mapping = templateMappingMap.get(templateName);
HashMap<String, Object> expressionResultMap = evaluator.evaluate(mapping.getTableMapping(), message);
// 如果不能通过校验,则抛出异常
this.validate(expressionResultMap, template);
// 将计算结果合并到模板中,此刻的模板是拥有值的
@@ -319,8 +318,10 @@ public class ItemHandler {
if (!ObjectUtils.isEmpty(checkUpdateConfig.getJsonPath())) {
dbValue = cf.jsonExtract(dbValue.toString(), checkUpdateConfig.getJsonPath());
nowValue = cf.jsonExtract(nowValue.toString(), checkUpdateConfig.getJsonPath());
} else if (!ObjectUtils.isEmpty(checkUpdateConfig.getDbObjectExpression())) {
dbValue = evaluator.evaluate(dbValue, checkUpdateConfig.getDbObjectExpression(), String.class);
} else {
throw new RuntimeException("根据 [" + fieldName + "] 来判断数据是否需要更新,但是没有配置jsonPath");
throw new RuntimeException("根据 [" + fieldName + "] 来判断数据是否需要更新,但是没有配置jsonPath与dbObjectExpression");
}
if (dbValue.equals(nowValue)) {
log.info("[{}][✅ 已是最新][{} {}] " + "{}: {}",


+ 6
- 1
sink/src/main/java/work/xuye/sink/handler/SqlGenerator.java View File

@@ -2,6 +2,7 @@ package work.xuye.sink.handler;

import work.xuye.common.db.entity.vo.TableTemplate;
import work.xuye.common.enums.DbFieldType;
import work.xuye.common.enums.UpdateStrategy;

import java.util.ArrayList;
import java.util.List;
@@ -37,13 +38,17 @@ public class SqlGenerator {
boolean isFirstField = true;

for (TableTemplate.Field field : fields) {
String fieldValue = formatFieldValue(field);
if ("NULL".equals(fieldValue) && UpdateStrategy.not_null.equals(field.getUpdateStrategy())) {
continue;
}
if (!isFirstField) {
sqlBuilder.append(", ");
} else {
isFirstField = false;
}
sqlBuilder.append('`').append(field.getFieldName()).append('`')
.append('=').append(formatFieldValue(field));
.append('=').append(fieldValue);
}

TableTemplate.Field updateTimeField = template.getUpdateTimeField();


+ 233
- 33
source/src/main/java/work/xuye/source/handler/SourceHandler.java View File

@@ -1,8 +1,10 @@
package work.xuye.source.handler;

import com.google.gson.Gson;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
@@ -13,6 +15,7 @@ import org.springframework.util.DigestUtils;
import org.springframework.util.ObjectUtils;
import work.xuye.common.constant.*;
import work.xuye.common.db.entity.Task;
import work.xuye.common.db.entity.vo.Dispatcher;
import work.xuye.common.db.entity.vo.TaskMiniVO;
import work.xuye.common.db.service.TaskManager;
import work.xuye.common.dto.HttpRequestParams;
@@ -20,17 +23,21 @@ import work.xuye.common.dto.HttpRes;
import work.xuye.common.enums.ProcessMode;
import work.xuye.common.enums.RequestMode;
import work.xuye.common.enums.ResourceStatus;
import work.xuye.common.enums.SignatureEnumGroup;
import work.xuye.common.service.UrlMD5Service;
import work.xuye.common.utils.DebugUtil;
import work.xuye.common.utils.ID;
import work.xuye.common.utils.JsonPathUtil;
import work.xuye.common.utils.signature.AbstractSignatureService;
import work.xuye.common.utils.signature.SignatureFactory;
import work.xuye.source.request.RequestClient;
import work.xuye.source.spel.CustomRequest;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.*;
import java.util.regex.Matcher;
import java.util.stream.Stream;

/**
* @author xuye
@@ -47,6 +54,7 @@ public class SourceHandler {
private final UrlMD5Service urlMD5Service;
private final StreamBridge streamBridge;
private final TaskManager taskManager;
public final Gson gson;

public List<Message<HttpRes>> handle(Message<TaskMiniVO> message) {
TaskMiniVO taskMiniVO = message.getPayload();
@@ -58,18 +66,33 @@ public class SourceHandler {
return null;
}
HttpRequestParams req = task.getRequestParams();
if (!ObjectUtils.isEmpty(taskMiniVO.getArgs())) {
req.getArgs().putAll(taskMiniVO.getArgs());
}

Set<String> otherCacheKeySet = new HashSet<>();
Set parentOtherCacheKeySet = message.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET, Set.class);
if (parentOtherCacheKeySet != null) {
otherCacheKeySet.addAll(parentOtherCacheKeySet);
}


String taskName = task.getName();
HttpRes res = null;
List<HttpRes> result = null;
if (RequestMode.normal.equals(req.getMode())) {
res = this.request(req);
result = this.handleRes(task, res);
} else if (RequestMode.SpEL.equals(req.getMode())) {
try {
if (RequestMode.normal.equals(req.getMode())) {
res = this.request(req);
result = this.handleRes(task, res);
} else if (RequestMode.SpEL.equals(req.getMode())) {

StandardEvaluationContext context = new StandardEvaluationContext();
Expression expression = parser.parseExpression(task.getRequestParams().getSpELConfig().getExpression());
List<HttpRes> value = expression.getValue(context, this, List.class);
result = this.handleRes(task, value);
StandardEvaluationContext context = new StandardEvaluationContext();
Expression expression = parser.parseExpression(task.getRequestParams().getSpELConfig().getExpression());
List<HttpRes> value = expression.getValue(context, this, List.class);
result = this.handleRes(task, value);
}
} catch (Exception e) {
urlMD5Service.removeUrlCache(message);
}
if (ObjectUtils.isEmpty(result)) {
return null;
@@ -81,11 +104,12 @@ public class SourceHandler {
.setHeader(MessageConstants.CURRENT_STAGE, StageConstants.SOURCE_WATCH)
.setHeader(MessageConstants.TASK_TRACE_ID, message.getHeaders().get(MessageConstants.TASK_TRACE_ID))
.setHeader(MessageConstants.SOURCE_TRACE_ID, ID.generate())
.setHeader(MessageConstants.TASK_ID, message.getHeaders().get(MessageConstants.TASK_ID))
.setHeader(MessageConstants.TASK_NAME, message.getHeaders().get(MessageConstants.TASK_NAME))
.setHeader(MessageConstants.TASK_ID, task.getId())
.setHeader(MessageConstants.TASK_NAME, task.getName())
.setHeader(SnapshotConstants.URL, req.getUrl())
.setHeader(SnapshotConstants.RESOURCE_STATUS, r.getResourceStatus())
.setHeader(SnapshotConstants.STATUS, r.getStatus())
.setHeader(MessageConstants.OTHER_CACHE_KEY_SET, otherCacheKeySet)
.build();

boolean send = streamBridge.send(taskName + BindingConstants.SNAPSHOT_OUT_POSTFIX, watchMessage);
@@ -96,6 +120,28 @@ public class SourceHandler {
return null;
}

if (Objects.nonNull(task.getDispatcher())) {

Map<String, Message> nextNodeMessage = buildNextNodeMessage(result, task, message);


nextNodeMessage.forEach((k, v) -> {
if (!ObjectUtils.isEmpty(otherCacheKeySet)) {
Set set = Objects.requireNonNullElse(v.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET, Set.class), new HashSet());
set.addAll(otherCacheKeySet);
v = MessageBuilder
.fromMessage(v)
.setHeader(MessageConstants.OTHER_CACHE_KEY_SET, set)
.build();
}
boolean send = streamBridge.send(k, v);
if (!send) {
throw new RuntimeException("send message failed");
}
});
return null;
}

ArrayList<Message<HttpRes>> messageList = new ArrayList<>();
result.forEach(
item -> {
@@ -104,9 +150,10 @@ public class SourceHandler {
.setHeader(MessageConstants.CURRENT_STAGE, StageConstants.SOURCE)
.setHeader(MessageConstants.TASK_TRACE_ID, message.getHeaders().get(MessageConstants.TASK_TRACE_ID))
.setHeader(MessageConstants.SOURCE_TRACE_ID, ID.generate())
.setHeader(MessageConstants.TASK_ID, message.getHeaders().get(MessageConstants.TASK_ID))
.setHeader(MessageConstants.TASK_NAME, message.getHeaders().get(MessageConstants.TASK_NAME))
.setHeader(MessageConstants.TASK_ID, task.getId())
.setHeader(MessageConstants.TASK_NAME, task.getName())
.setHeader(MessageConstants.SEED_URL, item.getRequestParams().getUrl())
.setHeader(MessageConstants.OTHER_CACHE_KEY_SET, otherCacheKeySet)
.build());
}
);
@@ -115,6 +162,93 @@ public class SourceHandler {

}

private Map<String, Message> buildNextNodeMessage(List<HttpRes> result, Task task, Message<TaskMiniVO> message) {
Dispatcher filter = task.getDispatcher();

Map<String, Message> resultMap = new HashMap<>();
result.stream()
.map(item -> {
HttpRes httpRes = HttpRes.build()
.status(item.getStatus())
.costTimeMillis(item.getCostTimeMillis())
.headers(item.getHeaders())
.requestParams(item.getRequestParams());
httpRes.setResourceStatus(item.getResourceStatus());
httpRes.setTemporary(JsonPathUtil.read(item.getBody(), task.getDispatcher().getFilterArgs()));
return httpRes;
})
.flatMap(item -> {
if (item.getTemporary() instanceof Collection) {
return ((Collection<?>) item.getTemporary())
.stream()
.map(temporary -> {
HttpRes params = HttpRes.build()
.status(item.getStatus())
.costTimeMillis(item.getCostTimeMillis())
.headers(item.getHeaders())
.requestParams(item.getRequestParams());
params.setResourceStatus(item.getResourceStatus());
params.setTemporary(temporary);
return params;
});
} else {
return Stream.of(item);
}
})
.forEach(httpRes -> {
Object temporary = httpRes.getTemporary();
httpRes.setTemporary(null);
Map map = (Map) temporary;
// 是否更新判断
String uniqueKey = map.get(filter.getUniqueKey()).toString();
uniqueKey = httpRes.getRequestParams().getUrl() + "?uniqueKey=" + uniqueKey;
String md5Digest = DigestUtils.md5DigestAsHex(gson.toJson(map).getBytes(StandardCharsets.UTF_8));
ResourceStatus resourceStatus = urlMD5Service.getResourceStatus(uniqueKey, md5Digest);
urlMD5Service.put(uniqueKey, md5Digest);
if (ResourceStatus.UNCHANGED.equals(resourceStatus)) {
return;
}

// 节点命中
List<Dispatcher.NextNodeConfig> nextNodes = filter.getNextNodes();
if (ObjectUtils.isEmpty(nextNodes)) {
return;
}
StandardEvaluationContext context = new StandardEvaluationContext(this);
context.setVariable("filterArgs", map);
context.setVariable("httpRes", httpRes);

for (Dispatcher.NextNodeConfig nextNode : nextNodes) {

if (!Boolean.TRUE.equals(getPlaceholderSpel(nextNode.getCondition(), filter.getPlaceholderExpressions(), context, Boolean.class))) {
continue;
}
Set<String> otherCacheKeySet = new HashSet<>();
otherCacheKeySet.add(uniqueKey);
// 命中
String topic = getPlaceholderSpel(nextNode.getTopic(), filter.getPlaceholderExpressions(), context, String.class);
Object payload = getPlaceholderSpel(nextNode.getPayload(), filter.getPlaceholderExpressions(), context, Object.class);
Message<Object> nextTaskMessage = MessageBuilder
.withPayload(payload)
.setHeader(MessageConstants.CURRENT_STAGE, StageConstants.SOURCE)
.setHeader(MessageConstants.TASK_TRACE_ID, message.getHeaders().get(MessageConstants.TASK_TRACE_ID))
.setHeader(MessageConstants.SOURCE_TRACE_ID, ID.generate())
.setHeader(MessageConstants.TASK_ID, task.getId())
.setHeader(MessageConstants.TASK_NAME, task.getName())
.setHeader(MessageConstants.SEED_URL, httpRes.getRequestParams().getUrl())
.setHeader(MessageConstants.OTHER_CACHE_KEY_SET, otherCacheKeySet)
.build();
resultMap.put(topic, nextTaskMessage);
if ("break".equals(filter.getMode())) {
break;
}
}
}
);

return resultMap;

}

private void setResourceStatus(HttpRes res) {
String md5Digest = DigestUtils.md5DigestAsHex(res.getBody().getBytes());
@@ -186,25 +320,87 @@ public class SourceHandler {
public HttpRes request(HttpRequestParams req) {
this.preHandleUrl(req);
this.preHandleBody(req);
this.preHandleHeader(req);
this.preHandleSignature(req);
return requestClient.execute(req);
}

private void preHandleBody(HttpRequestParams request) {
Map<String, Object> body = request.getBody();
if (ObjectUtils.isEmpty(body)) {

private void preHandleSignature(HttpRequestParams requestParams) {
HttpRequestParams.RequestSignatureConfig signatureConfig = requestParams.getSignatureConfig();
if (ObjectUtils.isEmpty(signatureConfig)) {
return;
}
for (String key : body.keySet()) {
Object value = body.get(key);
Matcher matcher = RegexConstants.PLACEHOLDER_PATTERN.matcher(value.toString());

String signatureContext = requestParams.getPlaceholderValue(signatureConfig.getSignatureContext(), String.class, parser, new StandardEvaluationContext(this));
AbstractSignatureService signatureService = null;
try {
signatureService = SignatureFactory.getSignatureService(signatureConfig);
String signature = signatureService.signatureByPriKey(signatureContext);
// 优化
if (SignatureEnumGroup.SignatureLocation.header.equals(signatureConfig.getSignatureLocation())) {
requestParams.getHeaders().put(signatureConfig.getSignatureField(), signature);
}
} catch (GeneralSecurityException e) {
throw new RuntimeException(e);
}
}

private void preHandleHeader(HttpRequestParams request) {
Map<String, Object> headers = request.getHeaders();
if (ObjectUtils.isEmpty(headers)) {
return;
}
request.setHeaders(calculationPlaceholderMap(request.getHeaders(), request));
}


/**
* 根据占位符从spelMap中获取对应的spel表达式并计算结果
* 或直接返回占位符(不满足占位符规则时,即无需计算)
*/
private <T> T getPlaceholderSpel(Object placeholder, Map<String, String> spelMap, EvaluationContext context, Class<T> resultClazz) {
Matcher matcher = RegexConstants.PLACEHOLDER_PATTERN.matcher(placeholder.toString());
if (matcher.find()) {
return parser.parseExpression(spelMap.get(matcher.group())).getValue(context, resultClazz);
}
return (T) placeholder;
}

/**
* 计算占位符的值,并返回此map
*/
private Map<String, Object> calculationPlaceholderMap(Map<String, Object> placeholderMap, HttpRequestParams request) {

for (String key : placeholderMap.keySet()) {
Object placeholderValue = placeholderMap.get(key);
Matcher matcher = RegexConstants.PLACEHOLDER_PATTERN.matcher(placeholderValue.toString());
if (matcher.find()) {
String placeholder = matcher.group();
String expr = request.getPlaceholderExpressions().get(placeholder);
Object result = parser.parseExpression(expr).getValue();
body.put(key, result);
Object value = request.getPlaceholderValue(placeholder, Object.class, parser, new StandardEvaluationContext(this));
placeholderMap.put(key, value);
}
}
request.setBody(body);
return placeholderMap;
}

private void preHandleBody(HttpRequestParams request) {
Map<String, Object> body = request.getBody();
if (ObjectUtils.isEmpty(body)) {
return;
}
request.setBody(calculationPlaceholderMap(request.getBody(), request));
// for (String key : body.keySet()) {
// Object value = body.get(key);
// Matcher matcher = RegexConstants.PLACEHOLDER_PATTERN.matcher(value.toString());
// if (matcher.find()) {
// String placeholder = matcher.group();
// String expr = request.getPlaceholderExpressions().get(placeholder);
// Object result = parser.parseExpression(expr).getValue();
// body.put(key, result);
// }
// }
// request.setBody(body);
}

private void preHandleUrl(HttpRequestParams request) {
@@ -213,19 +409,23 @@ public class SourceHandler {
Map<String, Object> patternMap = new HashMap<>();
while (matcher.find()) {
String placeholder = matcher.group();
patternMap.put(placeholder, null);
patternMap.put(placeholder, placeholder);
}

if (ObjectUtils.isEmpty(patternMap)) {
return;
}
Map<String, String> placeholderExpressions = request.getPlaceholderExpressions();

for (String key : patternMap.keySet()) {
String expr = placeholderExpressions.get(key);
Object value = parser.parseExpression(expr).getValue();
patternMap.put(key, value);
}

calculationPlaceholderMap(patternMap, request);
// Map<String, String> placeholderExpressions = request.getPlaceholderExpressions();


// for (String key : patternMap.keySet()) {
// String expr = placeholderExpressions.get(key);
// Object value = parser.parseExpression(expr).getValue();
// patternMap.put(key, value);
// }
for (String key : patternMap.keySet()) {
url = url.replace(key, patternMap.get(key).toString());
}


+ 3
- 1
source/src/main/java/work/xuye/source/properties/SourceProperties.java View File

@@ -19,7 +19,9 @@ public class SourceProperties {

public static final String PROPERTIES_PREFIX = "request";
private Pool pool;
private Integer timeoutSeconds = 30;
private Integer connectTimeoutSeconds = 20;
private Integer readTimeoutSeconds = 20;


@Data
@NoArgsConstructor


+ 13
- 5
source/src/main/java/work/xuye/source/request/OkHttpRequestClient.java View File

@@ -38,15 +38,18 @@ public class OkHttpRequestClient implements RequestClient {
this.gson = gson;
SourceProperties.Pool poolConfig = sourceProperties.getPool();
this.client = new OkHttpClient.Builder()
.connectTimeout(sourceProperties.getTimeoutSeconds(), TimeUnit.SECONDS)
.connectTimeout(sourceProperties.getConnectTimeoutSeconds(), TimeUnit.SECONDS)
.readTimeout(sourceProperties.getReadTimeoutSeconds(), TimeUnit.SECONDS)
.connectionPool(new ConnectionPool(
poolConfig.getMaxIdleConnections(),
poolConfig.getKeepAliveSeconds(),
TimeUnit.SECONDS))
.followRedirects(true)
.build();
log.info("init OkHttpRequestClient success, timeoutSeconds: {}, maxIdleConnections: {}, keepAliveSeconds: {}",
sourceProperties.getTimeoutSeconds(),

log.info("init OkHttpRequestClient success, connectTimeoutSeconds: {}, readTimeoutSeconds: {}, maxIdleConnections: {}, keepAliveSeconds: {}",
sourceProperties.getConnectTimeoutSeconds(),
sourceProperties.getReadTimeoutSeconds(),
poolConfig.getMaxIdleConnections(),
poolConfig.getKeepAliveSeconds());
}
@@ -62,7 +65,10 @@ public class OkHttpRequestClient implements RequestClient {

// request header
if (!ObjectUtils.isEmpty(request.getHeaders())) {
requestBuilder.headers(Headers.Companion.of(request.getHeaders()));
request.getHeaders()
.forEach((k, v) -> requestBuilder.addHeader(k, v.toString()));

// requestBuilder.headers(Headers.Companion.of());
}

// request body
@@ -98,8 +104,10 @@ public class OkHttpRequestClient implements RequestClient {
// res
HttpRes httpRes = new HttpRes();
Request httpRequest = requestBuilder.build();
httpRes.requestParams(request);
try (Response response = client.newCall(httpRequest).execute()) {
httpRes.requestParams(request).headers(response.headers().toMultimap()).status(HttpStatus.valueOf(response.code()));
httpRes.headers(response.headers().toMultimap())
.status(HttpStatus.valueOf(response.code()));
if (!ObjectUtils.isEmpty(response.body())) {
BufferedSource source = response.body().source();
Charset charset = CharsetUtil.getCharsetByName(request.getCharset());


+ 3
- 1
transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java View File

@@ -51,7 +51,7 @@ public class TransformHandler {
try {
return this.doHandle(message);
} catch (Exception e) {
urlMD5Service.removeUrlMD5(this.getSeedUrl(message));
urlMD5Service.removeUrlCache(message);
throw new RuntimeException(e);
}
}
@@ -158,6 +158,7 @@ public class TransformHandler {
.setHeader(MessageConstants.SOURCE_TRACE_ID, message.getHeaders().get(MessageConstants.SOURCE_TRACE_ID))
.setHeader(MessageConstants.CURRENT_STAGE, StageConstants.TRANSFORMER)
.setHeader(MessageConstants.TRANSFORMER_TRACE_ID, ID.generate())
.setHeader(MessageConstants.OTHER_CACHE_KEY_SET, message.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET))
.build();
boolean send = streamBridge.send(BindingConstants.TRANSFORMER_KEYS_OUT, keysMsg);
if (!send) {
@@ -192,6 +193,7 @@ public class TransformHandler {
.setHeader(MessageConstants.TASK_ID, message.getHeaders().get(MessageConstants.TASK_ID))
.setHeader(MessageConstants.TASK_NAME, message.getHeaders().get(MessageConstants.TASK_NAME))
.setHeader(MessageConstants.SEED_URL, message.getHeaders().get(MessageConstants.SEED_URL))
.setHeader(MessageConstants.OTHER_CACHE_KEY_SET, message.getHeaders().get(MessageConstants.OTHER_CACHE_KEY_SET))

.build();
results.add(itemMessage);


+ 26
- 0
transformer/src/main/java/work/xuye/transformer/transformer/JsonDataTransformer.java View File

@@ -0,0 +1,26 @@
package work.xuye.transformer.transformer;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
* @author yechuan
* @since 2023/8/11 11:55
**/
@Slf4j
@Component("jsonData")
public class JsonDataTransformer implements MessageTransformer{
@Override
public String transform(String message, String seedUrl) {
JsonObject res = JsonParser.parseString(message).getAsJsonObject();
boolean hasData = res.has("data");
if (hasData) {
return res.get("data").toString();
} else {
log.warn("resData transform failed, res not has data, res: {}", res);
}
return null;
}
}

+ 2
- 2
transformer/src/main/java/work/xuye/transformer/transformer/XmlToJsonTransformer.java View File

@@ -14,7 +14,7 @@ import org.springframework.util.ObjectUtils;
@Slf4j
@Component("xml2json")
public class XmlToJsonTransformer implements MessageTransformer {
@Override
public String transform(String xml, String seedUrl) {
if (ObjectUtils.isEmpty(xml)) {
@@ -24,7 +24,7 @@ public class XmlToJsonTransformer implements MessageTransformer {
try {
jsonObject = XML.toJSONObject(xml);
} catch (JSONException e) {
log.info("xml2json transform error, xml:{}, seedUrl:{}, exception:{}", xml, seedUrl, e);
log.warn("xml2json transform error,xml:{},seedUrl:{},error:{}", xml, seedUrl, e.getMessage());
throw new RuntimeException("xml2json transform error, seedUrl:" + seedUrl);
}
return jsonObject.toString();


Loading…
Cancel
Save