Przeglądaj źródła

生产环境更新准备

master
xuye 1 rok temu
rodzic
commit
0421c48977
22 zmienionych plików z 251 dodań i 94 usunięć
  1. +2
    -2
      common/src/main/java/work/xuye/common/cache/RedisProperties.java
  2. +20
    -0
      common/src/main/java/work/xuye/common/config/ContainerCustomizer.java
  3. +9
    -0
      common/src/main/java/work/xuye/common/controller/ConfigController.java
  4. +6
    -0
      common/src/main/java/work/xuye/common/controller/RedisController.java
  5. +2
    -0
      common/src/main/java/work/xuye/common/db/entity/Datasource.java
  6. +1
    -2
      common/src/main/java/work/xuye/common/dto/HttpRequestParams.java
  7. +0
    -9
      common/src/main/java/work/xuye/common/enums/CharsetType.java
  8. +2
    -1
      common/src/main/java/work/xuye/common/service/MessageService.java
  9. +24
    -0
      common/src/main/java/work/xuye/common/spel/CustomFunction.java
  10. +1
    -1
      common/src/main/java/work/xuye/common/utils/SpEL.java
  11. +8
    -0
      common/src/main/resources/markdown/deploy.md
  12. +7
    -0
      common/src/main/resources/markdown/ip.md
  13. +31
    -0
      common/src/main/resources/markdown/readme.md
  14. +26
    -0
      common/src/main/resources/markdown/sql.md
  15. +34
    -0
      common/src/main/resources/markdown/task.md
  16. +3
    -3
      scheduler/src/main/java/work/xuye/scheduler/controller/ScheduleController.java
  17. +2
    -4
      scheduler/src/main/java/work/xuye/scheduler/service/IssueService.java
  18. +34
    -22
      sink/src/main/java/work/xuye/sink/handler/ItemHandler.java
  19. +20
    -22
      source/src/main/java/work/xuye/source/handler/SourceHandler.java
  20. +11
    -15
      source/src/main/java/work/xuye/source/request/JdkRequestClient.java
  21. +4
    -11
      transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java
  22. +4
    -2
      transformer/src/main/java/work/xuye/transformer/transformer/ResDataTransformer.java

+ 2
- 2
common/src/main/java/work/xuye/common/cache/RedisProperties.java Wyświetl plik

@@ -19,9 +19,9 @@ public class RedisProperties {

public static final String PROPERTIES_PREFIX = "redis";

private Item url = new Item("url_md5_map", 60 * 60 * 24);
private Item url = new Item("url_md5_map", 60 * 60);

private Item message = new Item("message_md5_map", 60 * 60);
private Item message = new Item("message_md5_map", 60 * 30);

private Item task = new Item(null, 40);



+ 20
- 0
common/src/main/java/work/xuye/common/config/ContainerCustomizer.java Wyświetl plik

@@ -0,0 +1,20 @@
package work.xuye.common.config;

import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

/**
* @author xuye
* @since 2023/3/15 23:41
**/
@Configuration
public class ContainerCustomizer implements ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> {
@Override
public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group) {
container.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(0L, 0L)));
}

}

+ 9
- 0
common/src/main/java/work/xuye/common/controller/ConfigController.java Wyświetl plik

@@ -7,9 +7,11 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import work.xuye.common.annotation.ResponseBodyEnhanceController;
import work.xuye.common.db.entity.Datasource;
import work.xuye.common.db.entity.Mapping;
import work.xuye.common.db.entity.Task;
import work.xuye.common.db.entity.Template;
import work.xuye.common.db.service.DatasourceService;
import work.xuye.common.db.service.MappingService;
import work.xuye.common.db.service.TaskService;
import work.xuye.common.db.service.TemplateService;
@@ -31,6 +33,7 @@ public class ConfigController {
private final TaskService taskService;
private final TemplateService templateService;
private final MappingService mappingService;
private final DatasourceService datasourceService;


@GetMapping("/task")
@@ -50,4 +53,10 @@ public class ConfigController {
public List<Mapping> mappingList() {
return mappingService.list();
}

@GetMapping("/datasource")
@ApiOperation(value = "数据源列表")
public List<Datasource> datasourceList() {
return datasourceService.list();
}
}

+ 6
- 0
common/src/main/java/work/xuye/common/controller/RedisController.java Wyświetl plik

@@ -57,5 +57,11 @@ public class RedisController {
return redisService.clear(key);
}

@DeleteMapping
@ApiOperation(value = "清除所有缓存")
public void clearAll() {
redisService.run();
}


}

+ 2
- 0
common/src/main/java/work/xuye/common/db/entity/Datasource.java Wyświetl plik

@@ -1,6 +1,7 @@
package work.xuye.common.db.entity;

import com.baomidou.mybatisplus.annotation.*;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
@@ -43,6 +44,7 @@ public class Datasource implements Serializable {
@TableField("username")
private String username;

@JsonIgnore
@TableField("password")
private String password;



+ 1
- 2
common/src/main/java/work/xuye/common/dto/HttpRequestParams.java Wyświetl plik

@@ -4,7 +4,6 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.http.HttpMethod;
import work.xuye.common.enums.CharsetType;

import java.util.Map;

@@ -23,7 +22,7 @@ public class HttpRequestParams {

private Map<String, Object> body = Map.of();

private CharsetType charset;
private String charset;

private Map<String, String> headers = Map.of();



+ 0
- 9
common/src/main/java/work/xuye/common/enums/CharsetType.java Wyświetl plik

@@ -1,9 +0,0 @@
package work.xuye.common.enums;

/**
* @author xuye
* @since 2023/3/12 22:32
**/
public enum CharsetType {
defaultCharset, utf8, gb2312
}

+ 2
- 1
common/src/main/java/work/xuye/common/service/MessageService.java Wyświetl plik

@@ -33,7 +33,8 @@ public class MessageService {
.append("# ").append(title).append("\n");
if (!ObjectUtils.isEmpty(subTitle)) {
if (subTitle.length() > 500) {
subTitle = subTitle.substring(0, 500) + "...";
subTitle = subTitle.substring(0, 200) + " [.......] " + subTitle.substring(subTitle.length() - 200);
subTitle = subTitle.replaceAll("`", "");
}
result.append("> ").append(subTitle).append("\n\r");
}


+ 24
- 0
common/src/main/java/work/xuye/common/spel/CustomFunction.java Wyświetl plik

@@ -1,5 +1,8 @@
package work.xuye.common.spel;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.JsonSyntaxException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.expression.Expression;
@@ -126,4 +129,25 @@ public class CustomFunction {
return (String) exp.getValue();
}

/**
* 用itemData包一层的原因:避免下游程序直接读到了有特殊意义的字段,比如type(财联社资讯类型 -1:快讯,0:要闻,1:非财联社资讯)
* <a href="https://app.asana.com/0/1202103682792595/1204211768982825/f">...</a>
*
* @param json json字符串
* @return 包装后的json
*/
public String wrap(String json, String key) {
JsonObject jsonObject = null;
try {
jsonObject = JsonParser.parseString(json).getAsJsonObject();
} catch (JsonSyntaxException e) {
log.error("wrapJson handler error, json: [{}]", json);
throw e;
}
JsonObject result = new JsonObject();
result.add(key, jsonObject);
return result.toString();
}


}

+ 1
- 1
common/src/main/java/work/xuye/common/utils/SpEL.java Wyświetl plik

@@ -11,7 +11,7 @@ import org.springframework.expression.spel.standard.SpelExpressionParser;
public class SpEL {
public static void main(String[] args) {
ExpressionParser parser = new SpelExpressionParser();
Expression exp = parser.parseExpression("T(java.time.LocalDateTime).now().format(T(java.time.format.DateTimeFormatter).ofPattern('yyyy-MM-dd HH:mm:ss'))");
Expression exp = parser.parseExpression("T(java.time.LocalDateTime).now().minusHours(1).atZone(T(java.time.ZoneId).systemDefault()).toInstant().atOffset(T(java.time.ZoneOffset).UTC).toInstant().getEpochSecond()");
Object o = exp.getValue();
System.out.println(o);
}


+ 8
- 0
common/src/main/resources/markdown/deploy.md Wyświetl plik

@@ -0,0 +1,8 @@
# 部署信息

| 环境 | 数据库 | database | 部署信息 |
|--------|---------------------------|----------------------|-----------------|
| 开发环境 | mysql-sit.deepq.tech | std_news_process_dev | 无 |
| 测试环境 | mysql-sit.deepq.tech | std_news_process | kubernetes-sit |
| 生产环境 | mysql8-prd-rds.deepq.tech | std_news_process | kubernetes-prd |
| 临时生产环境 | 122.112.164.131 | pyspider_resultdb | 122.112.164.131 |

+ 7
- 0
common/src/main/resources/markdown/ip.md Wyświetl plik

@@ -0,0 +1,7 @@
# 出口IP

| IP | 环境 |
|----------------|----------------|
| 47.103.106.123 | 阿里云VPN |
| 47.103.90.3 | kubernetes-sit |
| 47.100.14.52 | kubernetes-prd |

+ 31
- 0
common/src/main/resources/markdown/readme.md Wyświetl plik

@@ -0,0 +1,31 @@
# 程序介绍

### 基础功能

1. 媒体源监控:可对历史上已接入的、经常出现问题的媒体源进行爬取和记录,便于发现问题后方便追溯到媒体测的历史记录
2. 数据接入:将媒体源的接口通过映射关系,保存到爬虫数据库中。使用数据接入功能,将自带媒体源监控监控功能。

### 适用场景

要满足下列全部条件

1. 结构化的资讯。 正例:XML JSON格式,反例:服务端渲染页面
2. 需求简单。 正例:每经头豹、金吴咨询。反例:21世纪资讯

### 背景

数据接入历史背景是:pyspider框架开发、调试排错效率低下

数据接入的现状是:格式标准、流程几乎相似,因此可以开发一个基于配置的资讯接入处理程序

### 相关链接

代码仓库 https://git.deepq.tech/fhl/std-news-process

### 异常处理

程序中发生的所有异常,都会通过钉钉通知

### 优化方案

1. 如果以后配置接入任务多,由于媒体接口响应速度慢,source组件会有消息堆积,需要考虑异步处理

+ 26
- 0
common/src/main/resources/markdown/sql.md Wyświetl plik

@@ -0,0 +1,26 @@
# 常用SQL

## 查询当前任务状态

```sql
select name,
description,
CASE
WHEN deleted = 0 THEN '🟢'
WHEN deleted = 1 THEN '🔴'
END AS '调度状态',
CASE
WHEN REPLACE(json_extract(sink_params, '$.checkUpdate.status'), '"', '') = 'on' THEN '🟢'
WHEN REPLACE(json_extract(sink_params, '$.checkUpdate.status'), '"', '') = 'off' THEN '🔴'
END as '是否检查更新',
CASE
WHEN REPLACE(json_extract(sink_params, '$.checkDelete.status'), '"', '') = 'on' THEN '🟢'
WHEN REPLACE(json_extract(sink_params, '$.checkDelete.status'), '"', '') = 'off' THEN '🔴'
END as '是否检查删除',
REPLACE(json_extract(request_params, '$.url'), '"', '') as 'URL',

REPLACE(json_extract(sink_params, '$.dataSourceName'), '"', '') as '数据源名',
REPLACE(json_extract(sink_params, '$.tableName'), '"', '') as '表名'
from task;
```


+ 34
- 0
common/src/main/resources/markdown/task.md Wyświetl plik

@@ -0,0 +1,34 @@
# 任务记录

## 格隆汇RSS

1. 删除方式不是字段提供,而是程序遍历找到消息的记录自行处理
2. 目前的处理方式是钉钉群通知,为了安全不自动执行delete语句

## 中国证券报图文RSS

1. 非常标准
2. 该源是纸质报纸的内容,发布时间缺失没有时分秒的精度,目前的做法是将程序发现记录的时间作为发布时间
3. 由于情况2,该任务禁止使用更新功能,因为会导致发布时间被刷新

## 中证快讯图文RSS

1. 对方响应数据小概率格式不正确会引发报错
2. 有概率连续302
3. 字符集声称是gb2312,但是实际上是gbk
4. 1和2的情况最近未出现

## 中证智能财讯图文RSS

1. URL中需携带今日的日期

## 金牛座图文RSS

1. JSON中套XML
2. 请求体中需要携带一个时间戳作为查询的起始日期时间
3. 需要白名单内的IP才可以访问到,否则是空数据
4. 存在URL为空的快讯,具体处理方式参考需求文档和代码

## 金牛座视频RSS

1. 和图文RSS的123情况一致

+ 3
- 3
scheduler/src/main/java/work/xuye/scheduler/controller/ScheduleController.java Wyświetl plik

@@ -18,7 +18,7 @@ import java.util.ArrayList;
* @since 2023/2/17 23:01
**/
@Slf4j
@Api(tags = "下发任务")
@Api(tags = "任务调度")
@RequestMapping("/task")
@RequiredArgsConstructor
@ResponseBodyEnhanceController
@@ -27,14 +27,14 @@ public class ScheduleController {
private final IssueService issueService;

@GetMapping
@ApiOperation(value = "下发全部任务")
@ApiOperation(value = "调度全部任务")
public ArrayList<TaskVO> issueTask() {
return issueService.issueAllTask();
}


@GetMapping("/id/{id}")
@ApiOperation(value = "根据任务ID下发")
@ApiOperation(value = "根据任务ID调度")
public TaskVO issueTask(@PathVariable Integer id) {
return issueService.issueTask(id);
}


+ 2
- 4
scheduler/src/main/java/work/xuye/scheduler/service/IssueService.java Wyświetl plik

@@ -47,12 +47,10 @@ public class IssueService {
streamBridge.send(BindingConstants.TASK_OUT, message);
ProcessMode processMode = taskVO.getTask().getProcessMode();
String emoji = processMode.equals(ProcessMode.DEBUG) ? "\uD83E\uDD16" : "";
log.info("\uD83D\uDCE4 {} [{}-{}], " + MessageConstants.TASK_TRACE_ID + ": [{}]" + ", processMode: [{}]",
log.info("\uD83D\uDCE4 {} {}-{}",
emoji,
taskVO.getTask().getId(),
taskVO.getTask().getName(),
message.getHeaders().get(MessageConstants.TASK_TRACE_ID),
processMode);
taskVO.getTask().getName());
return taskVO;
}



+ 34
- 22
sink/src/main/java/work/xuye/sink/handler/ItemHandler.java Wyświetl plik

@@ -87,16 +87,17 @@ public class ItemHandler {
if (reason == ProcessReason.unchanged) {
return;
}
this.log(message, template, reason);


String dataSourceName = taskVO.getTask().getSinkParams().getDataSourceName();
//判断数据是否存在
boolean itemExist = this.itemExist(taskVO.getTask().getSinkParams(), template);
//如果已经存在了,就判断是否需要更新,如果需要更新就更新
if (itemExist) {
this.tryUpdate(taskVO.getTask().getSinkParams(), expressionResultMap, md5Digest, template);
this.tryUpdate(taskVO, expressionResultMap, md5Digest, template, reason);
//如果不存在,就插入
} else {
this.tryInsert(dataSourceName, template);
this.tryInsert(taskVO.getTask().getName(), dataSourceName, template, reason);
}
// 必须在方法出栈前,将md5放入缓存,因为Spring Cloud Stream默认会重试三次,如果提前放入缓存了,会导致首次重试被判定为无需处理
urlMD5Service.put(template.getUniqueField().getValue().toString(), md5Digest);
@@ -129,19 +130,6 @@ public class ItemHandler {
}
}

private void log(Message<String> message, TableTemplate tableTemplate, ProcessReason reason) {
log.info("@@ [{} {}] 开始处理: " + MessageConstants.TASK_TRACE_ID + ": [{}], "
+ MessageConstants.SOURCE_TRACE_ID + ": [{}], "
+ MessageConstants.TRANSFORMER_TRACE_ID + ": [{}], "
+ "{}: [{}]",
reason.getEmoji(),
reason.name(),
tableTemplate.getUniqueField().getFieldName(),
tableTemplate.getUniqueField().getValue(),
message.getHeaders().get(MessageConstants.TASK_TRACE_ID),
message.getHeaders().get(MessageConstants.SOURCE_TRACE_ID),
message.getHeaders().get(MessageConstants.TRANSFORMER_TRACE_ID));
}

private ProcessReason getReason(TaskVO taskVO, TableTemplate template, String md5) {

@@ -173,7 +161,9 @@ public class ItemHandler {
}


private void tryUpdate(SinkParams sinkParams, HashMap<String, Object> resultMap, String md5Digest, TableTemplate tableTemplate) {
private void tryUpdate(TaskVO taskVO, HashMap<String, Object> resultMap, String md5Digest, TableTemplate tableTemplate, ProcessReason reason) {
SinkParams sinkParams = taskVO.getTask().getSinkParams();
String taskName = taskVO.getTask().getName();
boolean updateConfigEnabled = sinkParams.getCheckUpdate().getStatus().equals(Status.on);
if (updateConfigEnabled) {
String dataSourceName = sinkParams.getDataSourceName();
@@ -195,23 +185,45 @@ public class ItemHandler {
throw new RuntimeException("根据 [" + fieldName + "] 来判断数据是否需要更新,但是没有配置jsonPath");
}
if (dbValue.equals(nowValue)) {
log.info("@ ✅数据已存在于MySQL,且不需要更新");
log.info("[{}][✅ 已是最新][{} {}] " + "{}: {}",
taskName,
reason.getEmoji(),
reason.name(),
tableTemplate.getUniqueField().getFieldName(),
tableTemplate.getUniqueField().getValue());
urlMD5Service.put(tableTemplate.getUniqueField().getValue().toString(), md5Digest);
return;
} else {
String update = SqlGenerator.generateUpdateSql(tableTemplate);
jdbcService.update(dataSourceName, update);
log.info("@ \uD83D\uDD04✅数据已存在于MySQL,已经更新数据");
log.info("[{}][\uD83D\uDD04 已更新][{} {}] " + "{}: {}",
taskName,
reason.getEmoji(),
reason.name(),
tableTemplate.getUniqueField().getFieldName(),
tableTemplate.getUniqueField().getValue());
}
} else {
log.info("@ ⏭️该任务未开启更新功能,不会进行更新处理");

log.info("[{}][⏭️ 不检查更新][{} {}] " + "{}: {}",
taskName,
reason.getEmoji(),
reason.name(),
tableTemplate.getUniqueField().getFieldName(),
tableTemplate.getUniqueField().getValue());
}
}


private void tryInsert(String dataSourceName, TableTemplate template) {
private void tryInsert(String taskName, String dataSourceName, TableTemplate template, ProcessReason reason) {

jdbcService.update(dataSourceName, SqlGenerator.generateInsertSql(template));
log.info("@ \uD83D\uDCBE 数据不存在于MySQL,已插入数据");
log.info("[{}][\uD83D\uDCBE 已保存][{} {}] " + "{}: {}",
taskName,
reason.getEmoji(),
reason.name(),
template.getUniqueField().getFieldName(),
template.getUniqueField().getValue());
}



+ 20
- 22
source/src/main/java/work/xuye/source/handler/SourceHandler.java Wyświetl plik

@@ -6,6 +6,7 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.DigestUtils;
import org.springframework.util.StopWatch;
import work.xuye.common.constant.MessageConstants;
import work.xuye.common.constant.StageConstants;
import work.xuye.common.db.service.TaskManager;
@@ -33,14 +34,23 @@ public class SourceHandler {

public Message<HttpRes> handle(Message<HttpRequestParams> message) {


// 先根据req拿到res,包含从缓存中检查该URL的状态
HttpRes res = null;
HttpRequestParams requestParams = message.getPayload();
StopWatch stopWatch = new StopWatch();
try {
res = this.doHandle(message);
stopWatch.start();
res = this.request(requestParams);
stopWatch.stop();

String md5Digest = DigestUtils.md5DigestAsHex(res.getBody().getBytes());
ResourceStatus resourceStatus = urlMD5Service.getResourceStatus(requestParams.getUrl(), md5Digest);
urlMD5Service.put(requestParams.getUrl(), md5Digest);
res.setResourceStatus(resourceStatus);

} catch (Exception e) {
log.error("@@ 发生异常: [{}], " + MessageConstants.TASK_TRACE_ID + ": [{}]",
message.getPayload().getUrl(), message.getHeaders().get(MessageConstants.TASK_TRACE_ID));
requestParams.getUrl(), message.getHeaders().get(MessageConstants.TASK_TRACE_ID));
return null;
}

@@ -49,7 +59,7 @@ public class SourceHandler {
(Integer) message.getHeaders().get(MessageConstants.TASK_ID))
.getTask()
.getProcessMode();
this.log(message, res, processMode);
this.log(message, res, processMode, stopWatch.getLastTaskTimeMillis());


if (res.getResourceStatus().equals(ResourceStatus.UNCHANGED) && ProcessMode.NORMAL.equals(processMode)) {
@@ -64,40 +74,28 @@ public class SourceHandler {
.setHeader(MessageConstants.SOURCE_TRACE_ID, ID.generate())
.setHeader(MessageConstants.TASK_ID, message.getHeaders().get(MessageConstants.TASK_ID))
.setHeader(MessageConstants.TASK_NAME, message.getHeaders().get(MessageConstants.TASK_NAME))
.setHeader(MessageConstants.SEED_URL, message.getPayload().getUrl())
.setHeader(MessageConstants.SEED_URL, requestParams.getUrl())
.build();


}

private void log(Message<HttpRequestParams> message, HttpRes res, ProcessMode processMode) {
private void log(Message<HttpRequestParams> message, HttpRes res, ProcessMode processMode, long ms) {
HttpRequestParams req = message.getPayload();

String emoji = processMode.equals(ProcessMode.NORMAL) ? "" : "🤖";
if (res.getStatus().is2xxSuccessful()) {
log.info("@@ [{}{} {}] [{} {}] {} ", res.getResourceStatus().getEmoji(),
log.info("[{}{} {}] [{} {} {}s] {} ", res.getResourceStatus().getEmoji(),
emoji,
res.getResourceStatus().getName(), req.getMethod(), res.getStatus(), req.getUrl());
res.getResourceStatus().getName(), req.getMethod(), res.getStatus(), ms / 1000.0, req.getUrl());
} else {
log.warn("@@ method: [{}], status: [{}], url: [{}], request: [{}], response: [{}]",
req.getMethod(), res.getStatus(), req.getUrl(), req.getBody(), res.getBody());
}
}

public HttpRes doHandle(Message<HttpRequestParams> message) {


HttpRequestParams requestParams = message.getPayload();

HttpRes httpRes = requestClient.execute(requestParams);

String md5Digest = DigestUtils.md5DigestAsHex(httpRes.getBody().getBytes());

ResourceStatus resourceStatus = urlMD5Service.getResourceStatus(requestParams.getUrl(), md5Digest);
urlMD5Service.put(requestParams.getUrl(), md5Digest);

httpRes.setResourceStatus(resourceStatus);
return httpRes;
public HttpRes request(HttpRequestParams req) {
return requestClient.execute(req);
}




+ 11
- 15
source/src/main/java/work/xuye/source/request/JdkRequestClient.java Wyświetl plik

@@ -7,9 +7,7 @@ import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import work.xuye.common.dto.HttpRequestParams;
import work.xuye.common.dto.HttpRes;
import work.xuye.common.enums.CharsetType;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.io.IOException;
import java.net.URI;
@@ -17,7 +15,6 @@ import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
@@ -37,18 +34,18 @@ public class JdkRequestClient implements RequestClient {
private SpelExpressionParser parser;
@Resource
private Gson gson;
private Map<CharsetType, Charset> charsetMap;

@PostConstruct
private void initCharsetMap() {
if (charsetMap == null) {
charsetMap = new HashMap<>();
} else {
charsetMap.clear();
private Charset getCharsetByName(String charsetName) {
if (ObjectUtils.isEmpty(charsetName)) {
return Charset.defaultCharset();
}
charsetName = charsetName.trim();
try {
return Charset.forName(charsetName);
} catch (Exception e) {
log.warn("charsetName is {}, not found, use default charset", charsetName);
return Charset.defaultCharset();
}
charsetMap.put(CharsetType.defaultCharset, Charset.defaultCharset());
charsetMap.put(CharsetType.utf8, StandardCharsets.UTF_8);
charsetMap.put(CharsetType.gb2312, Charset.forName("GB2312"));
}

@Override
@@ -85,8 +82,7 @@ public class JdkRequestClient implements RequestClient {
if (request.getCharset() == null) {
stringBodyHandler = HttpResponse.BodyHandlers.ofString();
} else {
Charset charset = charsetMap.getOrDefault(request.getCharset(),
Charset.defaultCharset());
Charset charset = this.getCharsetByName(request.getCharset());
stringBodyHandler = HttpResponse.BodyHandlers.ofString(charset);
}



+ 4
- 11
transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java Wyświetl plik

@@ -18,6 +18,7 @@ import work.xuye.common.db.service.TaskManager;
import work.xuye.common.dto.HttpRes;
import work.xuye.common.dto.TaskVO;
import work.xuye.common.enums.*;
import work.xuye.common.service.UrlMD5Service;
import work.xuye.common.utils.ID;
import work.xuye.common.utils.JsonPathUtil;
import work.xuye.transformer.transformer.MessageTransformer;
@@ -40,6 +41,7 @@ public class TransformHandler {
private final Map<String, MessageTransformer> transformMessageMap;

private final TaskManager taskManager;
private final UrlMD5Service urlMD5Service;


public List<Message<String>> handle(Message<HttpRes> message) {
@@ -47,9 +49,9 @@ public class TransformHandler {
try {
return this.doHandle(message);
} catch (Exception e) {
this.logError(message);
urlMD5Service.removeUrlMD5(this.getSeedUrl(message));
throw new RuntimeException(e);
}
return null;
}


@@ -78,7 +80,6 @@ public class TransformHandler {
HttpRes httpRes = message.getPayload();
ProcessMode processMode = taskVO.getTask().getProcessMode();


// 理论上不会出现这种情况,因为符合这个条件的消息source是不会下发的,如果出现了,说明是消息被人为重发了
if (ResourceStatus.UNCHANGED.equals(httpRes.getResourceStatus()) && ProcessMode.NORMAL.equals(processMode)) {
log.warn("@@ 经md5判断,此URL相对上次未更新,且为正常处理模式,无需做进一步处理");
@@ -199,12 +200,4 @@ public class TransformHandler {
}


private void logError(Message<HttpRes> message) {
log.error("@@@ 发生异常:[{}], " + MessageConstants.TASK_TRACE_ID +
": [{}] " + MessageConstants.SOURCE_TRACE_ID + ": [{}]",
message.getPayload().getUrl(),
message.getHeaders().get(MessageConstants.TASK_TRACE_ID),
message.getHeaders().get(MessageConstants.SOURCE_TRACE_ID));
}

}

+ 4
- 2
transformer/src/main/java/work/xuye/transformer/transformer/ResDataTransformer.java Wyświetl plik

@@ -2,6 +2,7 @@ package work.xuye.transformer.transformer;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@@ -11,15 +12,16 @@ import org.springframework.stereotype.Component;
**/
@Slf4j
@Component("resData")
@RequiredArgsConstructor
public class ResDataTransformer implements MessageTransformer {


@Override
public String transform(String json, String seedUrl) {
JsonObject res = JsonParser.parseString(json).getAsJsonObject();
boolean hasData = res.has("data");
if (!hasData) {
log.info("resData transform error, json:{}, seedUrl:{}", json, seedUrl);
throw new RuntimeException("resData transform error, seedUrl:" + seedUrl);
throw new RuntimeException("res no data");
}
return res.get("data").getAsString();
}


Ładowanie…
Anuluj
Zapisz