From b8f93937f28c5a60e89490b2d3b677a7225ce41b Mon Sep 17 00:00:00 2001 From: xuye Date: Fri, 14 Apr 2023 22:12:55 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=9D=E5=AD=98=E3=80=81=E5=88=A0=E9=99=A4?= =?UTF-8?q?=E6=96=AD=E8=A8=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 5 + common/pom.xml | 9 +- .../main/java/work/xuye/common/db/entity/Task.java | 3 +- .../work/xuye/common/db/entity/vo/SinkParams.java | 22 +++- .../xuye/common/db/entity/vo/TableTemplate.java | 8 ++ .../work/xuye/common/enums/DeleteCheckMode.java | 9 +- .../java/work/xuye/common/enums/ProcessMode.java | 18 ++- .../java/work/xuye/common/enums/ProcessReason.java | 5 + .../java/work/xuye/common/spel/CustomFunction.java | 62 ++++++++++ .../spel/SpringExpressionLanguageEvaluator.java | 3 +- .../java/work/xuye/common/utils/DebugUtil.java | 27 ++++ common/src/main/resources/markdown/analizer.md | 13 +- common/src/main/resources/markdown/deploy.md | 26 ++++ common/src/main/resources/markdown/readme.md | 4 - common/src/main/resources/markdown/sql.md | 50 +++++--- common/src/main/resources/markdown/task.md | 4 + pom.xml | 4 +- .../work/xuye/sink/handler/DisappearHandler.java | 6 +- .../java/work/xuye/sink/handler/ItemHandler.java | 137 +++++++++++++++++---- .../java/work/xuye/sink/handler/SqlGenerator.java | 38 +++++- .../java/work/xuye/sink/service/JdbcService.java | 19 ++- .../work/xuye/source/handler/SourceHandler.java | 22 +++- .../xuye/transformer/handler/TransformHandler.java | 42 ++++--- .../transformer/ResDataTransformer.java | 12 +- 24 files changed, 460 insertions(+), 88 deletions(-) create mode 100644 README.md create mode 100644 common/src/main/java/work/xuye/common/utils/DebugUtil.java diff --git a/README.md b/README.md new file mode 100644 index 0000000..e4fad00 --- /dev/null +++ b/README.md @@ -0,0 +1,5 @@ +## [接口文档](https://snp-scheduler.deepq.tech/) + +## [任务描述](https://snp-scheduler.deepq.tech/doc.html#/default-mdtagaf78a-omd/document-dc235ffa1acda5f62eb586372d3bb7e5) + +## [当日爬取可视化(生产环境)](https://snp-scheduler.deepq.tech/doc.html#/default-mdtagaf78a-omd/document-d7656a65244e1fa48671f8f71422550e) diff --git a/common/pom.xml b/common/pom.xml index 9ca0c89..0101c48 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -1,5 +1,6 @@ - + 4.0.0 work.xuye @@ -109,6 +110,12 @@ io.micrometer micrometer-registry-prometheus + + org.springframework.boot + spring-boot-devtools + runtime + true + diff --git a/common/src/main/java/work/xuye/common/db/entity/Task.java b/common/src/main/java/work/xuye/common/db/entity/Task.java index 00357d9..a9d37b0 100644 --- a/common/src/main/java/work/xuye/common/db/entity/Task.java +++ b/common/src/main/java/work/xuye/common/db/entity/Task.java @@ -44,8 +44,7 @@ public class Task implements Serializable { private String description; @TableField("process_mode") - private ProcessMode processMode; - + private ProcessMode processMode = ProcessMode.NORMAL; @TableField(value = "request_params", typeHandler = JacksonTypeHandler.class) private HttpRequestParams requestParams; diff --git a/common/src/main/java/work/xuye/common/db/entity/vo/SinkParams.java b/common/src/main/java/work/xuye/common/db/entity/vo/SinkParams.java index 8f6a3ee..70c8f87 100644 --- a/common/src/main/java/work/xuye/common/db/entity/vo/SinkParams.java +++ b/common/src/main/java/work/xuye/common/db/entity/vo/SinkParams.java @@ -6,6 +6,8 @@ import lombok.NoArgsConstructor; import work.xuye.common.enums.DeleteCheckMode; import work.xuye.common.enums.Status; +import java.util.List; + /** * @author xuye * @since 2023/2/17 23:41 @@ -37,10 +39,16 @@ public class SinkParams { */ private String tableName; + private InsertConfig insertConfig; + + @Data + public static class InsertConfig { + private List predicates; + } + @Data public static class CheckUpdate { private Status status; - private String fieldName; private String jsonPath; } @@ -50,8 +58,20 @@ public class SinkParams { private Status status; private DeleteCheckMode mode; private DisappearConfig disappearConfig; + private FixedValueConfig fixedValueConfig; + } + + @Data + public static class FixedValueConfig { + + /** + * 满足断言条件的数据才会被处理 + */ + private List predicates; + } + @Data public static class DisappearConfig { /** diff --git a/common/src/main/java/work/xuye/common/db/entity/vo/TableTemplate.java b/common/src/main/java/work/xuye/common/db/entity/vo/TableTemplate.java index ce16277..7c20924 100644 --- a/common/src/main/java/work/xuye/common/db/entity/vo/TableTemplate.java +++ b/common/src/main/java/work/xuye/common/db/entity/vo/TableTemplate.java @@ -42,6 +42,11 @@ public class TableTemplate { private Field updateTimeField; /** + * 如果发生 更新/删除 操作,需要更新该字段的值 + */ + private Field updateField; + + /** * 其他字段,指的是普通字段 */ private List otherFields; @@ -81,6 +86,9 @@ public class TableTemplate { @EqualsAndHashCode(callSuper = true) public static class LogicDeleteField extends Field { + /** + * 标识逻辑删除时,需要设置的值 + */ private String deleteValue; } diff --git a/common/src/main/java/work/xuye/common/enums/DeleteCheckMode.java b/common/src/main/java/work/xuye/common/enums/DeleteCheckMode.java index e1f1eb7..e6ece5a 100644 --- a/common/src/main/java/work/xuye/common/enums/DeleteCheckMode.java +++ b/common/src/main/java/work/xuye/common/enums/DeleteCheckMode.java @@ -5,9 +5,14 @@ package work.xuye.common.enums; * @since 2023/3/9 16:24 **/ public enum DeleteCheckMode { - + /** * 通过数据消失来判断 */ - disappear + disappear, + + /** + * 通过指定字段的值来判断 + */ + fixedValue } diff --git a/common/src/main/java/work/xuye/common/enums/ProcessMode.java b/common/src/main/java/work/xuye/common/enums/ProcessMode.java index e0e5e66..c12ee50 100644 --- a/common/src/main/java/work/xuye/common/enums/ProcessMode.java +++ b/common/src/main/java/work/xuye/common/enums/ProcessMode.java @@ -14,7 +14,23 @@ import com.baomidou.mybatisplus.annotation.EnumValue; **/ public enum ProcessMode { - DEBUG("debug"), NORMAL("normal"); + + /** + * debug模式,不会使用缓存判断是否已经处理过了 + */ + DEBUG("debug"), + + + /** + * 常规模式,调试完毕线上使用 + */ + NORMAL("normal"), + + + /** + * 监听模式,只爬取并且监听每次爬取的结果,不会进行任何处理 + */ + WATCH("watch"); @EnumValue private final String mode; diff --git a/common/src/main/java/work/xuye/common/enums/ProcessReason.java b/common/src/main/java/work/xuye/common/enums/ProcessReason.java index f0f179b..b6189e8 100644 --- a/common/src/main/java/work/xuye/common/enums/ProcessReason.java +++ b/common/src/main/java/work/xuye/common/enums/ProcessReason.java @@ -25,6 +25,11 @@ public enum ProcessReason { */ resource_changed("\uD83D\uDD04", "缓存中存在该记录,但与最新状态不一致"), + /** + * 结束执行 + */ + end("\uD83D\uDD1A", "结束执行"), + other("⚠️", "不应该出现的状态,如果出现了,请尽快排查"); @Getter diff --git a/common/src/main/java/work/xuye/common/spel/CustomFunction.java b/common/src/main/java/work/xuye/common/spel/CustomFunction.java index bb61077..d210303 100644 --- a/common/src/main/java/work/xuye/common/spel/CustomFunction.java +++ b/common/src/main/java/work/xuye/common/spel/CustomFunction.java @@ -1,5 +1,6 @@ package work.xuye.common.spel; +import com.google.gson.JsonArray; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonPrimitive; @@ -18,6 +19,7 @@ import work.xuye.common.utils.HttpUtil; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.Arrays; +import java.util.Map; /** * @author xuye @@ -195,4 +197,64 @@ public class CustomFunction { } + /** + * 先从map中获取指定的key,然后使用jsonpath提取json中的数据,判断是否等于期望值 + * + * @param map map + * @param mapFieldKey map中的key + * @param jsonPath jsonpath + * @param value 期望值 + * @return 是否相等 + */ + public boolean equals(Map map, String mapFieldKey, String jsonPath, Object value) { + Object fieldValue = map.get(mapFieldKey); + log.debug("equals handler2, map: [{}], mapFieldKey: [{}], mapValue: [{}]", map, mapFieldKey, value); + String result = this.jsonExtract(fieldValue.toString(), jsonPath); + boolean equals = result.equals(value); + log.debug("equals handler1, json: [{}], jsonPath: [{}], value: [{}], result: [{}]", fieldValue, jsonPath, value, equals); + return equals; + } + + /** + * 固定返回 true,用于希望断言通过时使用 + * + * @return true + */ + public boolean returnTrue() { + return true; + } + + + /** + * 固定返回 false,用于希望断言不通过时使用 + * + * @return false + */ + public boolean returnFalse() { + return false; + } + + /** + * 原始栏目code递增映射 + * + * @param json json字符串 + * @param jsonPath jsonpath + * @return 原始栏目JSON结果 + */ + public synchronized String mappingColumns(String json, String jsonPath) { + int start = 0; + String columns = this.jsonExtract(json, jsonPath); + String[] columnList = columns.split(","); + JsonArray array = new JsonArray(); + for (String column : columnList) { + JsonObject obj = new JsonObject(); + obj.addProperty("code", start++); + obj.addProperty("name", column); + array.add(obj); + } + log.info("incrColumns handler, jsonPath: [{}], columns: [{}], result: [{}]", jsonPath, columns, array); + return array.toString(); + } + + } diff --git a/common/src/main/java/work/xuye/common/spel/SpringExpressionLanguageEvaluator.java b/common/src/main/java/work/xuye/common/spel/SpringExpressionLanguageEvaluator.java index b614232..5b3200f 100644 --- a/common/src/main/java/work/xuye/common/spel/SpringExpressionLanguageEvaluator.java +++ b/common/src/main/java/work/xuye/common/spel/SpringExpressionLanguageEvaluator.java @@ -22,8 +22,7 @@ import java.util.List; @Service @RequiredArgsConstructor public class SpringExpressionLanguageEvaluator { - - + public final CustomFunction cf; private final SpelExpressionParser parser; diff --git a/common/src/main/java/work/xuye/common/utils/DebugUtil.java b/common/src/main/java/work/xuye/common/utils/DebugUtil.java new file mode 100644 index 0000000..b4de59e --- /dev/null +++ b/common/src/main/java/work/xuye/common/utils/DebugUtil.java @@ -0,0 +1,27 @@ +package work.xuye.common.utils; + +import lombok.extern.slf4j.Slf4j; +import work.xuye.common.dto.TaskVO; + +import java.util.Set; + +/** + * @author xuye + * @since 2023/5/6 10:31 + **/ +@Slf4j +public class DebugUtil { + + + private static final Set skipTaskIds = Set.of(); + + public static boolean isSkip(TaskVO task) { + Integer taskId = task.getTask().getId(); + if (skipTaskIds.contains(taskId)) { + log.warn("skip task, [{}-{}]", taskId, task.getTask().getName()); + return true; + } + return false; + } + +} diff --git a/common/src/main/resources/markdown/analizer.md b/common/src/main/resources/markdown/analizer.md index 72187c1..da7a85d 100644 --- a/common/src/main/resources/markdown/analizer.md +++ b/common/src/main/resources/markdown/analizer.md @@ -2,4 +2,15 @@
-
\ No newline at end of file + + +# 堆积监控 + +## [grafana](https://grafana-prd.deepq.tech/goto/zWtaW4s4k) + +- ### [source](https://snp-source.deepq.tech/actuator/metrics/spring.cloud.stream.binder.kafka.offset) + +- ### [transformer](https://snp-transformer.deepq.tech/actuator/metrics/spring.cloud.stream.binder.kafka.offset) + +- ### [sink](https://snp-sink.deepq.tech/actuator/metrics/spring.cloud.stream.binder.kafka.offset) + diff --git a/common/src/main/resources/markdown/deploy.md b/common/src/main/resources/markdown/deploy.md index 50ffa1c..9908258 100644 --- a/common/src/main/resources/markdown/deploy.md +++ b/common/src/main/resources/markdown/deploy.md @@ -31,6 +31,24 @@ ./kafka-topics.sh --bootstrap-server kafka-0.kafka.common.svc.cluster.local:9092,kafka-1.kafka.common.svc.cluster.local:9092,kafka-2.kafka.common.svc.cluster.local:9092 --describe --topic source-out ``` +调整分区 + +```shell +./kafka-topics.sh --bootstrap-server kafka-0.kafka.common.svc.cluster.local:9092,kafka-1.kafka.common.svc.cluster.local:9092,kafka-2.kafka.common.svc.cluster.local:9092 --alter --topic task-out --partitions 8 +``` + +查看消费组 + +```shell +./kafka-consumer-groups.sh --bootstrap-server kafka-0.kafka.common.svc.cluster.local:9092,kafka-1.kafka.common.svc.cluster.local:9092,kafka-2.kafka.common.svc.cluster.local:9092 --describe --group prd +``` + +根据分区offset查找消息 + +```shell +./kafka-console-consumer.sh --bootstrap-server kafka-0.kafka.common.svc.cluster.local:9092,kafka-1.kafka.common.svc.cluster.local:9092,kafka-2.kafka.common.svc.cluster.local:9092 --topic source-out --partition 2 --offset 2314 --max-messages 1 +``` + ## 重启程序 测试环境 @@ -43,4 +61,12 @@ kubectl rollout restart -n std-news-process-sit deployment std-news-process-sit ```shell kubectl rollout restart -n std-news-process-prd deployment std-news-process-prd-std-news-process-scheduler std-news-process-prd-std-news-process-sink std-news-process-prd-std-news-process-source std-news-process-prd-std-news-process-transformer +``` + +## 查看日志 + +查看所有pod的日志 + +```shell +kubectl logs -n std-news-process-prd -l line=std-news-process --max-log-requests 10 -f --prefix ``` \ No newline at end of file diff --git a/common/src/main/resources/markdown/readme.md b/common/src/main/resources/markdown/readme.md index c401c2a..4e16ee1 100644 --- a/common/src/main/resources/markdown/readme.md +++ b/common/src/main/resources/markdown/readme.md @@ -25,7 +25,3 @@ ### 异常处理 程序中发生的所有异常,都会通过钉钉通知 - -### 优化方案 - -1. 如果以后配置接入任务多,由于媒体接口响应速度慢,source组件会有消息堆积,需要考虑异步处理 \ No newline at end of file diff --git a/common/src/main/resources/markdown/sql.md b/common/src/main/resources/markdown/sql.md index 30b347b..e9ca7a5 100644 --- a/common/src/main/resources/markdown/sql.md +++ b/common/src/main/resources/markdown/sql.md @@ -1,6 +1,6 @@ # 常用SQL -## 查询当前任务状态 +## 查询已注册任务状态 ```sql select name, @@ -28,25 +28,43 @@ from task; ```sql select date(createDate) as 日期, spider, count(*) as count -from all_news -where spider in ( - "zgzqb_rss", - "zzkx_rss", - "zzzncx_rss", - "zzjnz_rss" - ) +from pyspider_resultdb.all_news +where JSON_EXTRACT(raw_data, '$.crawler') = 'snp' + and createDate > curdate() group by spider, 日期 -order by 日期 desc, count; +order by count desc; ``` -## 查开发环境入库数量 +## 查询图文源今日入库数据 ```sql -select date(createDate) as 日期, spider, count(*) as count -from std_news -where spider in (select distinct name - from task) -group by spider, 日期 -order by 日期 desc, count; +select title, + url, + content, + webSource, + spider, + postDate, + createDate, + update_time +from all_news +where JSON_EXTRACT(raw_data, '$.crawler') = 'snp' + and createDate > curdate() +order by createDate desc; ``` +## 查询某个视频源今日入库数据 + +```sql +select v.title, + t.`status`, + v.url, + v.`video_url`, + v.`created_time`, + t.`retry_count` +from `all_news_video_dycj` as v + left join `task_news` as t on v.`url` = t.`url` + left join `task_news_content` as c on t.`id` = c.id +where JSON_EXTRACT(v.raw_data, '$.crawler') = 'snp' +order by v.`created_time` desc +limit 50 +``` diff --git a/common/src/main/resources/markdown/task.md b/common/src/main/resources/markdown/task.md index d39dd85..bd49f03 100644 --- a/common/src/main/resources/markdown/task.md +++ b/common/src/main/resources/markdown/task.md @@ -34,3 +34,7 @@ 1. 和图文RSS的123情况一致 2. 存在直播链接无法正常处理的情况,已经配置域名黑名单对其进行忽略 +## 基金宝图文、视频RSS + +1. 对方给的接口数据格式比较乱,让对方改了多次才勉强能用 +2. 视频暂时只有一条数据,找媒体方沟通多次依旧只能提供一条 \ No newline at end of file diff --git a/pom.xml b/pom.xml index 4e595da..86ead46 100644 --- a/pom.xml +++ b/pom.xml @@ -1,5 +1,6 @@ - + 4.0.0 @@ -55,7 +56,6 @@ import - diff --git a/sink/src/main/java/work/xuye/sink/handler/DisappearHandler.java b/sink/src/main/java/work/xuye/sink/handler/DisappearHandler.java index 724d47c..6628e1b 100644 --- a/sink/src/main/java/work/xuye/sink/handler/DisappearHandler.java +++ b/sink/src/main/java/work/xuye/sink/handler/DisappearHandler.java @@ -18,6 +18,7 @@ import work.xuye.common.enums.MessageType; import work.xuye.common.enums.Status; import work.xuye.common.properties.AlertProperties; import work.xuye.common.service.MessageService; +import work.xuye.common.utils.DebugUtil; import work.xuye.sink.service.JdbcService; import work.xuye.sink.service.SqlExecutor; @@ -48,6 +49,9 @@ public class DisappearHandler { Integer taskId = (Integer) message.getHeaders().get(MessageConstants.TASK_ID); TaskVO taskVO = taskManager.getTaskInfoByTaskId(taskId); + if (DebugUtil.isSkip(taskVO)) { + return; + } Task task = taskVO.getTask(); String taskName = task.getName(); SinkParams.CheckDelete delete = task.getSinkParams().getCheckDelete(); @@ -108,13 +112,13 @@ public class DisappearHandler { String url = alertProperties.getDomainName().getSink() + "/jdbc/execute/" + sqlEntity.getId() + "/" + sqlEntity.getCaptcha(); // 如果已经自动删除了,就发送自动删除结果 + subTitle = "待手动删除条数: " + removed.size(); if (autoDeleted) { subTitle = "已自动删除条数: " + removed.size(); fullMessage.append("> ✅[查看删除结果](").append(url).append(")"); } // 如果未自动删除,并且还配置开启了手动删除的入口,就拼接手动删除的入口 else if (disappearConfig.isSendExecuteSQLUrl()) { - subTitle = "待删除条数: " + removed.size(); fullMessage.append("> ⚠️[点击确认删除](").append(url).append(")"); } diff --git a/sink/src/main/java/work/xuye/sink/handler/ItemHandler.java b/sink/src/main/java/work/xuye/sink/handler/ItemHandler.java index eae2166..4b24748 100644 --- a/sink/src/main/java/work/xuye/sink/handler/ItemHandler.java +++ b/sink/src/main/java/work/xuye/sink/handler/ItemHandler.java @@ -15,13 +15,11 @@ import work.xuye.common.db.entity.vo.SinkParams; import work.xuye.common.db.entity.vo.TableTemplate; import work.xuye.common.db.service.TaskManager; import work.xuye.common.dto.TaskVO; -import work.xuye.common.enums.ProcessMode; -import work.xuye.common.enums.ProcessReason; -import work.xuye.common.enums.ResourceStatus; -import work.xuye.common.enums.Status; +import work.xuye.common.enums.*; import work.xuye.common.service.UrlMD5Service; import work.xuye.common.spel.CustomFunction; import work.xuye.common.spel.SpringExpressionLanguageEvaluator; +import work.xuye.common.utils.DebugUtil; import work.xuye.sink.service.JdbcService; import java.util.ArrayList; @@ -76,9 +74,13 @@ public class ItemHandler { return; } TaskVO taskVO = taskManager.getTaskInfoByTaskId((Integer) message.getHeaders().get(MessageConstants.TASK_ID)); + if (DebugUtil.isSkip(taskVO)) { + return; + } //相关字段放这里 String md5Digest = DigestUtils.md5DigestAsHex(message.getPayload().getBytes()); TableTemplate template = taskVO.getTemplate().getTableTemplate(); + SinkParams.CheckDelete checkDeleteConfig = taskVO.getTask().getSinkParams().getCheckDelete(); // 根据消息体,和映射关系,使用SpEL表达式,得到计算结果 HashMap expressionResultMap = evaluator.evaluate(taskVO.getMapping().getTableMapping(), message); // 如果不能通过校验,则抛出异常 @@ -105,18 +107,105 @@ public class ItemHandler { } //判断数据是否存在 boolean itemExist = this.itemExist(taskVO.getTask().getSinkParams(), template); - //如果已经存在了,就判断是否需要更新,如果需要更新就更新 + //如果已经存在了,就判断是否需要删除或更新 if (itemExist) { - this.tryUpdate(taskVO, expressionResultMap, md5Digest, template, reason); + // 先去尝试删除 + boolean deleted = this.tryDelete(taskVO, template, expressionResultMap, checkDeleteConfig, reason); + // 如果将已存在的数据删除了,就可以直接放入缓存然后结束了 + if (deleted) { + this.putToCache(template, md5Digest); + return; + } + Map dbItem = this.getExistedItem(taskVO, template); + this.tryUpdate(taskVO, dbItem, expressionResultMap, md5Digest, template, reason); //如果不存在,就插入 } else { - this.tryInsert(taskVO.getTask(), template, reason); + this.tryInsert(taskVO.getTask(), expressionResultMap, template, reason); + } + this.putToCache(template, md5Digest); + } + + /** + * 是否满足一组断言规则 + * + * @param task 任务 + * @param map map结构的数据 + * @param template 模板 + * @param predicates 断言规则 + * @return 是否满足 + */ + private boolean isSatisfied(Task task, Map map, TableTemplate template, List predicates, String description) { + if (ObjectUtils.isEmpty(predicates)) { + return false; + } + boolean isValid = true; + TableTemplate.Field uniqueField = template.getUniqueField(); + String taskName = task.getName(); + for (String expr : predicates) { + boolean valid = (Boolean) evaluator.evaluate(map, expr, Boolean.class); + if (!valid) { + log.info("[{}][ℹ️️ {}- 断言未通过 ]" + "[{}] {}: {}", + taskName, + description, + expr, + uniqueField.getFieldName(), + uniqueField.getValue()); + isValid = false; + break; + } } + return isValid; + } + + private void putToCache(TableTemplate template, String md5Digest) { // 必须在方法出栈前,将md5放入缓存,因为Spring Cloud Stream默认会重试三次,如果提前放入缓存了,会导致首次重试被判定为无需处理 urlMD5Service.put(template.getUniqueField().getValue().toString(), md5Digest); } + private boolean tryDelete(TaskVO taskVO, TableTemplate template, HashMap newItem, SinkParams.CheckDelete checkDeleteConfig, ProcessReason reason) { + // 如果不是 是固定值检测删除并且状态是开启,说明无需处理,直接返回 + if (!(checkDeleteConfig.getMode().equals(DeleteCheckMode.fixedValue) && checkDeleteConfig.getStatus().equals(Status.on))) { + return false; + } + SinkParams.FixedValueConfig fixedValueConfig = checkDeleteConfig.getFixedValueConfig(); + List predicates = fixedValueConfig.getPredicates(); + // 满足断言条件,才能继续处理 + boolean satisfied = this.isSatisfied(taskVO.getTask(), newItem, template, predicates, "删除"); + if (satisfied) { + return doDelete(taskVO, template.getUniqueField(), reason); + } + return false; + } + + private boolean doDelete(TaskVO taskVO, TableTemplate.Field uniqueField, ProcessReason reason) { + String tableName = taskVO.getTask().getSinkParams().getTableName(); + Object value = uniqueField.getValue(); + TableTemplate template = taskVO.getTemplate().getTableTemplate(); + TableTemplate.LogicDeleteField logicDeleteField = template.getLogicDeleteField(); + boolean isLogicDelete = !ObjectUtils.isEmpty(logicDeleteField); + String deleteSql; + if (!isLogicDelete) { + deleteSql = SqlGenerator.generateDeleteSql(tableName, uniqueField.getFieldName(), uniqueField.getValue()); + } else { + deleteSql = SqlGenerator.generateLogicDeleteUpdateSql(tableName, template); + } + int rows = jdbcService.executeSql(taskVO.getTask().getSinkParams().getDataSourceName(), deleteSql); + log.info("[{}][\uD83D\uDDD1️已{}删除{}条][{} {}] " + "{}: {}", + taskVO.getTask().getName(), + isLogicDelete ? "逻辑" : "", + rows, + reason.getEmoji(), + reason.name(), + uniqueField.getFieldName(), + value); + if (rows != 1) { + throw new RuntimeException("期望删除一条,实际删除" + rows + "条数据,请尽快处理"); + } + return true; + } + + private boolean isTemplateValid(String taskName, TableTemplate template) { boolean isValid = true; List allField = new ArrayList<>(); @@ -132,7 +221,7 @@ public class ItemHandler { for (String rule : rules) { boolean valid = (Boolean) evaluator.evaluate(field.getValue(), rule, Boolean.class); if (!valid) { - log.warn("[{}][❌️ 规则校验未通过]" + "[{}] {}: {}", + log.warn("[{}][❌️ 模板规则校验未通过]" + "[{}] {}: {}", taskName, rule, field.getFieldName(), @@ -201,23 +290,25 @@ public class ItemHandler { return jdbcService.isExist(sinkParams.getDataSourceName(), selectExist); } + private Map getExistedItem(TaskVO taskVO, TableTemplate template) { + String dataSourceName = taskVO.getTask().getSinkParams().getDataSourceName(); + return jdbcService.getOne(dataSourceName, SqlGenerator.generateFetchSql(template)); + } - private void tryUpdate(TaskVO taskVO, HashMap resultMap, String md5Digest, TableTemplate tableTemplate, ProcessReason reason) { + + private void tryUpdate(TaskVO taskVO, Map dbItem, HashMap resultMap, String md5Digest, TableTemplate tableTemplate, ProcessReason reason) { SinkParams sinkParams = taskVO.getTask().getSinkParams(); String taskName = taskVO.getTask().getName(); boolean updateConfigEnabled = sinkParams.getCheckUpdate().getStatus().equals(Status.on); if (updateConfigEnabled) { String dataSourceName = sinkParams.getDataSourceName(); Assert.notNull(dataSourceName, "dataSourceName is null"); - - Map item = jdbcService.getOne(dataSourceName, SqlGenerator.generateFetchSql(tableTemplate)); SinkParams.CheckUpdate checkUpdateConfig = sinkParams.getCheckUpdate(); String fieldName = checkUpdateConfig.getFieldName(); - Object dbValue = item.get(fieldName); + Object dbValue = dbItem.get(fieldName); if (ObjectUtils.isEmpty(dbValue)) { throw new RuntimeException("根据 [" + fieldName + "] 来判断数据是否需要更新,但是数据库不存在该字段"); } - Object nowValue = resultMap.get(fieldName); if (!ObjectUtils.isEmpty(checkUpdateConfig.getJsonPath())) { dbValue = cf.jsonExtract(dbValue.toString(), checkUpdateConfig.getJsonPath()); @@ -245,7 +336,6 @@ public class ItemHandler { tableTemplate.getUniqueField().getValue()); } } else { - log.info("[{}][⏭️ 不检查更新][{} {}] " + "{}: {}", taskName, reason.getEmoji(), @@ -256,15 +346,16 @@ public class ItemHandler { } - private void tryInsert(Task task, TableTemplate template, ProcessReason reason) { - jdbcService.update(task.getSinkParams().getDataSourceName(), SqlGenerator.generateInsertSql(template)); - log.info("[{}][\uD83D\uDCBE 已保存][{} {}] " + "{}: {}", - task.getName(), - reason.getEmoji(), - reason.name(), - template.getUniqueField().getFieldName(), - template.getUniqueField().getValue()); + private void tryInsert(Task task, Map newItem, TableTemplate template, ProcessReason reason) { + if (this.isSatisfied(task, newItem, template, task.getSinkParams().getInsertConfig().getPredicates(), "保存")) { + jdbcService.update(task.getSinkParams().getDataSourceName(), SqlGenerator.generateInsertSql(template)); + log.info("[{}][\uD83D\uDCBE 已保存][{} {}] " + "{}: {}", + task.getName(), + reason.getEmoji(), + reason.name(), + template.getUniqueField().getFieldName(), + template.getUniqueField().getValue()); + } } - } \ No newline at end of file diff --git a/sink/src/main/java/work/xuye/sink/handler/SqlGenerator.java b/sink/src/main/java/work/xuye/sink/handler/SqlGenerator.java index bb58921..b801857 100644 --- a/sink/src/main/java/work/xuye/sink/handler/SqlGenerator.java +++ b/sink/src/main/java/work/xuye/sink/handler/SqlGenerator.java @@ -51,7 +51,7 @@ public class SqlGenerator { sqlBuilder.append(", `").append(updateTimeField.getFieldName()).append("`=") .append(generateCurrentTimeFunctionString(updateTimeField.getFieldType())); } - + sqlBuilder.append(" WHERE ") .append('`').append(template.getUniqueField().getFieldName()).append('`') .append('=').append(formatFieldValue(template.getUniqueField())); @@ -195,14 +195,14 @@ public class SqlGenerator { return "SELECT `" + selectField + "` FROM `" + tableName + "` WHERE " + condition + '\'' + lastValue + '\''; } - + public static String generateGivenFieldSql(String tableName, String condition, String lastValue) { return "SELECT * FROM `" + tableName + "` WHERE " + condition + '\'' + lastValue + '\''; } - public static String generateDeleteSql(String tableName, String selectField, String value) { - return "DELETE FROM " + tableName + " WHERE " + selectField + "='" + value + "'" + " LIMIT 1;"; + public static String generateDeleteSql(String tableName, String field, Object value) { + return "DELETE FROM " + tableName + " WHERE " + field + "='" + value + "'" + " LIMIT 1;"; } public static List generateDeleteSqls(String tableName, String fieldName, List dbs) { @@ -212,4 +212,34 @@ public class SqlGenerator { } return result; } + + + public static String generateLogicDeleteUpdateSql(String tableName, TableTemplate template) { + + TableTemplate.Field updateTimeField = template.getUpdateTimeField(); + TableTemplate.LogicDeleteField logicDeleteField = template.getLogicDeleteField(); + String logicDeleteFieldName = logicDeleteField.getFieldName(); + String logicDeleteValue = logicDeleteField.getDeleteValue(); + TableTemplate.Field uniqueField = template.getUniqueField(); + String uniqueFieldFieldName = uniqueField.getFieldName(); + Object uniqueFieldValue = uniqueField.getValue(); + TableTemplate.Field updateField = template.getUpdateField(); + + StringBuilder sqlBuilder = new StringBuilder(); + sqlBuilder.append("UPDATE `").append(tableName).append("` SET "); + sqlBuilder.append('`').append(logicDeleteFieldName).append('`') + .append('=').append(logicDeleteValue); + sqlBuilder.append(", `").append(updateField.getFieldName()).append("`=") + .append(updateField.getValue()); + if (updateTimeField != null) { + sqlBuilder.append(", `").append(updateTimeField.getFieldName()).append("`=") + .append(generateCurrentTimeFunctionString(updateTimeField.getFieldType())); + } + + sqlBuilder.append(" WHERE ") + .append('`').append(uniqueFieldFieldName).append('`') + .append('=').append('\'').append(uniqueFieldValue.toString()) + .append("' LIMIT 1"); + return sqlBuilder.toString(); + } } diff --git a/sink/src/main/java/work/xuye/sink/service/JdbcService.java b/sink/src/main/java/work/xuye/sink/service/JdbcService.java index dd1ce42..80e8509 100644 --- a/sink/src/main/java/work/xuye/sink/service/JdbcService.java +++ b/sink/src/main/java/work/xuye/sink/service/JdbcService.java @@ -69,9 +69,24 @@ public class JdbcService { } + /** + * 查询一条数据,如果查询到了多条,抛出异常;如果没有查询到,返回null + * + * @param datasourceName 数据源名称 + * @param sql sql + * @return 查询结果,如果没有查询到,返回null + */ public Map getOne(String datasourceName, String sql) { JdbcTemplate jdbcTemplate = this.getJdbcTemplate(datasourceName); - return jdbcTemplate.queryForMap(sql); + List> maps = jdbcTemplate.queryForList(sql); + if (ObjectUtils.isEmpty(maps)) { + return null; + } + if (maps.size() > 1) { + log.error("期望查询出一条,实际查询出结果为{}条, sql:{}, result:{}", maps.size(), sql, maps); + throw new RuntimeException("期望查询出一条,实际查询出结果为" + maps.size() + "条"); + } + return maps.get(0); } @@ -106,7 +121,7 @@ public class JdbcService { public int executeSql(String datasource, String sql) { JdbcTemplate jdbcTemplate = this.getJdbcTemplate(datasource); int update = jdbcTemplate.update(sql); - log.info("execute sql, datasource:{}, sql:{}, result:{}", datasource, sql, update); + log.info("execute sql, datasource:[{}], sql:[{}], result:[{}]", datasource, sql, update); return update; } diff --git a/source/src/main/java/work/xuye/source/handler/SourceHandler.java b/source/src/main/java/work/xuye/source/handler/SourceHandler.java index a50e69a..4fbed81 100644 --- a/source/src/main/java/work/xuye/source/handler/SourceHandler.java +++ b/source/src/main/java/work/xuye/source/handler/SourceHandler.java @@ -20,6 +20,7 @@ import work.xuye.common.dto.TaskVO; import work.xuye.common.enums.ProcessMode; import work.xuye.common.enums.ResourceStatus; import work.xuye.common.service.UrlMD5Service; +import work.xuye.common.utils.DebugUtil; import work.xuye.common.utils.ID; import work.xuye.source.request.RequestClient; @@ -44,6 +45,9 @@ public class SourceHandler { public Message handle(Message message) { TaskVO taskVO = taskManager.getTaskInfoByTaskId((Integer) message.getHeaders().get(MessageConstants.TASK_ID)); + if (DebugUtil.isSkip(taskVO)) { + return null; + } HttpRes res = null; HttpRequestParams requestParams = message.getPayload(); StopWatch stopWatch = new StopWatch(); @@ -56,16 +60,20 @@ public class SourceHandler { urlMD5Service.put(requestParams.getUrl(), md5Digest); res.setResourceStatus(resourceStatus); } catch (Exception e) { - log.error("request error,request params:{}, error:{}", requestParams, e); + log.error("[{}] request error, request params:[{}], error message:[{}]", taskVO.getTask().getName(), requestParams, e.getMessage()); return null; } + if (!res.getStatus().is2xxSuccessful()) { + log.warn("[{}] response status code is not 2xx, request params: [{}], response: [{}]", taskVO.getTask().getName(), requestParams, res); + throw new RuntimeException("[" + taskVO.getTask().getName() + "]" + "unexpected status code: " + res.getStatus()); + } ProcessMode processMode = taskManager .getTaskInfoByTaskId( (Integer) message.getHeaders().get(MessageConstants.TASK_ID)) .getTask() .getProcessMode(); this.log(message, res, processMode, stopWatch.getLastTaskTimeMillis(), taskVO.getTask()); - if (res.getResourceStatus().equals(ResourceStatus.UNCHANGED) && ProcessMode.NORMAL.equals(processMode)) { + if (res.getResourceStatus().equals(ResourceStatus.UNCHANGED) && !ProcessMode.DEBUG.equals(processMode)) { return null; } return MessageBuilder @@ -81,8 +89,14 @@ public class SourceHandler { private void log(Message message, HttpRes res, ProcessMode processMode, long ms, Task task) { HttpRequestParams req = message.getPayload(); - - String emoji = processMode.equals(ProcessMode.NORMAL) ? "" : "🤖"; + String emoji = ""; + if (processMode.equals(ProcessMode.NORMAL)) { + emoji = ""; + } else if (processMode.equals(ProcessMode.DEBUG)) { + emoji = "🤖"; + } else if (processMode.equals(ProcessMode.WATCH)) { + emoji = "👀"; + } if (res.getStatus().is2xxSuccessful()) { log.info("[{}-{}] [{}{} {}] [{} {} {}ms] {} ", task.getId(), task.getName(), res.getResourceStatus().getEmoji(), emoji, diff --git a/transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java b/transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java index dcfaca9..4e87b7e 100644 --- a/transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java +++ b/transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java @@ -12,13 +12,16 @@ import org.springframework.util.ObjectUtils; import work.xuye.common.constant.BindingConstants; import work.xuye.common.constant.MessageConstants; import work.xuye.common.constant.StageConstants; +import work.xuye.common.db.entity.Task; import work.xuye.common.db.entity.vo.ParseParams; import work.xuye.common.db.entity.vo.SinkParams; +import work.xuye.common.db.entity.vo.TransformParams; import work.xuye.common.db.service.TaskManager; import work.xuye.common.dto.HttpRes; import work.xuye.common.dto.TaskVO; import work.xuye.common.enums.*; import work.xuye.common.service.UrlMD5Service; +import work.xuye.common.utils.DebugUtil; import work.xuye.common.utils.ID; import work.xuye.common.utils.JsonPathUtil; import work.xuye.transformer.transformer.MessageTransformer; @@ -45,7 +48,6 @@ public class TransformHandler { public List> handle(Message message) { - try { return this.doHandle(message); } catch (Exception e) { @@ -58,10 +60,12 @@ public class TransformHandler { public List> doHandle(Message message) { // 获取任务信息 TaskVO taskVO = taskManager.getTaskInfoByTaskId((Integer) message.getHeaders().get(MessageConstants.TASK_ID)); - + if (DebugUtil.isSkip(taskVO)) { + return null; + } ProcessReason processReason = processReason(message, taskVO); // 如果资源状态是未变化,直接结束执行 - if (ProcessReason.unchanged.equals(processReason)) { + if (ProcessReason.unchanged.equals(processReason) || ProcessReason.end.equals(processReason)) { // todo 考虑是否有必要加日志 return null; } @@ -79,6 +83,10 @@ public class TransformHandler { private ProcessReason processReason(Message message, TaskVO taskVO) { HttpRes httpRes = message.getPayload(); ProcessMode processMode = taskVO.getTask().getProcessMode(); + if (ProcessMode.WATCH.equals(processMode)) { + log.debug("@@ 此任务为监听模式,不会进行任何处理"); + return ProcessReason.end; + } // 理论上不会出现这种情况,因为符合这个条件的消息source是不会下发的,如果出现了,说明是消息被人为重发了 if (ResourceStatus.UNCHANGED.equals(httpRes.getResourceStatus()) && ProcessMode.NORMAL.equals(processMode)) { @@ -102,7 +110,16 @@ public class TransformHandler { private void transformResult(Message message, TaskVO taskVO) { String result = message.getPayload().getBody(); - List handlers = taskVO.getTask().getTransformParams().getHandlers(); + Task task = taskVO.getTask(); + TransformParams transformParams = task.getTransformParams(); + if (ObjectUtils.isEmpty(transformParams)) { + if (task.getProcessMode().getMode().equals(ProcessMode.WATCH.getMode())) { + return; + } else { + throw new RuntimeException(task.getName() + " transformParams is null"); + } + } + List handlers = transformParams.getHandlers(); Assert.notNull(handlers, "handlers is null"); String seedUrl = this.getSeedUrl(message); @@ -172,23 +189,10 @@ public class TransformHandler { results.add(itemMessage); }); if (results.size() > 0) { - log.info("@ [{}{}] 下发转换后的消息,数量:{}", processReason.getEmoji(), processReason, results.size()); + log.info("@ [{}][{}{}] 下发转换后的消息,数量:{}", taskVO.getTask().getName(), processReason.getEmoji(), processReason, results.size()); } - - // 如果没有items的jsonpath,说明只做监控,后续不做入库处理 } else { - Message itemMessage = MessageBuilder.withPayload(result) - .setHeader(MessageConstants.CURRENT_STAGE, StageConstants.END) - .setHeader(MessageConstants.TASK_TRACE_ID, message.getHeaders().get(MessageConstants.TASK_TRACE_ID)) - .setHeader(MessageConstants.SOURCE_TRACE_ID, message.getHeaders().get(MessageConstants.SOURCE_TRACE_ID)) - .setHeader(MessageConstants.TRANSFORMER_TRACE_ID, transformerTraceId) - .setHeader(MessageConstants.TASK_ID, message.getHeaders().get(MessageConstants.TASK_ID)) - .setHeader(MessageConstants.TASK_NAME, message.getHeaders().get(MessageConstants.TASK_NAME)) - .setHeader(MessageConstants.SEED_URL, message.getHeaders().get(MessageConstants.SEED_URL)) - .build(); - results.add(itemMessage); - log.info("@ 该消息未进行转换,直接下发,后续无需处理," + MessageConstants.TRANSFORMER_TRACE_ID + ": [{}]", - transformerTraceId); + log.warn("@@ 任务[{}]没有配置itemsPath", taskVO.getTask().getName()); } return results; } diff --git a/transformer/src/main/java/work/xuye/transformer/transformer/ResDataTransformer.java b/transformer/src/main/java/work/xuye/transformer/transformer/ResDataTransformer.java index 0fe7fba..70f34e8 100644 --- a/transformer/src/main/java/work/xuye/transformer/transformer/ResDataTransformer.java +++ b/transformer/src/main/java/work/xuye/transformer/transformer/ResDataTransformer.java @@ -14,14 +14,20 @@ import org.springframework.stereotype.Component; @Component("resData") @RequiredArgsConstructor public class ResDataTransformer implements MessageTransformer { - + + private int threshold = 300; @Override public String transform(String json, String seedUrl) { JsonObject res = JsonParser.parseString(json).getAsJsonObject(); boolean hasData = res.has("data"); - if (!hasData) { - throw new RuntimeException("res no data"); + if (hasData) { + threshold = 300; + } else { + threshold--; + } + if (threshold <= 0) { + throw new RuntimeException("res no data, seedUrl: " + seedUrl); } return res.get("data").getAsString(); }