diff --git a/common/src/main/java/work/xuye/common/cache/RedisProperties.java b/common/src/main/java/work/xuye/common/cache/RedisProperties.java index 949d855..767e637 100644 --- a/common/src/main/java/work/xuye/common/cache/RedisProperties.java +++ b/common/src/main/java/work/xuye/common/cache/RedisProperties.java @@ -19,9 +19,9 @@ public class RedisProperties { public static final String PROPERTIES_PREFIX = "redis"; - private Item url = new Item("url_md5_map", 60 * 60 * 24); + private Item url = new Item("url_md5_map", 60 * 60); - private Item message = new Item("message_md5_map", 60 * 60); + private Item message = new Item("message_md5_map", 60 * 30); private Item task = new Item(null, 40); diff --git a/common/src/main/java/work/xuye/common/config/ContainerCustomizer.java b/common/src/main/java/work/xuye/common/config/ContainerCustomizer.java new file mode 100644 index 0000000..ac5f57c --- /dev/null +++ b/common/src/main/java/work/xuye/common/config/ContainerCustomizer.java @@ -0,0 +1,20 @@ +package work.xuye.common.config; + +import org.springframework.cloud.stream.config.ListenerContainerCustomizer; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.listener.AbstractMessageListenerContainer; +import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.util.backoff.FixedBackOff; + +/** + * @author xuye + * @since 2023/3/15 23:41 + **/ +@Configuration +public class ContainerCustomizer implements ListenerContainerCustomizer> { + @Override + public void configure(AbstractMessageListenerContainer container, String destinationName, String group) { + container.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(0L, 0L))); + } + +} diff --git a/common/src/main/java/work/xuye/common/controller/ConfigController.java b/common/src/main/java/work/xuye/common/controller/ConfigController.java index faac252..a139e3a 100644 --- 
a/common/src/main/java/work/xuye/common/controller/ConfigController.java +++ b/common/src/main/java/work/xuye/common/controller/ConfigController.java @@ -7,9 +7,11 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import work.xuye.common.annotation.ResponseBodyEnhanceController; +import work.xuye.common.db.entity.Datasource; import work.xuye.common.db.entity.Mapping; import work.xuye.common.db.entity.Task; import work.xuye.common.db.entity.Template; +import work.xuye.common.db.service.DatasourceService; import work.xuye.common.db.service.MappingService; import work.xuye.common.db.service.TaskService; import work.xuye.common.db.service.TemplateService; @@ -31,6 +33,7 @@ public class ConfigController { private final TaskService taskService; private final TemplateService templateService; private final MappingService mappingService; + private final DatasourceService datasourceService; @GetMapping("/task") @@ -50,4 +53,10 @@ public class ConfigController { public List mappingList() { return mappingService.list(); } + + @GetMapping("/datasource") + @ApiOperation(value = "数据源列表") + public List datasourceList() { + return datasourceService.list(); + } } diff --git a/common/src/main/java/work/xuye/common/controller/RedisController.java b/common/src/main/java/work/xuye/common/controller/RedisController.java index b5336fa..360d1a2 100644 --- a/common/src/main/java/work/xuye/common/controller/RedisController.java +++ b/common/src/main/java/work/xuye/common/controller/RedisController.java @@ -57,5 +57,11 @@ public class RedisController { return redisService.clear(key); } + @DeleteMapping + @ApiOperation(value = "清除所有缓存") + public void clearAll() { + redisService.run(); + } + } diff --git a/common/src/main/java/work/xuye/common/db/entity/Datasource.java b/common/src/main/java/work/xuye/common/db/entity/Datasource.java index 37e88fa..95ba364 100644 --- 
a/common/src/main/java/work/xuye/common/db/entity/Datasource.java +++ b/common/src/main/java/work/xuye/common/db/entity/Datasource.java @@ -1,6 +1,7 @@ package work.xuye.common.db.entity; import com.baomidou.mybatisplus.annotation.*; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer; @@ -43,6 +44,7 @@ public class Datasource implements Serializable { @TableField("username") private String username; + @JsonIgnore @TableField("password") private String password; diff --git a/common/src/main/java/work/xuye/common/dto/HttpRequestParams.java b/common/src/main/java/work/xuye/common/dto/HttpRequestParams.java index 5de536c..2661b57 100644 --- a/common/src/main/java/work/xuye/common/dto/HttpRequestParams.java +++ b/common/src/main/java/work/xuye/common/dto/HttpRequestParams.java @@ -4,7 +4,6 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.springframework.http.HttpMethod; -import work.xuye.common.enums.CharsetType; import java.util.Map; @@ -23,7 +22,7 @@ public class HttpRequestParams { private Map body = Map.of(); - private CharsetType charset; + private String charset; private Map headers = Map.of(); diff --git a/common/src/main/java/work/xuye/common/enums/CharsetType.java b/common/src/main/java/work/xuye/common/enums/CharsetType.java deleted file mode 100644 index 61e7ee1..0000000 --- a/common/src/main/java/work/xuye/common/enums/CharsetType.java +++ /dev/null @@ -1,9 +0,0 @@ -package work.xuye.common.enums; - -/** - * @author xuye - * @since 2023/3/12 22:32 - **/ -public enum CharsetType { - defaultCharset, utf8, gb2312 -} diff --git a/common/src/main/java/work/xuye/common/service/MessageService.java b/common/src/main/java/work/xuye/common/service/MessageService.java index d7191be..1b18430 100644 --- 
a/common/src/main/java/work/xuye/common/service/MessageService.java +++ b/common/src/main/java/work/xuye/common/service/MessageService.java @@ -33,7 +33,8 @@ public class MessageService { .append("# ").append(title).append("\n"); if (!ObjectUtils.isEmpty(subTitle)) { if (subTitle.length() > 500) { - subTitle = subTitle.substring(0, 500) + "..."; + subTitle = subTitle.substring(0, 200) + " [.......] " + subTitle.substring(subTitle.length() - 200); + subTitle = subTitle.replaceAll("`", ""); } result.append("> ").append(subTitle).append("\n\r"); } diff --git a/common/src/main/java/work/xuye/common/spel/CustomFunction.java b/common/src/main/java/work/xuye/common/spel/CustomFunction.java index 3f78fb3..b9ad1ae 100644 --- a/common/src/main/java/work/xuye/common/spel/CustomFunction.java +++ b/common/src/main/java/work/xuye/common/spel/CustomFunction.java @@ -1,5 +1,8 @@ package work.xuye.common.spel; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.JsonSyntaxException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.expression.Expression; @@ -126,4 +129,25 @@ public class CustomFunction { return (String) exp.getValue(); } + /** + * 用itemData包一层的原因:避免下游程序直接读到了有特殊意义的字段,比如type(财联社资讯类型 -1:快讯,0:要闻,1:非财联社资讯) + * ... 
+ * + * @param json json字符串 + * @return 包装后的json + */ + public String wrap(String json, String key) { + JsonObject jsonObject = null; + try { + jsonObject = JsonParser.parseString(json).getAsJsonObject(); + } catch (JsonSyntaxException e) { + log.error("wrapJson handler error, json: [{}]", json); + throw e; + } + JsonObject result = new JsonObject(); + result.add(key, jsonObject); + return result.toString(); + } + + } diff --git a/common/src/main/java/work/xuye/common/utils/SpEL.java b/common/src/main/java/work/xuye/common/utils/SpEL.java index d5b32f5..8331bd1 100644 --- a/common/src/main/java/work/xuye/common/utils/SpEL.java +++ b/common/src/main/java/work/xuye/common/utils/SpEL.java @@ -11,7 +11,7 @@ import org.springframework.expression.spel.standard.SpelExpressionParser; public class SpEL { public static void main(String[] args) { ExpressionParser parser = new SpelExpressionParser(); - Expression exp = parser.parseExpression("T(java.time.LocalDateTime).now().format(T(java.time.format.DateTimeFormatter).ofPattern('yyyy-MM-dd HH:mm:ss'))"); + Expression exp = parser.parseExpression("T(java.time.LocalDateTime).now().minusHours(1).atZone(T(java.time.ZoneId).systemDefault()).toInstant().atOffset(T(java.time.ZoneOffset).UTC).toInstant().getEpochSecond()"); Object o = exp.getValue(); System.out.println(o); } diff --git a/common/src/main/resources/markdown/deploy.md b/common/src/main/resources/markdown/deploy.md new file mode 100644 index 0000000..2d2bb89 --- /dev/null +++ b/common/src/main/resources/markdown/deploy.md @@ -0,0 +1,8 @@ +# 部署信息 + +| 环境 | 数据库 | database | 部署信息 | +|--------|---------------------------|----------------------|-----------------| +| 开发环境 | mysql-sit.deepq.tech | std_news_process_dev | 无 | +| 测试环境 | mysql-sit.deepq.tech | std_news_process | kubernetes-sit | +| 生产环境 | mysql8-prd-rds.deepq.tech | std_news_process | kubernetes-prd | +| 临时生产环境 | 122.112.164.131 | pyspider_resultdb | 122.112.164.131 | \ No newline at end of file diff --git 
a/common/src/main/resources/markdown/ip.md b/common/src/main/resources/markdown/ip.md new file mode 100644 index 0000000..a9e64fd --- /dev/null +++ b/common/src/main/resources/markdown/ip.md @@ -0,0 +1,7 @@ +# 出口IP + +| IP | 环境 | +|----------------|----------------| +| 47.103.106.123 | 阿里云VPN | +| 47.103.90.3 | kubernetes-sit | +| 47.100.14.52 | kubernetes-prd | diff --git a/common/src/main/resources/markdown/readme.md b/common/src/main/resources/markdown/readme.md new file mode 100644 index 0000000..c401c2a --- /dev/null +++ b/common/src/main/resources/markdown/readme.md @@ -0,0 +1,31 @@ +# 程序介绍 + +### 基础功能 + +1. 媒体源监控:可对历史上已接入的、经常出现问题的媒体源进行爬取和记录,便于发现问题后追溯到媒体侧的历史记录 +2. 数据接入:将媒体源的接口通过映射关系,保存到爬虫数据库中。使用数据接入功能,将自带媒体源监控功能。 + +### 适用场景 + +要满足下列全部条件 + +1. 结构化的资讯。 正例:XML JSON格式,反例:服务端渲染页面 +2. 需求简单。 正例:每经头豹、金吴咨询。反例:21世纪资讯 + +### 背景 + +数据接入历史背景是:pyspider框架开发、调试排错效率低下 + +数据接入的现状是:格式标准、流程几乎相似,因此可以开发一个基于配置的资讯接入处理程序 + +### 相关链接 + +代码仓库 https://git.deepq.tech/fhl/std-news-process + +### 异常处理 + +程序中发生的所有异常,都会通过钉钉通知 + +### 优化方案 + +1. 
如果以后配置接入任务多,由于媒体接口响应速度慢,source组件会有消息堆积,需要考虑异步处理 \ No newline at end of file diff --git a/common/src/main/resources/markdown/sql.md b/common/src/main/resources/markdown/sql.md new file mode 100644 index 0000000..634985a --- /dev/null +++ b/common/src/main/resources/markdown/sql.md @@ -0,0 +1,26 @@ +# 常用SQL + +## 查询当前任务状态 + +```sql +select name, + description, + CASE + WHEN deleted = 0 THEN '🟢' + WHEN deleted = 1 THEN '🔴' + END AS '调度状态', + CASE + WHEN REPLACE(json_extract(sink_params, '$.checkUpdate.status'), '"', '') = 'on' THEN '🟢' + WHEN REPLACE(json_extract(sink_params, '$.checkUpdate.status'), '"', '') = 'off' THEN '🔴' + END as '是否检查更新', + CASE + WHEN REPLACE(json_extract(sink_params, '$.checkDelete.status'), '"', '') = 'on' THEN '🟢' + WHEN REPLACE(json_extract(sink_params, '$.checkDelete.status'), '"', '') = 'off' THEN '🔴' + END as '是否检查删除', + REPLACE(json_extract(request_params, '$.url'), '"', '') as 'URL', + + REPLACE(json_extract(sink_params, '$.dataSourceName'), '"', '') as '数据源名', + REPLACE(json_extract(sink_params, '$.tableName'), '"', '') as '表名' +from task; +``` + diff --git a/common/src/main/resources/markdown/task.md b/common/src/main/resources/markdown/task.md new file mode 100644 index 0000000..9232c6e --- /dev/null +++ b/common/src/main/resources/markdown/task.md @@ -0,0 +1,34 @@ +# 任务记录 + +## 格隆汇RSS + +1. 删除方式不是字段提供,而是程序遍历找到消息的记录自行处理 +2. 目前的处理方式是钉钉群通知,为了安全不自动执行delete语句 + +## 中国证券报图文RSS + +1. 非常标准 +2. 该源是纸质报纸的内容,发布时间缺失没有时分秒的精度,目前的做法是将程序发现记录的时间作为发布时间 +3. 由于情况2,该任务禁止使用更新功能,因为会导致发布时间被刷新 + +## 中证快讯图文RSS + +1. 对方响应数据小概率格式不正确会引发报错 +2. 有概率连续302 +3. 字符集声称是gb2312,但是实际上是gbk +4. 1和2的情况最近未出现 + +## 中证智能财讯图文RSS + +1. URL中需携带今日的日期 + +## 金牛座图文RSS + +1. JSON中套XML +2. 请求体中需要携带一个时间戳作为查询的起始日期时间 +3. 需要白名单内的IP才可以访问到,否则是空数据 +4. 存在URL为空的快讯,具体处理方式参考需求文档和代码 + +## 金牛座视频RSS + +1. 
和图文RSS的123情况一致 \ No newline at end of file diff --git a/scheduler/src/main/java/work/xuye/scheduler/controller/ScheduleController.java b/scheduler/src/main/java/work/xuye/scheduler/controller/ScheduleController.java index 82ec0be..3ddb788 100644 --- a/scheduler/src/main/java/work/xuye/scheduler/controller/ScheduleController.java +++ b/scheduler/src/main/java/work/xuye/scheduler/controller/ScheduleController.java @@ -18,7 +18,7 @@ import java.util.ArrayList; * @since 2023/2/17 23:01 **/ @Slf4j -@Api(tags = "下发任务") +@Api(tags = "任务调度") @RequestMapping("/task") @RequiredArgsConstructor @ResponseBodyEnhanceController @@ -27,14 +27,14 @@ public class ScheduleController { private final IssueService issueService; @GetMapping - @ApiOperation(value = "下发全部任务") + @ApiOperation(value = "调度全部任务") public ArrayList issueTask() { return issueService.issueAllTask(); } @GetMapping("/id/{id}") - @ApiOperation(value = "根据任务ID下发") + @ApiOperation(value = "根据任务ID调度") public TaskVO issueTask(@PathVariable Integer id) { return issueService.issueTask(id); } diff --git a/scheduler/src/main/java/work/xuye/scheduler/service/IssueService.java b/scheduler/src/main/java/work/xuye/scheduler/service/IssueService.java index 1b4a17e..381afef 100644 --- a/scheduler/src/main/java/work/xuye/scheduler/service/IssueService.java +++ b/scheduler/src/main/java/work/xuye/scheduler/service/IssueService.java @@ -47,12 +47,10 @@ public class IssueService { streamBridge.send(BindingConstants.TASK_OUT, message); ProcessMode processMode = taskVO.getTask().getProcessMode(); String emoji = processMode.equals(ProcessMode.DEBUG) ? 
"\uD83E\uDD16" : ""; - log.info("\uD83D\uDCE4 {} [{}-{}], " + MessageConstants.TASK_TRACE_ID + ": [{}]" + ", processMode: [{}]", + log.info("\uD83D\uDCE4 {} {}-{}", emoji, taskVO.getTask().getId(), - taskVO.getTask().getName(), - message.getHeaders().get(MessageConstants.TASK_TRACE_ID), - processMode); + taskVO.getTask().getName()); return taskVO; } diff --git a/sink/src/main/java/work/xuye/sink/handler/ItemHandler.java b/sink/src/main/java/work/xuye/sink/handler/ItemHandler.java index c533fbe..3822daa 100644 --- a/sink/src/main/java/work/xuye/sink/handler/ItemHandler.java +++ b/sink/src/main/java/work/xuye/sink/handler/ItemHandler.java @@ -87,16 +87,17 @@ public class ItemHandler { if (reason == ProcessReason.unchanged) { return; } - this.log(message, template, reason); + + String dataSourceName = taskVO.getTask().getSinkParams().getDataSourceName(); //判断数据是否存在 boolean itemExist = this.itemExist(taskVO.getTask().getSinkParams(), template); //如果已经存在了,就判断是否需要更新,如果需要更新就更新 if (itemExist) { - this.tryUpdate(taskVO.getTask().getSinkParams(), expressionResultMap, md5Digest, template); + this.tryUpdate(taskVO, expressionResultMap, md5Digest, template, reason); //如果不存在,就插入 } else { - this.tryInsert(dataSourceName, template); + this.tryInsert(taskVO.getTask().getName(), dataSourceName, template, reason); } // 必须在方法出栈前,将md5放入缓存,因为Spring Cloud Stream默认会重试三次,如果提前放入缓存了,会导致首次重试被判定为无需处理 urlMD5Service.put(template.getUniqueField().getValue().toString(), md5Digest); @@ -129,19 +130,6 @@ public class ItemHandler { } } - private void log(Message message, TableTemplate tableTemplate, ProcessReason reason) { - log.info("@@ [{} {}] 开始处理: " + MessageConstants.TASK_TRACE_ID + ": [{}], " - + MessageConstants.SOURCE_TRACE_ID + ": [{}], " - + MessageConstants.TRANSFORMER_TRACE_ID + ": [{}], " - + "{}: [{}]", - reason.getEmoji(), - reason.name(), - tableTemplate.getUniqueField().getFieldName(), - tableTemplate.getUniqueField().getValue(), - 
message.getHeaders().get(MessageConstants.TASK_TRACE_ID), - message.getHeaders().get(MessageConstants.SOURCE_TRACE_ID), - message.getHeaders().get(MessageConstants.TRANSFORMER_TRACE_ID)); - } private ProcessReason getReason(TaskVO taskVO, TableTemplate template, String md5) { @@ -173,7 +161,9 @@ public class ItemHandler { } - private void tryUpdate(SinkParams sinkParams, HashMap resultMap, String md5Digest, TableTemplate tableTemplate) { + private void tryUpdate(TaskVO taskVO, HashMap resultMap, String md5Digest, TableTemplate tableTemplate, ProcessReason reason) { + SinkParams sinkParams = taskVO.getTask().getSinkParams(); + String taskName = taskVO.getTask().getName(); boolean updateConfigEnabled = sinkParams.getCheckUpdate().getStatus().equals(Status.on); if (updateConfigEnabled) { String dataSourceName = sinkParams.getDataSourceName(); @@ -195,23 +185,45 @@ public class ItemHandler { throw new RuntimeException("根据 [" + fieldName + "] 来判断数据是否需要更新,但是没有配置jsonPath"); } if (dbValue.equals(nowValue)) { - log.info("@ ✅数据已存在于MySQL,且不需要更新"); + log.info("[{}][✅ 已是最新][{} {}] " + "{}: {}", + taskName, + reason.getEmoji(), + reason.name(), + tableTemplate.getUniqueField().getFieldName(), + tableTemplate.getUniqueField().getValue()); urlMD5Service.put(tableTemplate.getUniqueField().getValue().toString(), md5Digest); return; } else { String update = SqlGenerator.generateUpdateSql(tableTemplate); jdbcService.update(dataSourceName, update); - log.info("@ \uD83D\uDD04✅数据已存在于MySQL,已经更新数据"); + log.info("[{}][\uD83D\uDD04 已更新][{} {}] " + "{}: {}", + taskName, + reason.getEmoji(), + reason.name(), + tableTemplate.getUniqueField().getFieldName(), + tableTemplate.getUniqueField().getValue()); } } else { - log.info("@ ⏭️该任务未开启更新功能,不会进行更新处理"); + + log.info("[{}][⏭️ 不检查更新][{} {}] " + "{}: {}", + taskName, + reason.getEmoji(), + reason.name(), + tableTemplate.getUniqueField().getFieldName(), + tableTemplate.getUniqueField().getValue()); } } - private void tryInsert(String dataSourceName, 
TableTemplate template) { + private void tryInsert(String taskName, String dataSourceName, TableTemplate template, ProcessReason reason) { + jdbcService.update(dataSourceName, SqlGenerator.generateInsertSql(template)); - log.info("@ \uD83D\uDCBE 数据不存在于MySQL,已插入数据"); + log.info("[{}][\uD83D\uDCBE 已保存][{} {}] " + "{}: {}", + taskName, + reason.getEmoji(), + reason.name(), + template.getUniqueField().getFieldName(), + template.getUniqueField().getValue()); } diff --git a/source/src/main/java/work/xuye/source/handler/SourceHandler.java b/source/src/main/java/work/xuye/source/handler/SourceHandler.java index 7f4076a..882bffe 100644 --- a/source/src/main/java/work/xuye/source/handler/SourceHandler.java +++ b/source/src/main/java/work/xuye/source/handler/SourceHandler.java @@ -6,6 +6,7 @@ import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; import org.springframework.util.DigestUtils; +import org.springframework.util.StopWatch; import work.xuye.common.constant.MessageConstants; import work.xuye.common.constant.StageConstants; import work.xuye.common.db.service.TaskManager; @@ -33,14 +34,23 @@ public class SourceHandler { public Message handle(Message message) { - // 先根据req拿到res,包含从缓存中检查该URL的状态 HttpRes res = null; + HttpRequestParams requestParams = message.getPayload(); + StopWatch stopWatch = new StopWatch(); try { - res = this.doHandle(message); + stopWatch.start(); + res = this.request(requestParams); + stopWatch.stop(); + + String md5Digest = DigestUtils.md5DigestAsHex(res.getBody().getBytes()); + ResourceStatus resourceStatus = urlMD5Service.getResourceStatus(requestParams.getUrl(), md5Digest); + urlMD5Service.put(requestParams.getUrl(), md5Digest); + res.setResourceStatus(resourceStatus); + } catch (Exception e) { log.error("@@ 发生异常: [{}], " + MessageConstants.TASK_TRACE_ID + ": [{}]", - message.getPayload().getUrl(), 
message.getHeaders().get(MessageConstants.TASK_TRACE_ID)); + requestParams.getUrl(), message.getHeaders().get(MessageConstants.TASK_TRACE_ID)); return null; } @@ -49,7 +59,7 @@ public class SourceHandler { (Integer) message.getHeaders().get(MessageConstants.TASK_ID)) .getTask() .getProcessMode(); - this.log(message, res, processMode); + this.log(message, res, processMode, stopWatch.getLastTaskTimeMillis()); if (res.getResourceStatus().equals(ResourceStatus.UNCHANGED) && ProcessMode.NORMAL.equals(processMode)) { @@ -64,40 +74,28 @@ public class SourceHandler { .setHeader(MessageConstants.SOURCE_TRACE_ID, ID.generate()) .setHeader(MessageConstants.TASK_ID, message.getHeaders().get(MessageConstants.TASK_ID)) .setHeader(MessageConstants.TASK_NAME, message.getHeaders().get(MessageConstants.TASK_NAME)) - .setHeader(MessageConstants.SEED_URL, message.getPayload().getUrl()) + .setHeader(MessageConstants.SEED_URL, requestParams.getUrl()) .build(); } - private void log(Message message, HttpRes res, ProcessMode processMode) { + private void log(Message message, HttpRes res, ProcessMode processMode, long ms) { HttpRequestParams req = message.getPayload(); String emoji = processMode.equals(ProcessMode.NORMAL) ? 
"" : "🤖"; if (res.getStatus().is2xxSuccessful()) { - log.info("@@ [{}{} {}] [{} {}] {} ", res.getResourceStatus().getEmoji(), + log.info("[{}{} {}] [{} {} {}s] {} ", res.getResourceStatus().getEmoji(), emoji, - res.getResourceStatus().getName(), req.getMethod(), res.getStatus(), req.getUrl()); + res.getResourceStatus().getName(), req.getMethod(), res.getStatus(), ms / 1000.0, req.getUrl()); } else { log.warn("@@ method: [{}], status: [{}], url: [{}], request: [{}], response: [{}]", req.getMethod(), res.getStatus(), req.getUrl(), req.getBody(), res.getBody()); } } - public HttpRes doHandle(Message message) { - - - HttpRequestParams requestParams = message.getPayload(); - - HttpRes httpRes = requestClient.execute(requestParams); - - String md5Digest = DigestUtils.md5DigestAsHex(httpRes.getBody().getBytes()); - - ResourceStatus resourceStatus = urlMD5Service.getResourceStatus(requestParams.getUrl(), md5Digest); - urlMD5Service.put(requestParams.getUrl(), md5Digest); - - httpRes.setResourceStatus(resourceStatus); - return httpRes; + public HttpRes request(HttpRequestParams req) { + return requestClient.execute(req); } diff --git a/source/src/main/java/work/xuye/source/request/JdkRequestClient.java b/source/src/main/java/work/xuye/source/request/JdkRequestClient.java index d6e66b6..209daea 100644 --- a/source/src/main/java/work/xuye/source/request/JdkRequestClient.java +++ b/source/src/main/java/work/xuye/source/request/JdkRequestClient.java @@ -7,9 +7,7 @@ import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import work.xuye.common.dto.HttpRequestParams; import work.xuye.common.dto.HttpRes; -import work.xuye.common.enums.CharsetType; -import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.io.IOException; import java.net.URI; @@ -17,7 +15,6 @@ import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.nio.charset.Charset; -import 
java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.regex.Matcher; @@ -37,18 +34,18 @@ public class JdkRequestClient implements RequestClient { private SpelExpressionParser parser; @Resource private Gson gson; - private Map charsetMap; - @PostConstruct - private void initCharsetMap() { - if (charsetMap == null) { - charsetMap = new HashMap<>(); - } else { - charsetMap.clear(); + private Charset getCharsetByName(String charsetName) { + if (ObjectUtils.isEmpty(charsetName)) { + return Charset.defaultCharset(); + } + charsetName = charsetName.trim(); + try { + return Charset.forName(charsetName); + } catch (Exception e) { + log.warn("charsetName is {}, not found, use default charset", charsetName); + return Charset.defaultCharset(); } - charsetMap.put(CharsetType.defaultCharset, Charset.defaultCharset()); - charsetMap.put(CharsetType.utf8, StandardCharsets.UTF_8); - charsetMap.put(CharsetType.gb2312, Charset.forName("GB2312")); } @Override @@ -85,8 +82,7 @@ public class JdkRequestClient implements RequestClient { if (request.getCharset() == null) { stringBodyHandler = HttpResponse.BodyHandlers.ofString(); } else { - Charset charset = charsetMap.getOrDefault(request.getCharset(), - Charset.defaultCharset()); + Charset charset = this.getCharsetByName(request.getCharset()); stringBodyHandler = HttpResponse.BodyHandlers.ofString(charset); } diff --git a/transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java b/transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java index f938c63..3754548 100644 --- a/transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java +++ b/transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java @@ -18,6 +18,7 @@ import work.xuye.common.db.service.TaskManager; import work.xuye.common.dto.HttpRes; import work.xuye.common.dto.TaskVO; import work.xuye.common.enums.*; +import work.xuye.common.service.UrlMD5Service; import 
work.xuye.common.utils.ID; import work.xuye.common.utils.JsonPathUtil; import work.xuye.transformer.transformer.MessageTransformer; @@ -40,6 +41,7 @@ public class TransformHandler { private final Map transformMessageMap; private final TaskManager taskManager; + private final UrlMD5Service urlMD5Service; public List> handle(Message message) { @@ -47,9 +49,9 @@ public class TransformHandler { try { return this.doHandle(message); } catch (Exception e) { - this.logError(message); + urlMD5Service.removeUrlMD5(this.getSeedUrl(message)); + throw new RuntimeException(e); } - return null; } @@ -78,7 +80,6 @@ public class TransformHandler { HttpRes httpRes = message.getPayload(); ProcessMode processMode = taskVO.getTask().getProcessMode(); - // 理论上不会出现这种情况,因为符合这个条件的消息source是不会下发的,如果出现了,说明是消息被人为重发了 if (ResourceStatus.UNCHANGED.equals(httpRes.getResourceStatus()) && ProcessMode.NORMAL.equals(processMode)) { log.warn("@@ 经md5判断,此URL相对上次未更新,且为正常处理模式,无需做进一步处理"); @@ -199,12 +200,4 @@ public class TransformHandler { } - private void logError(Message message) { - log.error("@@@ 发生异常:[{}], " + MessageConstants.TASK_TRACE_ID + - ": [{}] " + MessageConstants.SOURCE_TRACE_ID + ": [{}]", - message.getPayload().getUrl(), - message.getHeaders().get(MessageConstants.TASK_TRACE_ID), - message.getHeaders().get(MessageConstants.SOURCE_TRACE_ID)); - } - } diff --git a/transformer/src/main/java/work/xuye/transformer/transformer/ResDataTransformer.java b/transformer/src/main/java/work/xuye/transformer/transformer/ResDataTransformer.java index a998d74..0fe7fba 100644 --- a/transformer/src/main/java/work/xuye/transformer/transformer/ResDataTransformer.java +++ b/transformer/src/main/java/work/xuye/transformer/transformer/ResDataTransformer.java @@ -2,6 +2,7 @@ package work.xuye.transformer.transformer; import com.google.gson.JsonObject; import com.google.gson.JsonParser; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @@ 
-11,15 +12,16 @@ import org.springframework.stereotype.Component; **/ @Slf4j @Component("resData") +@RequiredArgsConstructor public class ResDataTransformer implements MessageTransformer { + @Override public String transform(String json, String seedUrl) { JsonObject res = JsonParser.parseString(json).getAsJsonObject(); boolean hasData = res.has("data"); if (!hasData) { - log.info("resData transform error, json:{}, seedUrl:{}", json, seedUrl); - throw new RuntimeException("resData transform error, seedUrl:" + seedUrl); + throw new RuntimeException("res no data"); } return res.get("data").getAsString(); }