上证基金宝接入 See merge request fhl/std-news-process!3 (master)
@@ -0,0 +1,5 @@ | |||
## [接口文档](https://snp-scheduler.deepq.tech/) | |||
## [任务描述](https://snp-scheduler.deepq.tech/doc.html#/default-mdtagaf78a-omd/document-dc235ffa1acda5f62eb586372d3bb7e5) | |||
## [当日爬取可视化(生产环境)](https://snp-scheduler.deepq.tech/doc.html#/default-mdtagaf78a-omd/document-d7656a65244e1fa48671f8f71422550e) |
@@ -1,5 +1,6 @@ | |||
<?xml version="1.0" encoding="UTF-8"?> | |||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> | |||
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" | |||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> | |||
<modelVersion>4.0.0</modelVersion> | |||
<parent> | |||
<groupId>work.xuye</groupId> | |||
@@ -109,6 +110,12 @@ | |||
<groupId>io.micrometer</groupId> | |||
<artifactId>micrometer-registry-prometheus</artifactId> | |||
</dependency> | |||
<dependency> | |||
<groupId>org.springframework.boot</groupId> | |||
<artifactId>spring-boot-devtools</artifactId> | |||
<scope>runtime</scope> | |||
<optional>true</optional> | |||
</dependency> | |||
</dependencies> | |||
@@ -44,8 +44,7 @@ public class Task implements Serializable { | |||
private String description; | |||
@TableField("process_mode") | |||
private ProcessMode processMode; | |||
private ProcessMode processMode = ProcessMode.NORMAL; | |||
@TableField(value = "request_params", typeHandler = JacksonTypeHandler.class) | |||
private HttpRequestParams requestParams; | |||
@@ -6,6 +6,8 @@ import lombok.NoArgsConstructor; | |||
import work.xuye.common.enums.DeleteCheckMode; | |||
import work.xuye.common.enums.Status; | |||
import java.util.List; | |||
/** | |||
* @author xuye | |||
* @since 2023/2/17 23:41 | |||
@@ -37,10 +39,16 @@ public class SinkParams { | |||
*/ | |||
private String tableName; | |||
private InsertConfig insertConfig; | |||
/**
 * Insert-time options for a sink.
 */
@Data | |||
public static class InsertConfig { | |||
/**
 * Predicate expressions checked before a new row is inserted.
 * NOTE(review): evaluation semantics (all must pass, empty list behaviour) live in the sink
 * handler, not here - confirm against the caller.
 */
private List<String> predicates; | |||
} | |||
/**
 * Options controlling whether an already-stored row should be checked for updates.
 */
@Data | |||
public static class CheckUpdate { | |||
/** Switch for the update check. */
private Status status; | |||
/** Name of the field used to decide whether the row needs updating. */
private String fieldName; | |||
/**
 * Optional JSON path.
 * NOTE(review): presumably applied to the field's value before comparison - confirm against
 * the sink handler.
 */
private String jsonPath; | |||
} | |||
@@ -50,8 +58,20 @@ public class SinkParams { | |||
private Status status; | |||
private DeleteCheckMode mode; | |||
private DisappearConfig disappearConfig; | |||
private FixedValueConfig fixedValueConfig; | |||
} | |||
/**
 * Options for the fixed-value delete check.
 */
@Data | |||
public static class FixedValueConfig { | |||
/**
 * Only data that satisfies these predicate expressions is processed.
 */
private List<String> predicates; | |||
} | |||
@Data | |||
public static class DisappearConfig { | |||
/** | |||
@@ -42,6 +42,11 @@ public class TableTemplate { | |||
private Field updateTimeField; | |||
/** | |||
* 如果发生 更新/删除 操作,需要更新该字段的值 | |||
*/ | |||
private Field updateField; | |||
/** | |||
* 其他字段,指的是普通字段 | |||
*/ | |||
private List<Field> otherFields; | |||
@@ -81,6 +86,9 @@ public class TableTemplate { | |||
@EqualsAndHashCode(callSuper = true) | |||
public static class LogicDeleteField extends Field { | |||
/**
 * Value to write into this field when a row is marked as logically deleted.
 */
private String deleteValue; | |||
} | |||
@@ -5,9 +5,14 @@ package work.xuye.common.enums; | |||
* @since 2023/3/9 16:24 | |||
**/ | |||
public enum DeleteCheckMode { | |||
/** | |||
* 通过数据消失来判断 | |||
*/ | |||
disappear | |||
disappear, | |||
/** | |||
* 通过指定字段的值来判断 | |||
*/ | |||
fixedValue | |||
} |
@@ -14,7 +14,23 @@ import com.baomidou.mybatisplus.annotation.EnumValue; | |||
**/ | |||
public enum ProcessMode { | |||
DEBUG("debug"), NORMAL("normal"); | |||
/** | |||
* debug模式,不会使用缓存判断是否已经处理过了 | |||
*/ | |||
DEBUG("debug"), | |||
/** | |||
* 常规模式,调试完毕线上使用 | |||
*/ | |||
NORMAL("normal"), | |||
/** | |||
* 监听模式,只爬取并且监听每次爬取的结果,不会进行任何处理 | |||
*/ | |||
WATCH("watch"); | |||
@EnumValue | |||
private final String mode; | |||
@@ -25,6 +25,11 @@ public enum ProcessReason { | |||
*/ | |||
resource_changed("\uD83D\uDD04", "缓存中存在该记录,但与最新状态不一致"), | |||
/** | |||
* 结束执行 | |||
*/ | |||
end("\uD83D\uDD1A", "结束执行"), | |||
other("⚠️", "不应该出现的状态,如果出现了,请尽快排查"); | |||
@Getter | |||
@@ -1,5 +1,6 @@ | |||
package work.xuye.common.spel; | |||
import com.google.gson.JsonArray; | |||
import com.google.gson.JsonObject; | |||
import com.google.gson.JsonParser; | |||
import com.google.gson.JsonPrimitive; | |||
@@ -18,6 +19,7 @@ import work.xuye.common.utils.HttpUtil; | |||
import java.time.ZonedDateTime; | |||
import java.time.format.DateTimeFormatter; | |||
import java.util.Arrays; | |||
import java.util.Map; | |||
/** | |||
* @author xuye | |||
@@ -195,4 +197,64 @@ public class CustomFunction { | |||
} | |||
/** | |||
* 先从map中获取指定的key,然后使用jsonpath提取json中的数据,判断是否等于期望值 | |||
* | |||
* @param map map | |||
* @param mapFieldKey map中的key | |||
* @param jsonPath jsonpath | |||
* @param value 期望值 | |||
* @return 是否相等 | |||
*/ | |||
public boolean equals(Map<String, Object> map, String mapFieldKey, String jsonPath, Object value) { | |||
Object fieldValue = map.get(mapFieldKey); | |||
log.debug("equals handler2, map: [{}], mapFieldKey: [{}], mapValue: [{}]", map, mapFieldKey, value); | |||
String result = this.jsonExtract(fieldValue.toString(), jsonPath); | |||
boolean equals = result.equals(value); | |||
log.debug("equals handler1, json: [{}], jsonPath: [{}], value: [{}], result: [{}]", fieldValue, jsonPath, value, equals); | |||
return equals; | |||
} | |||
/**
 * Always returns {@code true}; intended for predicates that should pass unconditionally.
 *
 * @return true
 */
public boolean returnTrue() { | |||
return true; | |||
} | |||
/**
 * Always returns {@code false}; intended for predicates that should fail unconditionally.
 *
 * @return false
 */
public boolean returnFalse() { | |||
return false; | |||
} | |||
/** | |||
* 原始栏目code递增映射 | |||
* | |||
* @param json json字符串 | |||
* @param jsonPath jsonpath | |||
* @return 原始栏目JSON结果 | |||
*/ | |||
public synchronized String mappingColumns(String json, String jsonPath) { | |||
int start = 0; | |||
String columns = this.jsonExtract(json, jsonPath); | |||
String[] columnList = columns.split(","); | |||
JsonArray array = new JsonArray(); | |||
for (String column : columnList) { | |||
JsonObject obj = new JsonObject(); | |||
obj.addProperty("code", start++); | |||
obj.addProperty("name", column); | |||
array.add(obj); | |||
} | |||
log.info("incrColumns handler, jsonPath: [{}], columns: [{}], result: [{}]", jsonPath, columns, array); | |||
return array.toString(); | |||
} | |||
} |
@@ -22,8 +22,7 @@ import java.util.List; | |||
@Service | |||
@RequiredArgsConstructor | |||
public class SpringExpressionLanguageEvaluator { | |||
public final CustomFunction cf; | |||
private final SpelExpressionParser parser; | |||
@@ -0,0 +1,27 @@ | |||
package work.xuye.common.utils; | |||
import lombok.extern.slf4j.Slf4j; | |||
import work.xuye.common.dto.TaskVO; | |||
import java.util.Set; | |||
/** | |||
* @author xuye | |||
* @since 2023/5/6 10:31 | |||
**/ | |||
@Slf4j | |||
public class DebugUtil { | |||
private static final Set<Integer> skipTaskIds = Set.of(); | |||
public static boolean isSkip(TaskVO task) { | |||
Integer taskId = task.getTask().getId(); | |||
if (skipTaskIds.contains(taskId)) { | |||
log.warn("skip task, [{}-{}]", taskId, task.getTask().getName()); | |||
return true; | |||
} | |||
return false; | |||
} | |||
} |
@@ -2,4 +2,15 @@ | |||
<div> | |||
<iframe src="https://redash.deepq.tech/embed/query/558/visualization/661?api_key=NoWmar4KaqaYLErTOOrzfrhYOIZl2ok0DnDBetVu&" width="1020" height="600"></iframe> | |||
</div> | |||
</div> | |||
# 堆积监控 | |||
## [grafana](https://grafana-prd.deepq.tech/goto/zWtaW4s4k) | |||
- ### [source](https://snp-source.deepq.tech/actuator/metrics/spring.cloud.stream.binder.kafka.offset) | |||
- ### [transformer](https://snp-transformer.deepq.tech/actuator/metrics/spring.cloud.stream.binder.kafka.offset) | |||
- ### [sink](https://snp-sink.deepq.tech/actuator/metrics/spring.cloud.stream.binder.kafka.offset) | |||
@@ -31,6 +31,24 @@ | |||
./kafka-topics.sh --bootstrap-server kafka-0.kafka.common.svc.cluster.local:9092,kafka-1.kafka.common.svc.cluster.local:9092,kafka-2.kafka.common.svc.cluster.local:9092 --describe --topic source-out | |||
``` | |||
调整分区 | |||
```shell | |||
./kafka-topics.sh --bootstrap-server kafka-0.kafka.common.svc.cluster.local:9092,kafka-1.kafka.common.svc.cluster.local:9092,kafka-2.kafka.common.svc.cluster.local:9092 --alter --topic task-out --partitions 8 | |||
``` | |||
查看消费组 | |||
```shell | |||
./kafka-consumer-groups.sh --bootstrap-server kafka-0.kafka.common.svc.cluster.local:9092,kafka-1.kafka.common.svc.cluster.local:9092,kafka-2.kafka.common.svc.cluster.local:9092 --describe --group prd | |||
``` | |||
根据分区offset查找消息 | |||
```shell | |||
./kafka-console-consumer.sh --bootstrap-server kafka-0.kafka.common.svc.cluster.local:9092,kafka-1.kafka.common.svc.cluster.local:9092,kafka-2.kafka.common.svc.cluster.local:9092 --topic source-out --partition 2 --offset 2314 --max-messages 1 | |||
``` | |||
## 重启程序 | |||
测试环境 | |||
@@ -43,4 +61,12 @@ kubectl rollout restart -n std-news-process-sit deployment std-news-process-sit | |||
```shell | |||
kubectl rollout restart -n std-news-process-prd deployment std-news-process-prd-std-news-process-scheduler std-news-process-prd-std-news-process-sink std-news-process-prd-std-news-process-source std-news-process-prd-std-news-process-transformer | |||
``` | |||
## 查看日志 | |||
查看所有pod的日志 | |||
```shell | |||
kubectl logs -n std-news-process-prd -l line=std-news-process --max-log-requests 10 -f --prefix | |||
``` |
@@ -25,7 +25,3 @@ | |||
### 异常处理 | |||
程序中发生的所有异常,都会通过钉钉通知 | |||
### 优化方案 | |||
1. 如果以后配置接入任务多,由于媒体接口响应速度慢,source组件会有消息堆积,需要考虑异步处理 |
@@ -1,6 +1,6 @@ | |||
# 常用SQL | |||
## 查询当前任务状态 | |||
## 查询已注册任务状态 | |||
```sql | |||
select name, | |||
@@ -28,25 +28,43 @@ from task; | |||
```sql | |||
select date(createDate) as 日期, spider, count(*) as count | |||
from all_news | |||
where spider in ( | |||
"zgzqb_rss", | |||
"zzkx_rss", | |||
"zzzncx_rss", | |||
"zzjnz_rss" | |||
) | |||
from pyspider_resultdb.all_news | |||
where JSON_EXTRACT(raw_data, '$.crawler') = 'snp' | |||
and createDate > curdate() | |||
group by spider, 日期 | |||
order by 日期 desc, count; | |||
order by count desc; | |||
``` | |||
## 查开发环境入库数量 | |||
## 查询图文源今日入库数据 | |||
```sql | |||
select date(createDate) as 日期, spider, count(*) as count | |||
from std_news | |||
where spider in (select distinct name | |||
from task) | |||
group by spider, 日期 | |||
order by 日期 desc, count; | |||
select title, | |||
url, | |||
content, | |||
webSource, | |||
spider, | |||
postDate, | |||
createDate, | |||
update_time | |||
from all_news | |||
where JSON_EXTRACT(raw_data, '$.crawler') = 'snp' | |||
and createDate > curdate() | |||
order by createDate desc; | |||
``` | |||
## 查询某个视频源今日入库数据 | |||
```sql | |||
select v.title, | |||
t.`status`, | |||
v.url, | |||
v.`video_url`, | |||
v.`created_time`, | |||
t.`retry_count` | |||
from `all_news_video_dycj` as v | |||
left join `task_news` as t on v.`url` = t.`url` | |||
left join `task_news_content` as c on t.`id` = c.id | |||
where JSON_EXTRACT(v.raw_data, '$.crawler') = 'snp' | |||
order by v.`created_time` desc | |||
limit 50 | |||
``` |
@@ -34,3 +34,7 @@ | |||
1. 与图文RSS的第1、2、3点情况一致 | |||
2. 存在直播链接无法正常处理的情况,已经配置域名黑名单对其进行忽略 | |||
## 基金宝图文、视频RSS | |||
1. 对方给的接口数据格式比较乱,让对方改了多次才勉强能用 | |||
2. 视频暂时只有一条数据,找媒体方沟通多次依旧只能提供一条 |
@@ -1,5 +1,6 @@ | |||
<?xml version="1.0" encoding="UTF-8"?> | |||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> | |||
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" | |||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> | |||
<modelVersion>4.0.0</modelVersion> | |||
<parent> | |||
@@ -55,7 +56,6 @@ | |||
<scope>import</scope> | |||
</dependency> | |||
</dependencies> | |||
</dependencyManagement> | |||
<build> | |||
@@ -18,6 +18,7 @@ import work.xuye.common.enums.MessageType; | |||
import work.xuye.common.enums.Status; | |||
import work.xuye.common.properties.AlertProperties; | |||
import work.xuye.common.service.MessageService; | |||
import work.xuye.common.utils.DebugUtil; | |||
import work.xuye.sink.service.JdbcService; | |||
import work.xuye.sink.service.SqlExecutor; | |||
@@ -48,6 +49,9 @@ public class DisappearHandler { | |||
Integer taskId = (Integer) message.getHeaders().get(MessageConstants.TASK_ID); | |||
TaskVO taskVO = taskManager.getTaskInfoByTaskId(taskId); | |||
if (DebugUtil.isSkip(taskVO)) { | |||
return; | |||
} | |||
Task task = taskVO.getTask(); | |||
String taskName = task.getName(); | |||
SinkParams.CheckDelete delete = task.getSinkParams().getCheckDelete(); | |||
@@ -108,13 +112,13 @@ public class DisappearHandler { | |||
String url = alertProperties.getDomainName().getSink() + "/jdbc/execute/" + sqlEntity.getId() + "/" + sqlEntity.getCaptcha(); | |||
// 如果已经自动删除了,就发送自动删除结果 | |||
subTitle = "待手动删除条数: " + removed.size(); | |||
if (autoDeleted) { | |||
subTitle = "已自动删除条数: " + removed.size(); | |||
fullMessage.append("> ✅[查看删除结果](").append(url).append(")"); | |||
} | |||
// 如果未自动删除,并且还配置开启了手动删除的入口,就拼接手动删除的入口 | |||
else if (disappearConfig.isSendExecuteSQLUrl()) { | |||
subTitle = "待删除条数: " + removed.size(); | |||
fullMessage.append("> ⚠️[点击确认删除](").append(url).append(")"); | |||
} | |||
@@ -15,13 +15,11 @@ import work.xuye.common.db.entity.vo.SinkParams; | |||
import work.xuye.common.db.entity.vo.TableTemplate; | |||
import work.xuye.common.db.service.TaskManager; | |||
import work.xuye.common.dto.TaskVO; | |||
import work.xuye.common.enums.ProcessMode; | |||
import work.xuye.common.enums.ProcessReason; | |||
import work.xuye.common.enums.ResourceStatus; | |||
import work.xuye.common.enums.Status; | |||
import work.xuye.common.enums.*; | |||
import work.xuye.common.service.UrlMD5Service; | |||
import work.xuye.common.spel.CustomFunction; | |||
import work.xuye.common.spel.SpringExpressionLanguageEvaluator; | |||
import work.xuye.common.utils.DebugUtil; | |||
import work.xuye.sink.service.JdbcService; | |||
import java.util.ArrayList; | |||
@@ -76,9 +74,13 @@ public class ItemHandler { | |||
return; | |||
} | |||
TaskVO taskVO = taskManager.getTaskInfoByTaskId((Integer) message.getHeaders().get(MessageConstants.TASK_ID)); | |||
if (DebugUtil.isSkip(taskVO)) { | |||
return; | |||
} | |||
//相关字段放这里 | |||
String md5Digest = DigestUtils.md5DigestAsHex(message.getPayload().getBytes()); | |||
TableTemplate template = taskVO.getTemplate().getTableTemplate(); | |||
SinkParams.CheckDelete checkDeleteConfig = taskVO.getTask().getSinkParams().getCheckDelete(); | |||
// 根据消息体,和映射关系,使用SpEL表达式,得到计算结果 | |||
HashMap<String, Object> expressionResultMap = evaluator.evaluate(taskVO.getMapping().getTableMapping(), message); | |||
// 如果不能通过校验,则抛出异常 | |||
@@ -105,18 +107,105 @@ public class ItemHandler { | |||
} | |||
//判断数据是否存在 | |||
boolean itemExist = this.itemExist(taskVO.getTask().getSinkParams(), template); | |||
//如果已经存在了,就判断是否需要更新,如果需要更新就更新 | |||
//如果已经存在了,就判断是否需要删除或更新 | |||
if (itemExist) { | |||
this.tryUpdate(taskVO, expressionResultMap, md5Digest, template, reason); | |||
// 先去尝试删除 | |||
boolean deleted = this.tryDelete(taskVO, template, expressionResultMap, checkDeleteConfig, reason); | |||
// 如果将已存在的数据删除了,就可以直接放入缓存然后结束了 | |||
if (deleted) { | |||
this.putToCache(template, md5Digest); | |||
return; | |||
} | |||
Map<String, Object> dbItem = this.getExistedItem(taskVO, template); | |||
this.tryUpdate(taskVO, dbItem, expressionResultMap, md5Digest, template, reason); | |||
//如果不存在,就插入 | |||
} else { | |||
this.tryInsert(taskVO.getTask(), template, reason); | |||
this.tryInsert(taskVO.getTask(), expressionResultMap, template, reason); | |||
} | |||
this.putToCache(template, md5Digest); | |||
} | |||
/** | |||
* 是否满足一组断言规则 | |||
* | |||
* @param task 任务 | |||
* @param map map结构的数据 | |||
* @param template 模板 | |||
* @param predicates 断言规则 | |||
* @return 是否满足 | |||
*/ | |||
private boolean isSatisfied(Task task, Map<String, Object> map, TableTemplate template, List<String> predicates, String description) { | |||
if (ObjectUtils.isEmpty(predicates)) { | |||
return false; | |||
} | |||
boolean isValid = true; | |||
TableTemplate.Field uniqueField = template.getUniqueField(); | |||
String taskName = task.getName(); | |||
for (String expr : predicates) { | |||
boolean valid = (Boolean) evaluator.evaluate(map, expr, Boolean.class); | |||
if (!valid) { | |||
log.info("[{}][ℹ️️ {}- 断言未通过 ]" + "[{}] {}: {}", | |||
taskName, | |||
description, | |||
expr, | |||
uniqueField.getFieldName(), | |||
uniqueField.getValue()); | |||
isValid = false; | |||
break; | |||
} | |||
} | |||
return isValid; | |||
} | |||
private void putToCache(TableTemplate template, String md5Digest) { | |||
// 必须在方法出栈前,将md5放入缓存,因为Spring Cloud Stream默认会重试三次,如果提前放入缓存了,会导致首次重试被判定为无需处理 | |||
urlMD5Service.put(template.getUniqueField().getValue().toString(), md5Digest); | |||
} | |||
private boolean tryDelete(TaskVO taskVO, TableTemplate template, HashMap<String, Object> newItem, SinkParams.CheckDelete checkDeleteConfig, ProcessReason reason) { | |||
// 如果不是 是固定值检测删除并且状态是开启,说明无需处理,直接返回 | |||
if (!(checkDeleteConfig.getMode().equals(DeleteCheckMode.fixedValue) && checkDeleteConfig.getStatus().equals(Status.on))) { | |||
return false; | |||
} | |||
SinkParams.FixedValueConfig fixedValueConfig = checkDeleteConfig.getFixedValueConfig(); | |||
List<String> predicates = fixedValueConfig.getPredicates(); | |||
// 满足断言条件,才能继续处理 | |||
boolean satisfied = this.isSatisfied(taskVO.getTask(), newItem, template, predicates, "删除"); | |||
if (satisfied) { | |||
return doDelete(taskVO, template.getUniqueField(), reason); | |||
} | |||
return false; | |||
} | |||
private boolean doDelete(TaskVO taskVO, TableTemplate.Field uniqueField, ProcessReason reason) { | |||
String tableName = taskVO.getTask().getSinkParams().getTableName(); | |||
Object value = uniqueField.getValue(); | |||
TableTemplate template = taskVO.getTemplate().getTableTemplate(); | |||
TableTemplate.LogicDeleteField logicDeleteField = template.getLogicDeleteField(); | |||
boolean isLogicDelete = !ObjectUtils.isEmpty(logicDeleteField); | |||
String deleteSql; | |||
if (!isLogicDelete) { | |||
deleteSql = SqlGenerator.generateDeleteSql(tableName, uniqueField.getFieldName(), uniqueField.getValue()); | |||
} else { | |||
deleteSql = SqlGenerator.generateLogicDeleteUpdateSql(tableName, template); | |||
} | |||
int rows = jdbcService.executeSql(taskVO.getTask().getSinkParams().getDataSourceName(), deleteSql); | |||
log.info("[{}][\uD83D\uDDD1️已{}删除{}条][{} {}] " + "{}: {}", | |||
taskVO.getTask().getName(), | |||
isLogicDelete ? "逻辑" : "", | |||
rows, | |||
reason.getEmoji(), | |||
reason.name(), | |||
uniqueField.getFieldName(), | |||
value); | |||
if (rows != 1) { | |||
throw new RuntimeException("期望删除一条,实际删除" + rows + "条数据,请尽快处理"); | |||
} | |||
return true; | |||
} | |||
private boolean isTemplateValid(String taskName, TableTemplate template) { | |||
boolean isValid = true; | |||
List<TableTemplate.Field> allField = new ArrayList<>(); | |||
@@ -132,7 +221,7 @@ public class ItemHandler { | |||
for (String rule : rules) { | |||
boolean valid = (Boolean) evaluator.evaluate(field.getValue(), rule, Boolean.class); | |||
if (!valid) { | |||
log.warn("[{}][❌️ 规则校验未通过]" + "[{}] {}: {}", | |||
log.warn("[{}][❌️ 模板规则校验未通过]" + "[{}] {}: {}", | |||
taskName, | |||
rule, | |||
field.getFieldName(), | |||
@@ -201,23 +290,25 @@ public class ItemHandler { | |||
return jdbcService.isExist(sinkParams.getDataSourceName(), selectExist); | |||
} | |||
private Map<String, Object> getExistedItem(TaskVO taskVO, TableTemplate template) { | |||
String dataSourceName = taskVO.getTask().getSinkParams().getDataSourceName(); | |||
return jdbcService.getOne(dataSourceName, SqlGenerator.generateFetchSql(template)); | |||
} | |||
private void tryUpdate(TaskVO taskVO, HashMap<String, Object> resultMap, String md5Digest, TableTemplate tableTemplate, ProcessReason reason) { | |||
private void tryUpdate(TaskVO taskVO, Map<String, Object> dbItem, HashMap<String, Object> resultMap, String md5Digest, TableTemplate tableTemplate, ProcessReason reason) { | |||
SinkParams sinkParams = taskVO.getTask().getSinkParams(); | |||
String taskName = taskVO.getTask().getName(); | |||
boolean updateConfigEnabled = sinkParams.getCheckUpdate().getStatus().equals(Status.on); | |||
if (updateConfigEnabled) { | |||
String dataSourceName = sinkParams.getDataSourceName(); | |||
Assert.notNull(dataSourceName, "dataSourceName is null"); | |||
Map<String, Object> item = jdbcService.getOne(dataSourceName, SqlGenerator.generateFetchSql(tableTemplate)); | |||
SinkParams.CheckUpdate checkUpdateConfig = sinkParams.getCheckUpdate(); | |||
String fieldName = checkUpdateConfig.getFieldName(); | |||
Object dbValue = item.get(fieldName); | |||
Object dbValue = dbItem.get(fieldName); | |||
if (ObjectUtils.isEmpty(dbValue)) { | |||
throw new RuntimeException("根据 [" + fieldName + "] 来判断数据是否需要更新,但是数据库不存在该字段"); | |||
} | |||
Object nowValue = resultMap.get(fieldName); | |||
if (!ObjectUtils.isEmpty(checkUpdateConfig.getJsonPath())) { | |||
dbValue = cf.jsonExtract(dbValue.toString(), checkUpdateConfig.getJsonPath()); | |||
@@ -245,7 +336,6 @@ public class ItemHandler { | |||
tableTemplate.getUniqueField().getValue()); | |||
} | |||
} else { | |||
log.info("[{}][⏭️ 不检查更新][{} {}] " + "{}: {}", | |||
taskName, | |||
reason.getEmoji(), | |||
@@ -256,15 +346,16 @@ public class ItemHandler { | |||
} | |||
private void tryInsert(Task task, TableTemplate template, ProcessReason reason) { | |||
jdbcService.update(task.getSinkParams().getDataSourceName(), SqlGenerator.generateInsertSql(template)); | |||
log.info("[{}][\uD83D\uDCBE 已保存][{} {}] " + "{}: {}", | |||
task.getName(), | |||
reason.getEmoji(), | |||
reason.name(), | |||
template.getUniqueField().getFieldName(), | |||
template.getUniqueField().getValue()); | |||
private void tryInsert(Task task, Map<String, Object> newItem, TableTemplate template, ProcessReason reason) { | |||
if (this.isSatisfied(task, newItem, template, task.getSinkParams().getInsertConfig().getPredicates(), "保存")) { | |||
jdbcService.update(task.getSinkParams().getDataSourceName(), SqlGenerator.generateInsertSql(template)); | |||
log.info("[{}][\uD83D\uDCBE 已保存][{} {}] " + "{}: {}", | |||
task.getName(), | |||
reason.getEmoji(), | |||
reason.name(), | |||
template.getUniqueField().getFieldName(), | |||
template.getUniqueField().getValue()); | |||
} | |||
} | |||
} |
@@ -51,7 +51,7 @@ public class SqlGenerator { | |||
sqlBuilder.append(", `").append(updateTimeField.getFieldName()).append("`=") | |||
.append(generateCurrentTimeFunctionString(updateTimeField.getFieldType())); | |||
} | |||
sqlBuilder.append(" WHERE ") | |||
.append('`').append(template.getUniqueField().getFieldName()).append('`') | |||
.append('=').append(formatFieldValue(template.getUniqueField())); | |||
@@ -195,14 +195,14 @@ public class SqlGenerator { | |||
return "SELECT `" + selectField + "` FROM `" + tableName + "` WHERE " + | |||
condition + '\'' + lastValue + '\''; | |||
} | |||
public static String generateGivenFieldSql(String tableName, String condition, String lastValue) { | |||
return "SELECT * FROM `" + tableName + "` WHERE " + | |||
condition + '\'' + lastValue + '\''; | |||
} | |||
public static String generateDeleteSql(String tableName, String selectField, String value) { | |||
return "DELETE FROM " + tableName + " WHERE " + selectField + "='" + value + "'" + " LIMIT 1;"; | |||
public static String generateDeleteSql(String tableName, String field, Object value) { | |||
return "DELETE FROM " + tableName + " WHERE " + field + "='" + value + "'" + " LIMIT 1;"; | |||
} | |||
public static List<String> generateDeleteSqls(String tableName, String fieldName, List<String> dbs) { | |||
@@ -212,4 +212,34 @@ public class SqlGenerator { | |||
} | |||
return result; | |||
} | |||
public static String generateLogicDeleteUpdateSql(String tableName, TableTemplate template) { | |||
TableTemplate.Field updateTimeField = template.getUpdateTimeField(); | |||
TableTemplate.LogicDeleteField logicDeleteField = template.getLogicDeleteField(); | |||
String logicDeleteFieldName = logicDeleteField.getFieldName(); | |||
String logicDeleteValue = logicDeleteField.getDeleteValue(); | |||
TableTemplate.Field uniqueField = template.getUniqueField(); | |||
String uniqueFieldFieldName = uniqueField.getFieldName(); | |||
Object uniqueFieldValue = uniqueField.getValue(); | |||
TableTemplate.Field updateField = template.getUpdateField(); | |||
StringBuilder sqlBuilder = new StringBuilder(); | |||
sqlBuilder.append("UPDATE `").append(tableName).append("` SET "); | |||
sqlBuilder.append('`').append(logicDeleteFieldName).append('`') | |||
.append('=').append(logicDeleteValue); | |||
sqlBuilder.append(", `").append(updateField.getFieldName()).append("`=") | |||
.append(updateField.getValue()); | |||
if (updateTimeField != null) { | |||
sqlBuilder.append(", `").append(updateTimeField.getFieldName()).append("`=") | |||
.append(generateCurrentTimeFunctionString(updateTimeField.getFieldType())); | |||
} | |||
sqlBuilder.append(" WHERE ") | |||
.append('`').append(uniqueFieldFieldName).append('`') | |||
.append('=').append('\'').append(uniqueFieldValue.toString()) | |||
.append("' LIMIT 1"); | |||
return sqlBuilder.toString(); | |||
} | |||
} |
@@ -69,9 +69,24 @@ public class JdbcService { | |||
} | |||
/** | |||
* 查询一条数据,如果查询到了多条,抛出异常;如果没有查询到,返回null | |||
* | |||
* @param datasourceName 数据源名称 | |||
* @param sql sql | |||
* @return 查询结果,如果没有查询到,返回null | |||
*/ | |||
public Map<String, Object> getOne(String datasourceName, String sql) { | |||
JdbcTemplate jdbcTemplate = this.getJdbcTemplate(datasourceName); | |||
return jdbcTemplate.queryForMap(sql); | |||
List<Map<String, Object>> maps = jdbcTemplate.queryForList(sql); | |||
if (ObjectUtils.isEmpty(maps)) { | |||
return null; | |||
} | |||
if (maps.size() > 1) { | |||
log.error("期望查询出一条,实际查询出结果为{}条, sql:{}, result:{}", maps.size(), sql, maps); | |||
throw new RuntimeException("期望查询出一条,实际查询出结果为" + maps.size() + "条"); | |||
} | |||
return maps.get(0); | |||
} | |||
@@ -106,7 +121,7 @@ public class JdbcService { | |||
public int executeSql(String datasource, String sql) { | |||
JdbcTemplate jdbcTemplate = this.getJdbcTemplate(datasource); | |||
int update = jdbcTemplate.update(sql); | |||
log.info("execute sql, datasource:{}, sql:{}, result:{}", datasource, sql, update); | |||
log.info("execute sql, datasource:[{}], sql:[{}], result:[{}]", datasource, sql, update); | |||
return update; | |||
} | |||
@@ -20,6 +20,7 @@ import work.xuye.common.dto.TaskVO; | |||
import work.xuye.common.enums.ProcessMode; | |||
import work.xuye.common.enums.ResourceStatus; | |||
import work.xuye.common.service.UrlMD5Service; | |||
import work.xuye.common.utils.DebugUtil; | |||
import work.xuye.common.utils.ID; | |||
import work.xuye.source.request.RequestClient; | |||
@@ -44,6 +45,9 @@ public class SourceHandler { | |||
public Message<HttpRes> handle(Message<HttpRequestParams> message) { | |||
TaskVO taskVO = taskManager.getTaskInfoByTaskId((Integer) message.getHeaders().get(MessageConstants.TASK_ID)); | |||
if (DebugUtil.isSkip(taskVO)) { | |||
return null; | |||
} | |||
HttpRes res = null; | |||
HttpRequestParams requestParams = message.getPayload(); | |||
StopWatch stopWatch = new StopWatch(); | |||
@@ -56,16 +60,20 @@ public class SourceHandler { | |||
urlMD5Service.put(requestParams.getUrl(), md5Digest); | |||
res.setResourceStatus(resourceStatus); | |||
} catch (Exception e) { | |||
log.error("request error,request params:{}, error:{}", requestParams, e); | |||
log.error("[{}] request error, request params:[{}], error message:[{}]", taskVO.getTask().getName(), requestParams, e.getMessage()); | |||
return null; | |||
} | |||
if (!res.getStatus().is2xxSuccessful()) { | |||
log.warn("[{}] response status code is not 2xx, request params: [{}], response: [{}]", taskVO.getTask().getName(), requestParams, res); | |||
throw new RuntimeException("[" + taskVO.getTask().getName() + "]" + "unexpected status code: " + res.getStatus()); | |||
} | |||
ProcessMode processMode = taskManager | |||
.getTaskInfoByTaskId( | |||
(Integer) message.getHeaders().get(MessageConstants.TASK_ID)) | |||
.getTask() | |||
.getProcessMode(); | |||
this.log(message, res, processMode, stopWatch.getLastTaskTimeMillis(), taskVO.getTask()); | |||
if (res.getResourceStatus().equals(ResourceStatus.UNCHANGED) && ProcessMode.NORMAL.equals(processMode)) { | |||
if (res.getResourceStatus().equals(ResourceStatus.UNCHANGED) && !ProcessMode.DEBUG.equals(processMode)) { | |||
return null; | |||
} | |||
return MessageBuilder | |||
@@ -81,8 +89,14 @@ public class SourceHandler { | |||
private void log(Message<HttpRequestParams> message, HttpRes res, ProcessMode processMode, long ms, Task task) { | |||
HttpRequestParams req = message.getPayload(); | |||
String emoji = processMode.equals(ProcessMode.NORMAL) ? "" : "🤖"; | |||
String emoji = ""; | |||
if (processMode.equals(ProcessMode.NORMAL)) { | |||
emoji = ""; | |||
} else if (processMode.equals(ProcessMode.DEBUG)) { | |||
emoji = "🤖"; | |||
} else if (processMode.equals(ProcessMode.WATCH)) { | |||
emoji = "👀"; | |||
} | |||
if (res.getStatus().is2xxSuccessful()) { | |||
log.info("[{}-{}] [{}{} {}] [{} {} {}ms] {} ", task.getId(), task.getName(), | |||
res.getResourceStatus().getEmoji(), emoji, | |||
@@ -12,13 +12,16 @@ import org.springframework.util.ObjectUtils; | |||
import work.xuye.common.constant.BindingConstants; | |||
import work.xuye.common.constant.MessageConstants; | |||
import work.xuye.common.constant.StageConstants; | |||
import work.xuye.common.db.entity.Task; | |||
import work.xuye.common.db.entity.vo.ParseParams; | |||
import work.xuye.common.db.entity.vo.SinkParams; | |||
import work.xuye.common.db.entity.vo.TransformParams; | |||
import work.xuye.common.db.service.TaskManager; | |||
import work.xuye.common.dto.HttpRes; | |||
import work.xuye.common.dto.TaskVO; | |||
import work.xuye.common.enums.*; | |||
import work.xuye.common.service.UrlMD5Service; | |||
import work.xuye.common.utils.DebugUtil; | |||
import work.xuye.common.utils.ID; | |||
import work.xuye.common.utils.JsonPathUtil; | |||
import work.xuye.transformer.transformer.MessageTransformer; | |||
@@ -45,7 +48,6 @@ public class TransformHandler { | |||
public List<Message<String>> handle(Message<HttpRes> message) { | |||
try { | |||
return this.doHandle(message); | |||
} catch (Exception e) { | |||
@@ -58,10 +60,12 @@ public class TransformHandler { | |||
public List<Message<String>> doHandle(Message<HttpRes> message) { | |||
// 获取任务信息 | |||
TaskVO taskVO = taskManager.getTaskInfoByTaskId((Integer) message.getHeaders().get(MessageConstants.TASK_ID)); | |||
if (DebugUtil.isSkip(taskVO)) { | |||
return null; | |||
} | |||
ProcessReason processReason = processReason(message, taskVO); | |||
// 如果资源状态是未变化,直接结束执行 | |||
if (ProcessReason.unchanged.equals(processReason)) { | |||
if (ProcessReason.unchanged.equals(processReason) || ProcessReason.end.equals(processReason)) { | |||
// todo 考虑是否有必要加日志 | |||
return null; | |||
} | |||
@@ -79,6 +83,10 @@ public class TransformHandler { | |||
private ProcessReason processReason(Message<HttpRes> message, TaskVO taskVO) { | |||
HttpRes httpRes = message.getPayload(); | |||
ProcessMode processMode = taskVO.getTask().getProcessMode(); | |||
if (ProcessMode.WATCH.equals(processMode)) { | |||
log.debug("@@ 此任务为监听模式,不会进行任何处理"); | |||
return ProcessReason.end; | |||
} | |||
// 理论上不会出现这种情况,因为符合这个条件的消息source是不会下发的,如果出现了,说明是消息被人为重发了 | |||
if (ResourceStatus.UNCHANGED.equals(httpRes.getResourceStatus()) && ProcessMode.NORMAL.equals(processMode)) { | |||
@@ -102,7 +110,16 @@ public class TransformHandler { | |||
private void transformResult(Message<HttpRes> message, TaskVO taskVO) { | |||
String result = message.getPayload().getBody(); | |||
List<String> handlers = taskVO.getTask().getTransformParams().getHandlers(); | |||
Task task = taskVO.getTask(); | |||
TransformParams transformParams = task.getTransformParams(); | |||
if (ObjectUtils.isEmpty(transformParams)) { | |||
if (task.getProcessMode().getMode().equals(ProcessMode.WATCH.getMode())) { | |||
return; | |||
} else { | |||
throw new RuntimeException(task.getName() + " transformParams is null"); | |||
} | |||
} | |||
List<String> handlers = transformParams.getHandlers(); | |||
Assert.notNull(handlers, "handlers is null"); | |||
String seedUrl = this.getSeedUrl(message); | |||
@@ -172,23 +189,10 @@ public class TransformHandler { | |||
results.add(itemMessage); | |||
}); | |||
if (results.size() > 0) { | |||
log.info("@ [{}{}] 下发转换后的消息,数量:{}", processReason.getEmoji(), processReason, results.size()); | |||
log.info("@ [{}][{}{}] 下发转换后的消息,数量:{}", taskVO.getTask().getName(), processReason.getEmoji(), processReason, results.size()); | |||
} | |||
// 如果没有items的jsonpath,说明只做监控,后续不做入库处理 | |||
} else { | |||
Message<String> itemMessage = MessageBuilder.withPayload(result) | |||
.setHeader(MessageConstants.CURRENT_STAGE, StageConstants.END) | |||
.setHeader(MessageConstants.TASK_TRACE_ID, message.getHeaders().get(MessageConstants.TASK_TRACE_ID)) | |||
.setHeader(MessageConstants.SOURCE_TRACE_ID, message.getHeaders().get(MessageConstants.SOURCE_TRACE_ID)) | |||
.setHeader(MessageConstants.TRANSFORMER_TRACE_ID, transformerTraceId) | |||
.setHeader(MessageConstants.TASK_ID, message.getHeaders().get(MessageConstants.TASK_ID)) | |||
.setHeader(MessageConstants.TASK_NAME, message.getHeaders().get(MessageConstants.TASK_NAME)) | |||
.setHeader(MessageConstants.SEED_URL, message.getHeaders().get(MessageConstants.SEED_URL)) | |||
.build(); | |||
results.add(itemMessage); | |||
log.info("@ 该消息未进行转换,直接下发,后续无需处理," + MessageConstants.TRANSFORMER_TRACE_ID + ": [{}]", | |||
transformerTraceId); | |||
log.warn("@@ 任务[{}]没有配置itemsPath", taskVO.getTask().getName()); | |||
} | |||
return results; | |||
} | |||
@@ -14,14 +14,20 @@ import org.springframework.stereotype.Component; | |||
@Component("resData") | |||
@RequiredArgsConstructor | |||
public class ResDataTransformer implements MessageTransformer { | |||
private int threshold = 300; | |||
@Override | |||
public String transform(String json, String seedUrl) { | |||
JsonObject res = JsonParser.parseString(json).getAsJsonObject(); | |||
boolean hasData = res.has("data"); | |||
if (!hasData) { | |||
throw new RuntimeException("res no data"); | |||
if (hasData) { | |||
threshold = 300; | |||
} else { | |||
threshold--; | |||
} | |||
if (threshold <= 0) { | |||
throw new RuntimeException("res no data, seedUrl: " + seedUrl); | |||
} | |||
return res.get("data").getAsString(); | |||
} | |||