Kaynağa Gözat

保存、删除断言

master
xuye 1 yıl önce
ebeveyn
işleme
b8f93937f2
24 değiştirilmiş dosya ile 460 ekleme ve 88 silme
  1. +5
    -0
      README.md
  2. +8
    -1
      common/pom.xml
  3. +1
    -2
      common/src/main/java/work/xuye/common/db/entity/Task.java
  4. +21
    -1
      common/src/main/java/work/xuye/common/db/entity/vo/SinkParams.java
  5. +8
    -0
      common/src/main/java/work/xuye/common/db/entity/vo/TableTemplate.java
  6. +7
    -2
      common/src/main/java/work/xuye/common/enums/DeleteCheckMode.java
  7. +17
    -1
      common/src/main/java/work/xuye/common/enums/ProcessMode.java
  8. +5
    -0
      common/src/main/java/work/xuye/common/enums/ProcessReason.java
  9. +62
    -0
      common/src/main/java/work/xuye/common/spel/CustomFunction.java
  10. +1
    -2
      common/src/main/java/work/xuye/common/spel/SpringExpressionLanguageEvaluator.java
  11. +27
    -0
      common/src/main/java/work/xuye/common/utils/DebugUtil.java
  12. +12
    -1
      common/src/main/resources/markdown/analizer.md
  13. +26
    -0
      common/src/main/resources/markdown/deploy.md
  14. +0
    -4
      common/src/main/resources/markdown/readme.md
  15. +34
    -16
      common/src/main/resources/markdown/sql.md
  16. +4
    -0
      common/src/main/resources/markdown/task.md
  17. +2
    -2
      pom.xml
  18. +5
    -1
      sink/src/main/java/work/xuye/sink/handler/DisappearHandler.java
  19. +114
    -23
      sink/src/main/java/work/xuye/sink/handler/ItemHandler.java
  20. +34
    -4
      sink/src/main/java/work/xuye/sink/handler/SqlGenerator.java
  21. +17
    -2
      sink/src/main/java/work/xuye/sink/service/JdbcService.java
  22. +18
    -4
      source/src/main/java/work/xuye/source/handler/SourceHandler.java
  23. +23
    -19
      transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java
  24. +9
    -3
      transformer/src/main/java/work/xuye/transformer/transformer/ResDataTransformer.java

+ 5
- 0
README.md Dosyayı Görüntüle

@@ -0,0 +1,5 @@
## [接口文档](https://snp-scheduler.deepq.tech/)

## [任务描述](https://snp-scheduler.deepq.tech/doc.html#/default-mdtagaf78a-omd/document-dc235ffa1acda5f62eb586372d3bb7e5)

## [当日爬取可视化(生产环境)](https://snp-scheduler.deepq.tech/doc.html#/default-mdtagaf78a-omd/document-d7656a65244e1fa48671f8f71422550e)

+ 8
- 1
common/pom.xml Dosyayı Görüntüle

@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>work.xuye</groupId>
@@ -109,6 +110,12 @@
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>


</dependencies>


+ 1
- 2
common/src/main/java/work/xuye/common/db/entity/Task.java Dosyayı Görüntüle

@@ -44,8 +44,7 @@ public class Task implements Serializable {
private String description;

@TableField("process_mode")
private ProcessMode processMode;

private ProcessMode processMode = ProcessMode.NORMAL;

@TableField(value = "request_params", typeHandler = JacksonTypeHandler.class)
private HttpRequestParams requestParams;


+ 21
- 1
common/src/main/java/work/xuye/common/db/entity/vo/SinkParams.java Dosyayı Görüntüle

@@ -6,6 +6,8 @@ import lombok.NoArgsConstructor;
import work.xuye.common.enums.DeleteCheckMode;
import work.xuye.common.enums.Status;

import java.util.List;

/**
* @author xuye
* @since 2023/2/17 23:41
@@ -37,10 +39,16 @@ public class SinkParams {
*/
private String tableName;

private InsertConfig insertConfig;

@Data
public static class InsertConfig {
private List<String> predicates;
}

@Data
public static class CheckUpdate {
private Status status;

private String fieldName;
private String jsonPath;
}
@@ -50,8 +58,20 @@ public class SinkParams {
private Status status;
private DeleteCheckMode mode;
private DisappearConfig disappearConfig;
private FixedValueConfig fixedValueConfig;
}

@Data
public static class FixedValueConfig {

/**
* 满足断言条件的数据才会被处理
*/
private List<String> predicates;

}


@Data
public static class DisappearConfig {
/**


+ 8
- 0
common/src/main/java/work/xuye/common/db/entity/vo/TableTemplate.java Dosyayı Görüntüle

@@ -42,6 +42,11 @@ public class TableTemplate {
private Field updateTimeField;

/**
* 如果发生 更新/删除 操作,需要更新该字段的值
*/
private Field updateField;

/**
* 其他字段,指的是普通字段
*/
private List<Field> otherFields;
@@ -81,6 +86,9 @@ public class TableTemplate {
@EqualsAndHashCode(callSuper = true)
public static class LogicDeleteField extends Field {

/**
* 标识逻辑删除时,需要设置的值
*/
private String deleteValue;

}


+ 7
- 2
common/src/main/java/work/xuye/common/enums/DeleteCheckMode.java Dosyayı Görüntüle

@@ -5,9 +5,14 @@ package work.xuye.common.enums;
* @since 2023/3/9 16:24
**/
public enum DeleteCheckMode {
/**
* 通过数据消失来判断
*/
disappear
disappear,

/**
* 通过指定字段的值来判断
*/
fixedValue
}

+ 17
- 1
common/src/main/java/work/xuye/common/enums/ProcessMode.java Dosyayı Görüntüle

@@ -14,7 +14,23 @@ import com.baomidou.mybatisplus.annotation.EnumValue;
**/

public enum ProcessMode {
DEBUG("debug"), NORMAL("normal");

/**
* debug模式,不会使用缓存判断是否已经处理过了
*/
DEBUG("debug"),


/**
* 常规模式,调试完毕线上使用
*/
NORMAL("normal"),


/**
* 监听模式,只爬取并且监听每次爬取的结果,不会进行任何处理
*/
WATCH("watch");

@EnumValue
private final String mode;


+ 5
- 0
common/src/main/java/work/xuye/common/enums/ProcessReason.java Dosyayı Görüntüle

@@ -25,6 +25,11 @@ public enum ProcessReason {
*/
resource_changed("\uD83D\uDD04", "缓存中存在该记录,但与最新状态不一致"),

/**
* 结束执行
*/
end("\uD83D\uDD1A", "结束执行"),

other("⚠️", "不应该出现的状态,如果出现了,请尽快排查");

@Getter


+ 62
- 0
common/src/main/java/work/xuye/common/spel/CustomFunction.java Dosyayı Görüntüle

@@ -1,5 +1,6 @@
package work.xuye.common.spel;

import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;
@@ -18,6 +19,7 @@ import work.xuye.common.utils.HttpUtil;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Map;

/**
* @author xuye
@@ -195,4 +197,64 @@ public class CustomFunction {
}


/**
* 先从map中获取指定的key,然后使用jsonpath提取json中的数据,判断是否等于期望值
*
* @param map map
* @param mapFieldKey map中的key
* @param jsonPath jsonpath
* @param value 期望值
* @return 是否相等
*/
public boolean equals(Map<String, Object> map, String mapFieldKey, String jsonPath, Object value) {
Object fieldValue = map.get(mapFieldKey);
log.debug("equals handler2, map: [{}], mapFieldKey: [{}], mapValue: [{}]", map, mapFieldKey, value);
String result = this.jsonExtract(fieldValue.toString(), jsonPath);
boolean equals = result.equals(value);
log.debug("equals handler1, json: [{}], jsonPath: [{}], value: [{}], result: [{}]", fieldValue, jsonPath, value, equals);
return equals;
}

/**
* 固定返回 true,用于希望断言通过时使用
*
* @return true
*/
public boolean returnTrue() {
return true;
}


/**
* 固定返回 false,用于希望断言不通过时使用
*
* @return false
*/
public boolean returnFalse() {
return false;
}

/**
* 原始栏目code递增映射
*
* @param json json字符串
* @param jsonPath jsonpath
* @return 原始栏目JSON结果
*/
public synchronized String mappingColumns(String json, String jsonPath) {
int start = 0;
String columns = this.jsonExtract(json, jsonPath);
String[] columnList = columns.split(",");
JsonArray array = new JsonArray();
for (String column : columnList) {
JsonObject obj = new JsonObject();
obj.addProperty("code", start++);
obj.addProperty("name", column);
array.add(obj);
}
log.info("incrColumns handler, jsonPath: [{}], columns: [{}], result: [{}]", jsonPath, columns, array);
return array.toString();
}


}

+ 1
- 2
common/src/main/java/work/xuye/common/spel/SpringExpressionLanguageEvaluator.java Dosyayı Görüntüle

@@ -22,8 +22,7 @@ import java.util.List;
@Service
@RequiredArgsConstructor
public class SpringExpressionLanguageEvaluator {


public final CustomFunction cf;

private final SpelExpressionParser parser;


+ 27
- 0
common/src/main/java/work/xuye/common/utils/DebugUtil.java Dosyayı Görüntüle

@@ -0,0 +1,27 @@
package work.xuye.common.utils;

import lombok.extern.slf4j.Slf4j;
import work.xuye.common.dto.TaskVO;

import java.util.Set;

/**
* @author xuye
* @since 2023/5/6 10:31
**/
@Slf4j
public class DebugUtil {


private static final Set<Integer> skipTaskIds = Set.of();

public static boolean isSkip(TaskVO task) {
Integer taskId = task.getTask().getId();
if (skipTaskIds.contains(taskId)) {
log.warn("skip task, [{}-{}]", taskId, task.getTask().getName());
return true;
}
return false;
}

}

+ 12
- 1
common/src/main/resources/markdown/analizer.md Dosyayı Görüntüle

@@ -2,4 +2,15 @@

<div>
<iframe src="https://redash.deepq.tech/embed/query/558/visualization/661?api_key=NoWmar4KaqaYLErTOOrzfrhYOIZl2ok0DnDBetVu&" width="1020" height="600"></iframe>
</div>
</div>

# 堆积监控

## [grafana](https://grafana-prd.deepq.tech/goto/zWtaW4s4k)

- ### [source](https://snp-source.deepq.tech/actuator/metrics/spring.cloud.stream.binder.kafka.offset)

- ### [transformer](https://snp-transformer.deepq.tech/actuator/metrics/spring.cloud.stream.binder.kafka.offset)

- ### [sink](https://snp-sink.deepq.tech/actuator/metrics/spring.cloud.stream.binder.kafka.offset)


+ 26
- 0
common/src/main/resources/markdown/deploy.md Dosyayı Görüntüle

@@ -31,6 +31,24 @@
./kafka-topics.sh --bootstrap-server kafka-0.kafka.common.svc.cluster.local:9092,kafka-1.kafka.common.svc.cluster.local:9092,kafka-2.kafka.common.svc.cluster.local:9092 --describe --topic source-out
```

调整分区

```shell
./kafka-topics.sh --bootstrap-server kafka-0.kafka.common.svc.cluster.local:9092,kafka-1.kafka.common.svc.cluster.local:9092,kafka-2.kafka.common.svc.cluster.local:9092 --alter --topic task-out --partitions 8
```

查看消费组

```shell
./kafka-consumer-groups.sh --bootstrap-server kafka-0.kafka.common.svc.cluster.local:9092,kafka-1.kafka.common.svc.cluster.local:9092,kafka-2.kafka.common.svc.cluster.local:9092 --describe --group prd
```

根据分区offset查找消息

```shell
./kafka-console-consumer.sh --bootstrap-server kafka-0.kafka.common.svc.cluster.local:9092,kafka-1.kafka.common.svc.cluster.local:9092,kafka-2.kafka.common.svc.cluster.local:9092 --topic source-out --partition 2 --offset 2314 --max-messages 1
```

## 重启程序

测试环境
@@ -43,4 +61,12 @@ kubectl rollout restart -n std-news-process-sit deployment std-news-process-sit

```shell
kubectl rollout restart -n std-news-process-prd deployment std-news-process-prd-std-news-process-scheduler std-news-process-prd-std-news-process-sink std-news-process-prd-std-news-process-source std-news-process-prd-std-news-process-transformer
```

## 查看日志

查看所有pod的日志

```shell
kubectl logs -n std-news-process-prd -l line=std-news-process --max-log-requests 10 -f --prefix
```

+ 0
- 4
common/src/main/resources/markdown/readme.md Dosyayı Görüntüle

@@ -25,7 +25,3 @@
### 异常处理

程序中发生的所有异常,都会通过钉钉通知

### 优化方案

1. 如果以后配置接入任务多,由于媒体接口响应速度慢,source组件会有消息堆积,需要考虑异步处理

+ 34
- 16
common/src/main/resources/markdown/sql.md Dosyayı Görüntüle

@@ -1,6 +1,6 @@
# 常用SQL

## 查询当前任务状态
## 查询已注册任务状态

```sql
select name,
@@ -28,25 +28,43 @@ from task;

```sql
select date(createDate) as 日期, spider, count(*) as count
from all_news
where spider in (
"zgzqb_rss",
"zzkx_rss",
"zzzncx_rss",
"zzjnz_rss"
)
from pyspider_resultdb.all_news
where JSON_EXTRACT(raw_data, '$.crawler') = 'snp'
and createDate > curdate()
group by spider, 日期
order by 日期 desc, count;
order by count desc;
```

## 查开发环境入库数量
## 查询图文源今日入库数据

```sql
select date(createDate) as 日期, spider, count(*) as count
from std_news
where spider in (select distinct name
from task)
group by spider, 日期
order by 日期 desc, count;
select title,
url,
content,
webSource,
spider,
postDate,
createDate,
update_time
from all_news
where JSON_EXTRACT(raw_data, '$.crawler') = 'snp'
and createDate > curdate()
order by createDate desc;
```

## 查询某个视频源今日入库数据

```sql
select v.title,
t.`status`,
v.url,
v.`video_url`,
v.`created_time`,
t.`retry_count`
from `all_news_video_dycj` as v
left join `task_news` as t on v.`url` = t.`url`
left join `task_news_content` as c on t.`id` = c.id
where JSON_EXTRACT(v.raw_data, '$.crawler') = 'snp'
order by v.`created_time` desc
limit 50
```

+ 4
- 0
common/src/main/resources/markdown/task.md Dosyayı Görüntüle

@@ -34,3 +34,7 @@
1. 和图文RSS的123情况一致
2. 存在直播链接无法正常处理的情况,已经配置域名黑名单对其进行忽略

## 基金宝图文、视频RSS

1. 对方给的接口数据格式比较乱,让对方改了多次才勉强能用
2. 视频暂时只有一条数据,找媒体方沟通多次依旧只能提供一条

+ 2
- 2
pom.xml Dosyayı Görüntüle

@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
@@ -55,7 +56,6 @@
<scope>import</scope>
</dependency>
</dependencies>

</dependencyManagement>

<build>


+ 5
- 1
sink/src/main/java/work/xuye/sink/handler/DisappearHandler.java Dosyayı Görüntüle

@@ -18,6 +18,7 @@ import work.xuye.common.enums.MessageType;
import work.xuye.common.enums.Status;
import work.xuye.common.properties.AlertProperties;
import work.xuye.common.service.MessageService;
import work.xuye.common.utils.DebugUtil;
import work.xuye.sink.service.JdbcService;
import work.xuye.sink.service.SqlExecutor;

@@ -48,6 +49,9 @@ public class DisappearHandler {

Integer taskId = (Integer) message.getHeaders().get(MessageConstants.TASK_ID);
TaskVO taskVO = taskManager.getTaskInfoByTaskId(taskId);
if (DebugUtil.isSkip(taskVO)) {
return;
}
Task task = taskVO.getTask();
String taskName = task.getName();
SinkParams.CheckDelete delete = task.getSinkParams().getCheckDelete();
@@ -108,13 +112,13 @@ public class DisappearHandler {

String url = alertProperties.getDomainName().getSink() + "/jdbc/execute/" + sqlEntity.getId() + "/" + sqlEntity.getCaptcha();
// 如果已经自动删除了,就发送自动删除结果
subTitle = "待手动删除条数: " + removed.size();
if (autoDeleted) {
subTitle = "已自动删除条数: " + removed.size();
fullMessage.append("> ✅[查看删除结果](").append(url).append(")");
}
// 如果未自动删除,并且还配置开启了手动删除的入口,就拼接手动删除的入口
else if (disappearConfig.isSendExecuteSQLUrl()) {
subTitle = "待删除条数: " + removed.size();
fullMessage.append("> ⚠️[点击确认删除](").append(url).append(")");
}



+ 114
- 23
sink/src/main/java/work/xuye/sink/handler/ItemHandler.java Dosyayı Görüntüle

@@ -15,13 +15,11 @@ import work.xuye.common.db.entity.vo.SinkParams;
import work.xuye.common.db.entity.vo.TableTemplate;
import work.xuye.common.db.service.TaskManager;
import work.xuye.common.dto.TaskVO;
import work.xuye.common.enums.ProcessMode;
import work.xuye.common.enums.ProcessReason;
import work.xuye.common.enums.ResourceStatus;
import work.xuye.common.enums.Status;
import work.xuye.common.enums.*;
import work.xuye.common.service.UrlMD5Service;
import work.xuye.common.spel.CustomFunction;
import work.xuye.common.spel.SpringExpressionLanguageEvaluator;
import work.xuye.common.utils.DebugUtil;
import work.xuye.sink.service.JdbcService;

import java.util.ArrayList;
@@ -76,9 +74,13 @@ public class ItemHandler {
return;
}
TaskVO taskVO = taskManager.getTaskInfoByTaskId((Integer) message.getHeaders().get(MessageConstants.TASK_ID));
if (DebugUtil.isSkip(taskVO)) {
return;
}
//相关字段放这里
String md5Digest = DigestUtils.md5DigestAsHex(message.getPayload().getBytes());
TableTemplate template = taskVO.getTemplate().getTableTemplate();
SinkParams.CheckDelete checkDeleteConfig = taskVO.getTask().getSinkParams().getCheckDelete();
// 根据消息体,和映射关系,使用SpEL表达式,得到计算结果
HashMap<String, Object> expressionResultMap = evaluator.evaluate(taskVO.getMapping().getTableMapping(), message);
// 如果不能通过校验,则抛出异常
@@ -105,18 +107,105 @@ public class ItemHandler {
}
//判断数据是否存在
boolean itemExist = this.itemExist(taskVO.getTask().getSinkParams(), template);
//如果已经存在了,就判断是否需要更新,如果需要更新就更新
//如果已经存在了,就判断是否需要删除或更新
if (itemExist) {
this.tryUpdate(taskVO, expressionResultMap, md5Digest, template, reason);
// 先去尝试删除
boolean deleted = this.tryDelete(taskVO, template, expressionResultMap, checkDeleteConfig, reason);
// 如果将已存在的数据删除了,就可以直接放入缓存然后结束了
if (deleted) {
this.putToCache(template, md5Digest);
return;
}
Map<String, Object> dbItem = this.getExistedItem(taskVO, template);
this.tryUpdate(taskVO, dbItem, expressionResultMap, md5Digest, template, reason);
//如果不存在,就插入
} else {
this.tryInsert(taskVO.getTask(), template, reason);
this.tryInsert(taskVO.getTask(), expressionResultMap, template, reason);
}
this.putToCache(template, md5Digest);
}

/**
* 是否满足一组断言规则
*
* @param task 任务
* @param map map结构的数据
* @param template 模板
* @param predicates 断言规则
* @return 是否满足
*/
private boolean isSatisfied(Task task, Map<String, Object> map, TableTemplate template, List<String> predicates, String description) {
if (ObjectUtils.isEmpty(predicates)) {
return false;
}
boolean isValid = true;
TableTemplate.Field uniqueField = template.getUniqueField();
String taskName = task.getName();
for (String expr : predicates) {
boolean valid = (Boolean) evaluator.evaluate(map, expr, Boolean.class);
if (!valid) {
log.info("[{}][ℹ️️ {}- 断言未通过 ]" + "[{}] {}: {}",
taskName,
description,
expr,
uniqueField.getFieldName(),
uniqueField.getValue());
isValid = false;
break;
}
}
return isValid;
}

private void putToCache(TableTemplate template, String md5Digest) {
// 必须在方法出栈前,将md5放入缓存,因为Spring Cloud Stream默认会重试三次,如果提前放入缓存了,会导致首次重试被判定为无需处理
urlMD5Service.put(template.getUniqueField().getValue().toString(), md5Digest);
}


private boolean tryDelete(TaskVO taskVO, TableTemplate template, HashMap<String, Object> newItem, SinkParams.CheckDelete checkDeleteConfig, ProcessReason reason) {
// 如果不是 是固定值检测删除并且状态是开启,说明无需处理,直接返回
if (!(checkDeleteConfig.getMode().equals(DeleteCheckMode.fixedValue) && checkDeleteConfig.getStatus().equals(Status.on))) {
return false;
}
SinkParams.FixedValueConfig fixedValueConfig = checkDeleteConfig.getFixedValueConfig();
List<String> predicates = fixedValueConfig.getPredicates();
// 满足断言条件,才能继续处理
boolean satisfied = this.isSatisfied(taskVO.getTask(), newItem, template, predicates, "删除");
if (satisfied) {
return doDelete(taskVO, template.getUniqueField(), reason);
}
return false;
}

private boolean doDelete(TaskVO taskVO, TableTemplate.Field uniqueField, ProcessReason reason) {
String tableName = taskVO.getTask().getSinkParams().getTableName();
Object value = uniqueField.getValue();
TableTemplate template = taskVO.getTemplate().getTableTemplate();
TableTemplate.LogicDeleteField logicDeleteField = template.getLogicDeleteField();
boolean isLogicDelete = !ObjectUtils.isEmpty(logicDeleteField);
String deleteSql;
if (!isLogicDelete) {
deleteSql = SqlGenerator.generateDeleteSql(tableName, uniqueField.getFieldName(), uniqueField.getValue());
} else {
deleteSql = SqlGenerator.generateLogicDeleteUpdateSql(tableName, template);
}
int rows = jdbcService.executeSql(taskVO.getTask().getSinkParams().getDataSourceName(), deleteSql);
log.info("[{}][\uD83D\uDDD1️已{}删除{}条][{} {}] " + "{}: {}",
taskVO.getTask().getName(),
isLogicDelete ? "逻辑" : "",
rows,
reason.getEmoji(),
reason.name(),
uniqueField.getFieldName(),
value);
if (rows != 1) {
throw new RuntimeException("期望删除一条,实际删除" + rows + "条数据,请尽快处理");
}
return true;
}


private boolean isTemplateValid(String taskName, TableTemplate template) {
boolean isValid = true;
List<TableTemplate.Field> allField = new ArrayList<>();
@@ -132,7 +221,7 @@ public class ItemHandler {
for (String rule : rules) {
boolean valid = (Boolean) evaluator.evaluate(field.getValue(), rule, Boolean.class);
if (!valid) {
log.warn("[{}][❌️ 规则校验未通过]" + "[{}] {}: {}",
log.warn("[{}][❌️ 模板规则校验未通过]" + "[{}] {}: {}",
taskName,
rule,
field.getFieldName(),
@@ -201,23 +290,25 @@ public class ItemHandler {
return jdbcService.isExist(sinkParams.getDataSourceName(), selectExist);
}

private Map<String, Object> getExistedItem(TaskVO taskVO, TableTemplate template) {
String dataSourceName = taskVO.getTask().getSinkParams().getDataSourceName();
return jdbcService.getOne(dataSourceName, SqlGenerator.generateFetchSql(template));
}

private void tryUpdate(TaskVO taskVO, HashMap<String, Object> resultMap, String md5Digest, TableTemplate tableTemplate, ProcessReason reason) {

private void tryUpdate(TaskVO taskVO, Map<String, Object> dbItem, HashMap<String, Object> resultMap, String md5Digest, TableTemplate tableTemplate, ProcessReason reason) {
SinkParams sinkParams = taskVO.getTask().getSinkParams();
String taskName = taskVO.getTask().getName();
boolean updateConfigEnabled = sinkParams.getCheckUpdate().getStatus().equals(Status.on);
if (updateConfigEnabled) {
String dataSourceName = sinkParams.getDataSourceName();
Assert.notNull(dataSourceName, "dataSourceName is null");

Map<String, Object> item = jdbcService.getOne(dataSourceName, SqlGenerator.generateFetchSql(tableTemplate));
SinkParams.CheckUpdate checkUpdateConfig = sinkParams.getCheckUpdate();
String fieldName = checkUpdateConfig.getFieldName();
Object dbValue = item.get(fieldName);
Object dbValue = dbItem.get(fieldName);
if (ObjectUtils.isEmpty(dbValue)) {
throw new RuntimeException("根据 [" + fieldName + "] 来判断数据是否需要更新,但是数据库不存在该字段");
}

Object nowValue = resultMap.get(fieldName);
if (!ObjectUtils.isEmpty(checkUpdateConfig.getJsonPath())) {
dbValue = cf.jsonExtract(dbValue.toString(), checkUpdateConfig.getJsonPath());
@@ -245,7 +336,6 @@ public class ItemHandler {
tableTemplate.getUniqueField().getValue());
}
} else {

log.info("[{}][⏭️ 不检查更新][{} {}] " + "{}: {}",
taskName,
reason.getEmoji(),
@@ -256,15 +346,16 @@ public class ItemHandler {
}


private void tryInsert(Task task, TableTemplate template, ProcessReason reason) {
jdbcService.update(task.getSinkParams().getDataSourceName(), SqlGenerator.generateInsertSql(template));
log.info("[{}][\uD83D\uDCBE 已保存][{} {}] " + "{}: {}",
task.getName(),
reason.getEmoji(),
reason.name(),
template.getUniqueField().getFieldName(),
template.getUniqueField().getValue());
private void tryInsert(Task task, Map<String, Object> newItem, TableTemplate template, ProcessReason reason) {
if (this.isSatisfied(task, newItem, template, task.getSinkParams().getInsertConfig().getPredicates(), "保存")) {
jdbcService.update(task.getSinkParams().getDataSourceName(), SqlGenerator.generateInsertSql(template));
log.info("[{}][\uD83D\uDCBE 已保存][{} {}] " + "{}: {}",
task.getName(),
reason.getEmoji(),
reason.name(),
template.getUniqueField().getFieldName(),
template.getUniqueField().getValue());
}
}


}

+ 34
- 4
sink/src/main/java/work/xuye/sink/handler/SqlGenerator.java Dosyayı Görüntüle

@@ -51,7 +51,7 @@ public class SqlGenerator {
sqlBuilder.append(", `").append(updateTimeField.getFieldName()).append("`=")
.append(generateCurrentTimeFunctionString(updateTimeField.getFieldType()));
}
sqlBuilder.append(" WHERE ")
.append('`').append(template.getUniqueField().getFieldName()).append('`')
.append('=').append(formatFieldValue(template.getUniqueField()));
@@ -195,14 +195,14 @@ public class SqlGenerator {
return "SELECT `" + selectField + "` FROM `" + tableName + "` WHERE " +
condition + '\'' + lastValue + '\'';
}
public static String generateGivenFieldSql(String tableName, String condition, String lastValue) {
return "SELECT * FROM `" + tableName + "` WHERE " +
condition + '\'' + lastValue + '\'';
}

public static String generateDeleteSql(String tableName, String selectField, String value) {
return "DELETE FROM " + tableName + " WHERE " + selectField + "='" + value + "'" + " LIMIT 1;";
public static String generateDeleteSql(String tableName, String field, Object value) {
return "DELETE FROM " + tableName + " WHERE " + field + "='" + value + "'" + " LIMIT 1;";
}

public static List<String> generateDeleteSqls(String tableName, String fieldName, List<String> dbs) {
@@ -212,4 +212,34 @@ public class SqlGenerator {
}
return result;
}


public static String generateLogicDeleteUpdateSql(String tableName, TableTemplate template) {

TableTemplate.Field updateTimeField = template.getUpdateTimeField();
TableTemplate.LogicDeleteField logicDeleteField = template.getLogicDeleteField();
String logicDeleteFieldName = logicDeleteField.getFieldName();
String logicDeleteValue = logicDeleteField.getDeleteValue();
TableTemplate.Field uniqueField = template.getUniqueField();
String uniqueFieldFieldName = uniqueField.getFieldName();
Object uniqueFieldValue = uniqueField.getValue();
TableTemplate.Field updateField = template.getUpdateField();

StringBuilder sqlBuilder = new StringBuilder();
sqlBuilder.append("UPDATE `").append(tableName).append("` SET ");
sqlBuilder.append('`').append(logicDeleteFieldName).append('`')
.append('=').append(logicDeleteValue);
sqlBuilder.append(", `").append(updateField.getFieldName()).append("`=")
.append(updateField.getValue());
if (updateTimeField != null) {
sqlBuilder.append(", `").append(updateTimeField.getFieldName()).append("`=")
.append(generateCurrentTimeFunctionString(updateTimeField.getFieldType()));
}

sqlBuilder.append(" WHERE ")
.append('`').append(uniqueFieldFieldName).append('`')
.append('=').append('\'').append(uniqueFieldValue.toString())
.append("' LIMIT 1");
return sqlBuilder.toString();
}
}

+ 17
- 2
sink/src/main/java/work/xuye/sink/service/JdbcService.java Dosyayı Görüntüle

@@ -69,9 +69,24 @@ public class JdbcService {
}


/**
* 查询一条数据,如果查询到了多条,抛出异常;如果没有查询到,返回null
*
* @param datasourceName 数据源名称
* @param sql sql
* @return 查询结果,如果没有查询到,返回null
*/
public Map<String, Object> getOne(String datasourceName, String sql) {
JdbcTemplate jdbcTemplate = this.getJdbcTemplate(datasourceName);
return jdbcTemplate.queryForMap(sql);
List<Map<String, Object>> maps = jdbcTemplate.queryForList(sql);
if (ObjectUtils.isEmpty(maps)) {
return null;
}
if (maps.size() > 1) {
log.error("期望查询出一条,实际查询出结果为{}条, sql:{}, result:{}", maps.size(), sql, maps);
throw new RuntimeException("期望查询出一条,实际查询出结果为" + maps.size() + "条");
}
return maps.get(0);
}


@@ -106,7 +121,7 @@ public class JdbcService {
public int executeSql(String datasource, String sql) {
JdbcTemplate jdbcTemplate = this.getJdbcTemplate(datasource);
int update = jdbcTemplate.update(sql);
log.info("execute sql, datasource:{}, sql:{}, result:{}", datasource, sql, update);
log.info("execute sql, datasource:[{}], sql:[{}], result:[{}]", datasource, sql, update);
return update;

}


+ 18
- 4
source/src/main/java/work/xuye/source/handler/SourceHandler.java Dosyayı Görüntüle

@@ -20,6 +20,7 @@ import work.xuye.common.dto.TaskVO;
import work.xuye.common.enums.ProcessMode;
import work.xuye.common.enums.ResourceStatus;
import work.xuye.common.service.UrlMD5Service;
import work.xuye.common.utils.DebugUtil;
import work.xuye.common.utils.ID;
import work.xuye.source.request.RequestClient;

@@ -44,6 +45,9 @@ public class SourceHandler {

public Message<HttpRes> handle(Message<HttpRequestParams> message) {
TaskVO taskVO = taskManager.getTaskInfoByTaskId((Integer) message.getHeaders().get(MessageConstants.TASK_ID));
if (DebugUtil.isSkip(taskVO)) {
return null;
}
HttpRes res = null;
HttpRequestParams requestParams = message.getPayload();
StopWatch stopWatch = new StopWatch();
@@ -56,16 +60,20 @@ public class SourceHandler {
urlMD5Service.put(requestParams.getUrl(), md5Digest);
res.setResourceStatus(resourceStatus);
} catch (Exception e) {
log.error("request error,request params:{}, error:{}", requestParams, e);
log.error("[{}] request error, request params:[{}], error message:[{}]", taskVO.getTask().getName(), requestParams, e.getMessage());
return null;
}
if (!res.getStatus().is2xxSuccessful()) {
log.warn("[{}] response status code is not 2xx, request params: [{}], response: [{}]", taskVO.getTask().getName(), requestParams, res);
throw new RuntimeException("[" + taskVO.getTask().getName() + "]" + "unexpected status code: " + res.getStatus());
}
ProcessMode processMode = taskManager
.getTaskInfoByTaskId(
(Integer) message.getHeaders().get(MessageConstants.TASK_ID))
.getTask()
.getProcessMode();
this.log(message, res, processMode, stopWatch.getLastTaskTimeMillis(), taskVO.getTask());
if (res.getResourceStatus().equals(ResourceStatus.UNCHANGED) && ProcessMode.NORMAL.equals(processMode)) {
if (res.getResourceStatus().equals(ResourceStatus.UNCHANGED) && !ProcessMode.DEBUG.equals(processMode)) {
return null;
}
return MessageBuilder
@@ -81,8 +89,14 @@ public class SourceHandler {

private void log(Message<HttpRequestParams> message, HttpRes res, ProcessMode processMode, long ms, Task task) {
HttpRequestParams req = message.getPayload();

String emoji = processMode.equals(ProcessMode.NORMAL) ? "" : "🤖";
String emoji = "";
if (processMode.equals(ProcessMode.NORMAL)) {
emoji = "";
} else if (processMode.equals(ProcessMode.DEBUG)) {
emoji = "🤖";
} else if (processMode.equals(ProcessMode.WATCH)) {
emoji = "👀";
}
if (res.getStatus().is2xxSuccessful()) {
log.info("[{}-{}] [{}{} {}] [{} {} {}ms] {} ", task.getId(), task.getName(),
res.getResourceStatus().getEmoji(), emoji,


+ 23
- 19
transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java Dosyayı Görüntüle

@@ -12,13 +12,16 @@ import org.springframework.util.ObjectUtils;
import work.xuye.common.constant.BindingConstants;
import work.xuye.common.constant.MessageConstants;
import work.xuye.common.constant.StageConstants;
import work.xuye.common.db.entity.Task;
import work.xuye.common.db.entity.vo.ParseParams;
import work.xuye.common.db.entity.vo.SinkParams;
import work.xuye.common.db.entity.vo.TransformParams;
import work.xuye.common.db.service.TaskManager;
import work.xuye.common.dto.HttpRes;
import work.xuye.common.dto.TaskVO;
import work.xuye.common.enums.*;
import work.xuye.common.service.UrlMD5Service;
import work.xuye.common.utils.DebugUtil;
import work.xuye.common.utils.ID;
import work.xuye.common.utils.JsonPathUtil;
import work.xuye.transformer.transformer.MessageTransformer;
@@ -45,7 +48,6 @@ public class TransformHandler {


public List<Message<String>> handle(Message<HttpRes> message) {

try {
return this.doHandle(message);
} catch (Exception e) {
@@ -58,10 +60,12 @@ public class TransformHandler {
public List<Message<String>> doHandle(Message<HttpRes> message) {
// 获取任务信息
TaskVO taskVO = taskManager.getTaskInfoByTaskId((Integer) message.getHeaders().get(MessageConstants.TASK_ID));

if (DebugUtil.isSkip(taskVO)) {
return null;
}
ProcessReason processReason = processReason(message, taskVO);
// 如果资源状态是未变化,直接结束执行
if (ProcessReason.unchanged.equals(processReason)) {
if (ProcessReason.unchanged.equals(processReason) || ProcessReason.end.equals(processReason)) {
// todo 考虑是否有必要加日志
return null;
}
@@ -79,6 +83,10 @@ public class TransformHandler {
private ProcessReason processReason(Message<HttpRes> message, TaskVO taskVO) {
HttpRes httpRes = message.getPayload();
ProcessMode processMode = taskVO.getTask().getProcessMode();
if (ProcessMode.WATCH.equals(processMode)) {
log.debug("@@ 此任务为监听模式,不会进行任何处理");
return ProcessReason.end;
}

// 理论上不会出现这种情况,因为符合这个条件的消息source是不会下发的,如果出现了,说明是消息被人为重发了
if (ResourceStatus.UNCHANGED.equals(httpRes.getResourceStatus()) && ProcessMode.NORMAL.equals(processMode)) {
@@ -102,7 +110,16 @@ public class TransformHandler {
private void transformResult(Message<HttpRes> message, TaskVO taskVO) {

String result = message.getPayload().getBody();
List<String> handlers = taskVO.getTask().getTransformParams().getHandlers();
Task task = taskVO.getTask();
TransformParams transformParams = task.getTransformParams();
if (ObjectUtils.isEmpty(transformParams)) {
if (task.getProcessMode().getMode().equals(ProcessMode.WATCH.getMode())) {
return;
} else {
throw new RuntimeException(task.getName() + " transformParams is null");
}
}
List<String> handlers = transformParams.getHandlers();
Assert.notNull(handlers, "handlers is null");
String seedUrl = this.getSeedUrl(message);

@@ -172,23 +189,10 @@ public class TransformHandler {
results.add(itemMessage);
});
if (results.size() > 0) {
log.info("@ [{}{}] 下发转换后的消息,数量:{}", processReason.getEmoji(), processReason, results.size());
log.info("@ [{}][{}{}] 下发转换后的消息,数量:{}", taskVO.getTask().getName(), processReason.getEmoji(), processReason, results.size());
}

// 如果没有items的jsonpath,说明只做监控,后续不做入库处理
} else {
Message<String> itemMessage = MessageBuilder.withPayload(result)
.setHeader(MessageConstants.CURRENT_STAGE, StageConstants.END)
.setHeader(MessageConstants.TASK_TRACE_ID, message.getHeaders().get(MessageConstants.TASK_TRACE_ID))
.setHeader(MessageConstants.SOURCE_TRACE_ID, message.getHeaders().get(MessageConstants.SOURCE_TRACE_ID))
.setHeader(MessageConstants.TRANSFORMER_TRACE_ID, transformerTraceId)
.setHeader(MessageConstants.TASK_ID, message.getHeaders().get(MessageConstants.TASK_ID))
.setHeader(MessageConstants.TASK_NAME, message.getHeaders().get(MessageConstants.TASK_NAME))
.setHeader(MessageConstants.SEED_URL, message.getHeaders().get(MessageConstants.SEED_URL))
.build();
results.add(itemMessage);
log.info("@ 该消息未进行转换,直接下发,后续无需处理," + MessageConstants.TRANSFORMER_TRACE_ID + ": [{}]",
transformerTraceId);
log.warn("@@ 任务[{}]没有配置itemsPath", taskVO.getTask().getName());
}
return results;
}


+ 9
- 3
transformer/src/main/java/work/xuye/transformer/transformer/ResDataTransformer.java Dosyayı Görüntüle

@@ -14,14 +14,20 @@ import org.springframework.stereotype.Component;
@Component("resData")
@RequiredArgsConstructor
public class ResDataTransformer implements MessageTransformer {

private int threshold = 300;

@Override
public String transform(String json, String seedUrl) {
JsonObject res = JsonParser.parseString(json).getAsJsonObject();
boolean hasData = res.has("data");
if (!hasData) {
throw new RuntimeException("res no data");
if (hasData) {
threshold = 300;
} else {
threshold--;
}
if (threshold <= 0) {
throw new RuntimeException("res no data, seedUrl: " + seedUrl);
}
return res.get("data").getAsString();
}


Yükleniyor…
İptal
Kaydet