Browse Source

阅客资讯接入

master
xuye 1 year ago
parent
commit
060b85167b
54 changed files with 1400 additions and 268 deletions
  1. +4
    -0
      common/src/main/java/work/xuye/common/config/ContainerCustomizer.java
  2. +2
    -1
      common/src/main/java/work/xuye/common/constant/CacheConstants.java
  3. +11
    -0
      common/src/main/java/work/xuye/common/constant/CommonConstants.java
  4. +3
    -0
      common/src/main/java/work/xuye/common/db/entity/Mapping.java
  5. +10
    -11
      common/src/main/java/work/xuye/common/db/entity/vo/SinkParams.java
  6. +2
    -0
      common/src/main/java/work/xuye/common/db/entity/vo/TableTemplate.java
  7. +21
    -0
      common/src/main/java/work/xuye/common/db/entity/vo/TaskMiniVO.java
  8. +18
    -0
      common/src/main/java/work/xuye/common/db/service/StoreManager.java
  9. +23
    -14
      common/src/main/java/work/xuye/common/db/service/TaskManager.java
  10. +11
    -6
      common/src/main/java/work/xuye/common/dto/HttpRequestParams.java
  11. +8
    -4
      common/src/main/java/work/xuye/common/dto/HttpRes.java
  12. +3
    -4
      common/src/main/java/work/xuye/common/dto/TaskVO.java
  13. +5
    -0
      common/src/main/java/work/xuye/common/enums/ProcessReason.java
  14. +18
    -0
      common/src/main/java/work/xuye/common/enums/RequestMode.java
  15. +22
    -0
      common/src/main/java/work/xuye/common/properties/CommonProperties.java
  16. +38
    -0
      common/src/main/java/work/xuye/common/properties/YueKeProperties.java
  17. +68
    -12
      common/src/main/java/work/xuye/common/spel/CustomFunction.java
  18. +2
    -1
      common/src/main/java/work/xuye/common/spel/SpringExpressionLanguageEvaluator.java
  19. +0
    -74
      common/src/main/java/work/xuye/common/store/LocalMemoryNsKVMapStore.java
  20. +6
    -1
      common/src/main/java/work/xuye/common/store/MySQLCachedNsKVMapStore.java
  21. +4
    -0
      common/src/main/java/work/xuye/common/store/NsKVMapStore.java
  22. +39
    -5
      common/src/main/java/work/xuye/common/utils/DebugUtil.java
  23. +7
    -1
      common/src/main/java/work/xuye/common/utils/JsonPathUtil.java
  24. +33
    -0
      helper/.gitignore
  25. +50
    -0
      helper/pom.xml
  26. +16
    -0
      helper/src/main/java/work/xuye/helper/HelperApplication.java
  27. +28
    -0
      helper/src/main/java/work/xuye/helper/properties/HelperProperties.java
  28. +171
    -0
      helper/src/main/java/work/xuye/helper/service/UpdateService.java
  29. +15
    -0
      helper/src/main/resources/application.yml
  30. +21
    -0
      helper/src/main/resources/bootstrap.yml
  31. +3
    -9
      pom.xml
  32. +13
    -9
      scheduler/src/main/java/work/xuye/scheduler/service/IssueService.java
  33. +2
    -2
      scheduler/src/main/resources/bootstrap.yml
  34. +34
    -7
      sink/src/main/java/work/xuye/sink/controller/OpenController.java
  35. +10
    -3
      sink/src/main/java/work/xuye/sink/handler/DisappearHandler.java
  36. +39
    -23
      sink/src/main/java/work/xuye/sink/handler/ItemHandler.java
  37. +14
    -0
      sink/src/main/java/work/xuye/sink/yueke/YueKeConstants.java
  38. +23
    -0
      sink/src/main/java/work/xuye/sink/yueke/YueKeException.java
  39. +20
    -0
      sink/src/main/java/work/xuye/sink/yueke/YueKeExceptionControllerAdvice.java
  40. +23
    -0
      sink/src/main/java/work/xuye/sink/yueke/YueKeRequestJsonConverter.java
  41. +18
    -0
      sink/src/main/java/work/xuye/sink/yueke/YueKeRequestModel.java
  42. +54
    -0
      sink/src/main/java/work/xuye/sink/yueke/YueKeResEnum.java
  43. +33
    -0
      sink/src/main/java/work/xuye/sink/yueke/YueKeResponse.java
  44. +25
    -0
      sink/src/main/java/work/xuye/sink/yueke/YueKeRevokeNewsIDsVO.java
  45. +147
    -0
      sink/src/main/java/work/xuye/sink/yueke/YueKeRevokeService.java
  46. +2
    -2
      sink/src/main/resources/bootstrap.yml
  47. +114
    -59
      source/src/main/java/work/xuye/source/handler/SourceHandler.java
  48. +3
    -2
      source/src/main/java/work/xuye/source/processor/MessageConsumer.java
  49. +23
    -11
      source/src/main/java/work/xuye/source/request/OkHttpRequestClient.java
  50. +25
    -0
      source/src/main/java/work/xuye/source/spel/CustomRequest.java
  51. +111
    -0
      source/src/main/java/work/xuye/source/spel/YueKeRequest.java
  52. +2
    -2
      source/src/main/resources/bootstrap.yml
  53. +1
    -3
      transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java
  54. +2
    -2
      transformer/src/main/resources/bootstrap.yml

+ 4
- 0
common/src/main/java/work/xuye/common/config/ContainerCustomizer.java View File

@@ -1,5 +1,7 @@
package work.xuye.common.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
@@ -10,7 +12,9 @@ import org.springframework.util.backoff.FixedBackOff;
* @author xuye
* @since 2023/3/15 23:41
**/
@Slf4j
@Configuration
@ConditionalOnClass(ListenerContainerCustomizer.class)
public class ContainerCustomizer implements ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> {
@Override
public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group) {


+ 2
- 1
common/src/main/java/work/xuye/common/constant/CacheConstants.java View File

@@ -9,5 +9,6 @@ public class CacheConstants {
public static final String TASK = "snp_task";

public static final String STORE = "snp_store";

public static final String TEMPLATE = "snp_template";
}

+ 11
- 0
common/src/main/java/work/xuye/common/constant/CommonConstants.java View File

@@ -0,0 +1,11 @@
package work.xuye.common.constant;

/**
* @author xuye
* @since 2023/7/11 16:05
**/
public class CommonConstants {

public static final String SEPARATOR = ":";
}

+ 3
- 0
common/src/main/java/work/xuye/common/db/entity/Mapping.java View File

@@ -36,6 +36,9 @@ public class Mapping implements Serializable {
@TableField("name")
private String name;

@TableField("template")
private String template;

@TableField(value = "table_mapping", typeHandler = JacksonTypeHandler.class)
private MessageMapping tableMapping;



+ 10
- 11
common/src/main/java/work/xuye/common/db/entity/vo/SinkParams.java View File

@@ -18,9 +18,9 @@ import java.util.List;
public class SinkParams {

/**
* 模板名称
* 模板名称表达式
*/
private String templateName;
private String templateNameExpression;
/**
* 根据哪个字段来判断这条数据是否需要执行update
*/
@@ -29,21 +29,20 @@ public class SinkParams {
* 来判断这条数据是否需要执行delete
*/
private CheckDelete checkDelete;
/**
* 数据源名字
*/
private String dataSourceName;

/**
* 表名
*/
private String tableName;

private InsertConfig insertConfig;

@Data
public static class InsertConfig {
/**
* 前置检查
*/
private List<String> predicates;

/**
* 后置行为
*/
private List<String> postActions;
}

@Data


+ 2
- 0
common/src/main/java/work/xuye/common/db/entity/vo/TableTemplate.java View File

@@ -20,6 +20,8 @@ import java.util.List;
public class TableTemplate {

private String tableName;
private String datasourceName;


/**
* 指的是数据库有默认值或者自增的字段,如果有,则在执行insert语句时,不需要填充该字段


+ 21
- 0
common/src/main/java/work/xuye/common/db/entity/vo/TaskMiniVO.java View File

@@ -0,0 +1,21 @@
package work.xuye.common.db.entity.vo;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* @author xuye
* @since 2023/7/7 21:58
**/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TaskMiniVO {

private Integer id;

private String name;
}

+ 18
- 0
common/src/main/java/work/xuye/common/db/service/StoreManager.java View File

@@ -3,6 +3,7 @@ package work.xuye.common.db.service;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
import work.xuye.common.constant.CacheConstants;
@@ -28,4 +29,21 @@ public class StoreManager {
.one();
}


@CacheEvict(value = CacheConstants.STORE, key = "#namespace + ':' + #key")
public Store upsert(String namespace, String key, String value) {
Store store = this.getValueFromNsByKey(namespace, key);
if (store == null) {
store = new Store();
store.setNamespace(namespace);
store.setKey(key);
store.setValue(value);
storeService.save(store);
} else {
store.setValue(value);
storeService.updateById(store);
}
return store;
}

}

+ 23
- 14
common/src/main/java/work/xuye/common/db/service/TaskManager.java View File

@@ -1,5 +1,6 @@
package work.xuye.common.db.service;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cache.annotation.Cacheable;
@@ -11,6 +12,11 @@ import work.xuye.common.db.entity.Task;
import work.xuye.common.db.entity.Template;
import work.xuye.common.db.entity.vo.SinkParams;
import work.xuye.common.dto.TaskVO;
import work.xuye.common.spel.CustomFunction;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* @author xuye
@@ -21,11 +27,13 @@ import work.xuye.common.dto.TaskVO;
@RequiredArgsConstructor
public class TaskManager {

public final CustomFunction cf;

@Getter
private final TaskService taskService;
private final TemplateService templateService;
private final MappingService mappingService;


@Cacheable(value = CacheConstants.TASK, key = "#taskId", unless = "#result == null")
public TaskVO getTaskInfoByTaskId(Integer taskId) {
Task task = taskService.lambdaQuery()
@@ -35,23 +43,24 @@ public class TaskManager {

SinkParams sinkParams = task.getSinkParams();
if (sinkParams != null) {
String templateName = sinkParams.getTemplateName();

Template template = templateService.lambdaQuery()
.eq(Template::getName, templateName)
.one();
Assert.notNull(template, "模板" + templateName + "不存在");
Mapping mapping = mappingService.lambdaQuery()
List<Mapping> mappingList = mappingService.lambdaQuery()
.eq(Mapping::getName, task.getName())
.one();
Assert.notNull(mapping, "映射" + task.getName() + "不存在");
return new TaskVO(task, template, mapping);
.list();
Assert.notEmpty(mappingList, "mapping" + task.getName() + "不存在");
Map<String, Mapping> map = mappingList.stream().collect(Collectors.toMap(Mapping::getTemplate, mapping -> mapping));
return new TaskVO(task, map);
} else {
return new TaskVO(task, null, null);
return new TaskVO(task, null);
}

}

@Cacheable(value = CacheConstants.TEMPLATE, key = "#templateName", unless = "#result == null")
public Template getTemplateByName(String templateName) {
Template template = templateService.lambdaQuery()
.eq(Template::getName, templateName)
.one();
Assert.notNull(template, "模板" + templateName + "不存在");
return template;
}

}

+ 11
- 6
common/src/main/java/work/xuye/common/dto/HttpRequestParams.java View File

@@ -5,6 +5,7 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import work.xuye.common.enums.RequestMode;

import java.util.Map;

@@ -17,18 +18,22 @@ import java.util.Map;
@AllArgsConstructor
public class HttpRequestParams {

private RequestMode mode = RequestMode.normal;
private SpELConfig spELConfig;
private String method = HttpMethod.GET.name();

private String url;

private Map<String, Object> body = Map.of();

private String charset;

private Map<String, String> headers = Map.of();

private Map<String, String> placeholderExpressions = Map.of();
private String mediaType = MediaType.APPLICATION_JSON_VALUE;

@Data
@NoArgsConstructor
@AllArgsConstructor
public static class SpELConfig {
private String expression;

}

}

+ 8
- 4
common/src/main/java/work/xuye/common/dto/HttpRes.java View File

@@ -18,12 +18,12 @@ import java.util.Map;
@AllArgsConstructor
public class HttpRes {

private String url;

private HttpRequestParams requestParams;
private ResourceStatus resourceStatus;
private HttpStatus status = HttpStatus.OK;
private String body;
private Map<String, List<String>> headers = Map.of();
private Long costTimeMillis;

public static HttpRes build() {
return new HttpRes();
@@ -54,10 +54,14 @@ public class HttpRes {
return this;
}

public HttpRes url(String url) {
this.url = url;
public HttpRes costTimeMillis(Long costTimeMillis) {
this.costTimeMillis = costTimeMillis;
return this;
}

public HttpRes requestParams(HttpRequestParams requestParams) {
this.requestParams = requestParams;
return this;
}

}

+ 3
- 4
common/src/main/java/work/xuye/common/dto/TaskVO.java View File

@@ -6,7 +6,8 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import work.xuye.common.db.entity.Mapping;
import work.xuye.common.db.entity.Task;
import work.xuye.common.db.entity.Template;

import java.util.Map;

/**
* @author xuye
@@ -20,8 +21,6 @@ public class TaskVO {

private Task task;

private Template template;

private Mapping mapping;
private Map<String, Mapping> templateMappingMap;

}

+ 5
- 0
common/src/main/java/work/xuye/common/enums/ProcessReason.java View File

@@ -26,6 +26,11 @@ public enum ProcessReason {
resource_changed("\uD83D\uDD04", "缓存中存在该记录,但与最新状态不一致"),

/**
* 外部条件满足
*/
external_condition("✅⚠️", "外部条件引发处理"),

/**
* 结束执行
*/
end("\uD83D\uDD1A", "结束执行"),


+ 18
- 0
common/src/main/java/work/xuye/common/enums/RequestMode.java View File

@@ -0,0 +1,18 @@
package work.xuye.common.enums;

/**
* @author xuye
* @since 2023/3/9 16:24
**/
public enum RequestMode {

/**
* 常规的根据参数发起http请求
*/
normal,

/**
* 执行表达式
*/
SpEL
}

+ 22
- 0
common/src/main/java/work/xuye/common/properties/CommonProperties.java View File

@@ -0,0 +1,22 @@
package work.xuye.common.properties;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;

/**
* @author xuye
* @since 2023/3/6 17:35
**/
@Data
@Component
@RefreshScope
@ConfigurationProperties(prefix = CommonProperties.PROPERTIES_PREFIX)
public class CommonProperties {

public static final String PROPERTIES_PREFIX = "common";

private boolean production = true;

}

+ 38
- 0
common/src/main/java/work/xuye/common/properties/YueKeProperties.java View File

@@ -0,0 +1,38 @@
package work.xuye.common.properties;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;
import work.xuye.common.constant.CommonConstants;

/**
* @author xuye
* @since 2023/3/6 17:35
**/
@Data
@Component
@RefreshScope
@ConfigurationProperties(prefix = YueKeProperties.PROPERTIES_PREFIX)
public class YueKeProperties {

public static final String PROPERTIES_PREFIX = "yueke";


private String taskName = "yueke_json";
private String idUrlMapName = "id_url_map";
private String channelIdNameMap = "channel_id_name_map";

private String secret = "Tk1xa4xNDPj8atGD5eGeXad91xQhKY";


public String getNamespaceOfIdUrlMap() {
return this.taskName + CommonConstants.SEPARATOR + this.idUrlMapName;
}

public String getNamespaceOfChannelIdNameMap() {
return this.taskName + CommonConstants.SEPARATOR + this.channelIdNameMap;
}


}

+ 68
- 12
common/src/main/java/work/xuye/common/spel/CustomFunction.java View File

@@ -10,11 +10,14 @@ import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.stereotype.Component;
import org.springframework.util.DigestUtils;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ObjectUtils;
import org.springframework.web.util.UriComponentsBuilder;
import work.xuye.common.constant.CustomerDateTimeFormatter;
import work.xuye.common.constant.RawDataFiledKey;
import work.xuye.common.store.NsKVMapStore;
import work.xuye.common.utils.HttpUtil;
import work.xuye.common.utils.JsonPathUtil;

import java.time.Instant;
import java.time.LocalDateTime;
@@ -36,7 +39,7 @@ public class CustomFunction {

private final SpelExpressionParser parser;

private final NsKVMapStore nsKvMapStore;
private final NsKVMapStore mapStore;

public String jsonExtract(String json, String path) {
String result = null;
@@ -140,7 +143,6 @@ public class CustomFunction {
return (String) exp.getValue();
}


public String warpExclude(String json, String excludeFiled, String... fields) {
JsonObject obj = JsonParser.parseString(json).getAsJsonObject();
if (obj.has(excludeFiled)) {
@@ -174,7 +176,12 @@ public class CustomFunction {
for (int i = 0; i < fields.length; i += 2) {
String key = fields[i];
String path = fields[i + 1];
String value = this.jsonExtract(json, path);
String value;
if (JsonPathUtil.isJsonPath(path)) {
value = this.jsonExtract(json, path);
} else {
value = path;
}
result.add(key, new JsonPrimitive(value));
log.debug("wrap handler, json: [{}], key: [{}], path: [{}], value: [{}]", json, key, path, value);
}
@@ -190,7 +197,6 @@ public class CustomFunction {
return HttpUtil.isHttpRes2XX(url);
}


public boolean urlNotContains(String url, String contains) {
return !url.contains(contains);
}
@@ -199,17 +205,44 @@ public class CustomFunction {
* 根据key从命名空间中获取value
*
* @param namespace 任务名
* @param json item的json字符串
* @param path key的jsonpath
* @param key key
* @return value
*/
public String getValueByKeyFromNs(String namespace, String json, String path) {
String key = this.jsonExtract(json, path);
String value = nsKvMapStore.getValue(namespace, key);
public String getValueByKeyFromNs(String namespace, String key) {
String value = mapStore.getValue(namespace, key);
log.debug("getValueByKeyFromNs handler, namespace: [{}], key-in: [{}], value-out: [{}]", namespace, key, value);
return value;
}

public String getValueByKeyFromNs(String namespace, String json, String jsonPath) {
String key = this.jsonExtract(json, jsonPath);
return getValueByKeyFromNs(namespace, key);
}


public void putValueToNs(String namespace, String key, String value) {
mapStore.upsert(namespace, key, value);
log.debug("putValueToNs handler, namespace: [{}], key: [{}], value: [{}]", namespace, key, value);
}

public void putValueToNs(String namespace, String json, String jsonPath, String value) {
String key = this.jsonExtract(json, jsonPath);
putValueToNs(namespace, key, value);
}

public String getParamsFromUrl(String json, String jsonPath, String key) {
String url = this.jsonExtract(json, jsonPath);
return getParamsFromUrl(url, key);
}

public String getParamsFromUrl(String url, String key) {
UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(url);
MultiValueMap<String, String> queryParams = builder.build().getQueryParams();
String value = queryParams.getFirst(key);
log.debug("getParamsFromUrl handler, url: [{}], key-in: [{}], value-out: [{}]", url, key, value);
return value;
}


/**
* 先从map中获取指定的key,然后使用jsonpath提取json中的数据,判断是否等于期望值
@@ -222,13 +255,27 @@ public class CustomFunction {
*/
public boolean equals(Map<String, Object> map, String mapFieldKey, String jsonPath, Object value) {
Object fieldValue = map.get(mapFieldKey);
log.debug("equals handler2, map: [{}], mapFieldKey: [{}], mapValue: [{}]", map, mapFieldKey, value);
log.debug("equals handler, map: [{}], mapFieldKey: [{}], mapValue: [{}]", map, mapFieldKey, value);
String result = this.jsonExtract(fieldValue.toString(), jsonPath);
boolean equals = result.equals(value);
log.debug("equals handler1, json: [{}], jsonPath: [{}], value: [{}], result: [{}]", fieldValue, jsonPath, value, equals);
log.debug("equals handler, json: [{}], jsonPath: [{}], value: [{}], result: [{}]", fieldValue, jsonPath, value, equals);
return equals;
}

public boolean keyNotExistInNs(Map<String, Object> map, String field, String jsonPath, String namespace) {
return !keyExistInNs(map, field, jsonPath, namespace);
}

public boolean keyExistInNs(Map<String, Object> map, String field, String jsonPath, String namespace) {
Object o = map.get(field);
String v = this.jsonExtract(o.toString(), jsonPath);
String queryResult = mapStore.getValue(namespace, v);
boolean has = !ObjectUtils.isEmpty(queryResult);
log.info("keyExistInNs handler, value: [{}], namespace: [{}], result: [{}]", v, namespace, has);
return has;
}


/**
* 固定返回 true,用于希望断言通过时使用
*
@@ -270,7 +317,6 @@ public class CustomFunction {
return array.toString();
}


public String epochMilliToLocalDatetime(String json, String path) {
String epochMilli = this.jsonExtract(json, path);
String localDatetime = LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(epochMilli)), ZoneId.of("Asia/Shanghai")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
@@ -279,4 +325,14 @@ public class CustomFunction {
}


public String ifEqualsIgnoreCase(String json, String jsonpath, String dest, String ifValue, String elseValue) {
String value = this.jsonExtract(json, jsonpath);
if (value.equalsIgnoreCase(dest)) {
return ifValue;
} else {
return elseValue;
}
}


}

+ 2
- 1
common/src/main/java/work/xuye/common/spel/SpringExpressionLanguageEvaluator.java View File

@@ -22,11 +22,12 @@ import java.util.List;
@Service
@RequiredArgsConstructor
public class SpringExpressionLanguageEvaluator {
public final CustomFunction cf;

private final SpelExpressionParser parser;


public HashMap<String, Object> evaluate(MessageMapping messageMapping, Message<String> message) {
StandardEvaluationContext context = new StandardEvaluationContext();
String seedUrl = (String) message.getHeaders().get(MessageConstants.SEED_URL);


+ 0
- 74
common/src/main/java/work/xuye/common/store/LocalMemoryNsKVMapStore.java View File

@@ -1,74 +0,0 @@
package work.xuye.common.store;

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* @author xuye
* @since 2023/3/27 11:02
**/
@Component
public class LocalMemoryNsKVMapStore implements NsKVMapStore {

private final static String GLH_V2 = "glh_v2_rss";

/**
* key: 命名空间,默认以任务名命名
* value: key:key
* value:value:value
*/
private Map<String, Map<String, Object>> store = null;


@PostConstruct
public void init() {
if (store != null) {
store.clear();
} else {
store = new ConcurrentHashMap<>();
}
store.put(GLH_V2, initGLH());
}

public Map<String, Object> initGLH() {
HashMap<String, Object> map = new HashMap<>();

map.put("港股", "格隆汇-快讯");
map.put("A股", "格隆汇-快讯");
map.put("美股", "格隆汇-快讯");
map.put("商品外汇", "格隆汇-快讯");
map.put("楼市", "格隆汇-快讯");
map.put("基⾦", "格隆汇-快讯");
map.put("债券", "格隆汇-快讯");

map.put("港股公告摘要", "格隆汇-要闻");
map.put("⼤⾏评级", "格隆汇-要闻");
map.put("业绩直击", "格隆汇-要闻");
map.put("港股异动", "格隆汇-要闻");
map.put("公司信息", "格隆汇-要闻");
map.put("市场综述", "格隆汇-要闻");
map.put("新股速递", "格隆汇-要闻");
map.put("A股公告摘要", "格隆汇-要闻");
map.put("美股异动", "格隆汇-要闻");

return map;
}


@Override
public String getValue(String namespace, String key) {
Map<String, Object> map = store.get(namespace);
if (map == null) {
throw new RuntimeException("namespace [" + namespace + "] not found in local store");
}
Object value = map.getOrDefault(key, null);
if (value == null) {
return null;
}
return value.toString();
}
}

+ 6
- 1
common/src/main/java/work/xuye/common/store/MySQLCachedNsKVMapStore.java View File

@@ -22,9 +22,14 @@ public class MySQLCachedNsKVMapStore implements NsKVMapStore {
public String getValue(String namespace, String key) {
Store store = storeManager.getValueFromNsByKey(namespace, key);
if (ObjectUtils.isEmpty(store)) {
throw new RuntimeException("namespace [" + namespace + "] not found in db store");
return null;
}
return store.getValue();
}

@Override
public void upsert(String namespace, String key, String value) {
storeManager.upsert(namespace, key, value);
}

}

+ 4
- 0
common/src/main/java/work/xuye/common/store/NsKVMapStore.java View File

@@ -6,5 +6,9 @@ package work.xuye.common.store;
**/

public interface NsKVMapStore {
String getValue(String namespace, String key);

void upsert(String namespace, String key, String value);

}

+ 39
- 5
common/src/main/java/work/xuye/common/utils/DebugUtil.java View File

@@ -1,7 +1,12 @@
package work.xuye.common.utils;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import work.xuye.common.db.entity.Task;
import work.xuye.common.db.entity.vo.TaskMiniVO;
import work.xuye.common.dto.TaskVO;
import work.xuye.common.properties.CommonProperties;

import java.util.Set;

@@ -10,22 +15,51 @@ import java.util.Set;
* @since 2023/5/6 10:31
**/
@Slf4j
@Component
public class DebugUtil {

private static final boolean DEBUG = false;
private static final Set<Integer> taskIds = Set.of(17);
private static boolean DEBUG = false;

private static final Set<Integer> taskIds = Set.of(5);

public static boolean isSkip(TaskVO task) {
@Autowired
public DebugUtil(CommonProperties commonProperties) {
if (commonProperties.isProduction()) {
DEBUG = false;
}
}

public static boolean isSkip(TaskMiniVO taskMiniVO) {
return isSkip(taskMiniVO.getId(), taskMiniVO.getName());
}

public static boolean isSkip(Task task) {
return isSkip(task.getId(), task.getName());
}

public static boolean isSkip(TaskVO taskVO) {
return isSkip(taskVO.getTask().getId(), taskVO.getTask().getName());
}

public static boolean isSkip(Integer taskId) {
if (!DEBUG) {
return false;
}
Integer taskId = task.getTask().getId();
if (!taskIds.contains(taskId)) {
log.warn("skip task, [{}-{}]", taskId, task.getTask().getName());
log.warn("skip task, [{}]", taskId);
return true;
}
return false;
}

public static boolean isSkip(Integer taskId, String taskName) {
if (!DEBUG) {
return false;
}
if (!taskIds.contains(taskId)) {
log.warn("skip task, [{}-{}]", taskId, taskName);
return true;
}
return false;
}
}

+ 7
- 1
common/src/main/java/work/xuye/common/utils/JsonPathUtil.java View File

@@ -32,7 +32,7 @@ public class JsonPathUtil {
public static JsonArray extractAsJsonArray(String json, String path, String seedUrl) {
boolean exist = JsonPathUtil.exist(json, path);
if (!exist) {
log.warn("jsonPath: [{}] from [{}] not exist in json:\n{}", path, seedUrl, json);
log.debug("jsonPath: [{}] from [{}] not exist in json:\n{}", path, seedUrl, json);
return new JsonArray();
}
Object read = JsonPathUtil.read(json, path);
@@ -57,6 +57,12 @@ public class JsonPathUtil {
return read;
}

public static boolean isJsonPath(String expression) {
// todo 后续可优化成正则判断
return expression.startsWith("$");
}


@Autowired
public void setGson(Gson gson) {
JsonPathUtil.gson = gson;


+ 33
- 0
helper/.gitignore View File

@@ -0,0 +1,33 @@
HELP.md
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/

### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache

### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr

### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/

### VS Code ###
.vscode/

+ 50
- 0
helper/pom.xml View File

@@ -0,0 +1,50 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>work.xuye</groupId>
<artifactId>std-news-process</artifactId>
<version>0.0.13-SNAPSHOT</version>
</parent>
<groupId>work.xuye</groupId>
<artifactId>helper</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>helper</name>
<description>helper</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>work.xuye</groupId>
<artifactId>common</artifactId>
<version>0.0.13-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.11</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>

+ 16
- 0
helper/src/main/java/work/xuye/helper/HelperApplication.java View File

@@ -0,0 +1,16 @@
package work.xuye.helper;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@MapperScan(basePackages = "work.xuye.common.db.mapper")

@SpringBootApplication(scanBasePackages = {"work.xuye.*"})
public class HelperApplication {

public static void main(String[] args) {
SpringApplication.run(HelperApplication.class, args);
}

}

+ 28
- 0
helper/src/main/java/work/xuye/helper/properties/HelperProperties.java View File

@@ -0,0 +1,28 @@
package work.xuye.helper.properties;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
* @author xuye
* @since 2023/1/14 21:50
**/
@Data
@Component
@ConfigurationProperties(prefix = HelperProperties.PROPERTIES_PREFIX)
public class HelperProperties {

public static final String PROPERTIES_PREFIX = "helper";

private Map<String, DataSource> datasourceMap;

@Data
public static class DataSource {
private String url;
private String username;
private String password;
}
}

+ 171
- 0
helper/src/main/java/work/xuye/helper/service/UpdateService.java View File

@@ -0,0 +1,171 @@
package work.xuye.helper.service;

import cn.hutool.db.Db;
import cn.hutool.db.Entity;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import work.xuye.helper.properties.HelperProperties;

import javax.annotation.PostConstruct;
import java.sql.SQLException;
import java.util.*;

/**
* @author xuye
* @since 2023/7/12 14:55
**/
@Slf4j
@Service
@RequiredArgsConstructor
public class UpdateService implements ApplicationRunner {

private final HelperProperties helperProperties;

private final HashMap<String, HikariDataSource> dataSourceMap;

@PostConstruct
public void init() {

Map<String, HelperProperties.DataSource> datasourceConfigMap = helperProperties.getDatasourceMap();
for (String key : datasourceConfigMap.keySet()) {
HelperProperties.DataSource source = datasourceConfigMap.get(key);
HikariConfig config = new HikariConfig();
config.setJdbcUrl(source.getUrl());
config.setUsername(source.getUsername());
config.setPassword(source.getPassword());
HikariDataSource dataSource = new HikariDataSource(config);
dataSourceMap.put(key, dataSource);


}
}

public void start() throws SQLException {
String sql = "select date(createDate) as date, spider, count(*) as count\n" +
"from all_news\n" +
"where JSON_EXTRACT(raw_data, '$.crawler') = 'snp'\n" +
" and createDate > curdate() - interval 1 day\n" +
"group by spider, date\n" +
"order by date desc, count desc;";


Set<String> keys = dataSourceMap.keySet();

HashMap<String, List<Entity>> map = new HashMap<>();

for (String key : keys) {
HikariDataSource dataSource = dataSourceMap.get(key);
List<Entity> query = Db.use(dataSourceMap.get(key)).query(sql);
map.put(key, query);
}

// Compare List<Entity> between different environments
for (String key1 : keys) {
for (String key2 : keys) {
if (!key1.equals(key2)) {
List<Entity> list1 = map.get(key1);
List<Entity> list2 = map.get(key2);

for (Entity entity1 : list1) {
for (Entity entity2 : list2) {
String date1 = entity1.getStr("date");
String spider1 = entity1.getStr("spider");
int count1 = entity1.getInt("count");

String date2 = entity2.getStr("date");
String spider2 = entity2.getStr("spider");
int count2 = entity2.getInt("count");

if (date1.equals(date2) && spider1.equals(spider2) && count1 != count2) {
int diff = Math.abs(count1 - count2);
String more = count1 > count2 ? key1 : key2;
System.out.printf("@@@ [%s] [%s] %s: %d %s: %d, diff abs: %d, more: %s%n", date1, spider1, key1, count1, key2, count2, diff, more);
this.findDIff(spider1, date1);
}
}
}
}
}
// todo 此处比较多个的话有遗漏,后续优化下
break;
}
}


public void findDIff(String spider, String date) throws SQLException {
HashMap<String, List<String>> map = new HashMap<>();
for (String key : dataSourceMap.keySet()) {
HikariDataSource dataSource = dataSourceMap.get(key);

String sql = "SELECT url FROM all_news WHERE spider = '" + spider + "' AND date(createDate) = '" + date + "';";
List<Entity> resultList = Db.use(dataSource).query(sql);
ArrayList<String> list = new ArrayList<>();
for (Entity entity : resultList) {
String url = entity.getStr("url");
list.add(url);
}
map.put(key, list);
}

for (String s : map.keySet()) {
this.compareEnvironments(map, s);
}


}

public void compareEnvironments(HashMap<String, List<String>> map, String environment) throws SQLException {

List<String> environmentList = map.get(environment);
if (environmentList == null) {
return;
}
for (String key : map.keySet()) {
if (key.equals(environment)) {
continue;
}
List<String> list = map.getOrDefault(key, new ArrayList<>());
List<String> intersection = new ArrayList<>(list);
intersection.retainAll(environmentList);

List<String> different = new ArrayList<>(list);
different.removeAll(intersection);

if (!different.isEmpty()) {
System.out.println("@@ 环境 " + environment + " 中,List " + key + " 中独有的元素有" + different.size() + "个");
for (String s : different) {
getStatusByUrl(s);
}
}
}
}

public void getStatusByUrl(String url) throws SQLException {
String sql = "select spider from all_news where url='" + url + "';";
for (String key : dataSourceMap.keySet()) {
HikariDataSource dataSource = dataSourceMap.get(key);
List<Entity> resultList = Db.use(dataSource).query(sql);
if (ObjectUtils.isEmpty(resultList)) {
System.err.println("⚠️ url: " + url + " not found in " + key);
}
if (resultList.size() == 1) {
System.out.println("url [" + url + "] 被 [" + resultList.get(0).get("spider") + "] 提前抓取了");
}
if (resultList.size() > 1) {
System.err.println("!!! url: " + url + " found in " + key + " " + resultList.size() + " times");
}

}
}

@Override
public void run(ApplicationArguments args) throws Exception {
this.start();
}
}

+ 15
- 0
helper/src/main/resources/application.yml View File

@@ -0,0 +1,15 @@
knife4j:
enable: true
server:
port: 9300
helper:
datasourceMap:
sit:
url: jdbc:mysql://47.116.58.10:3306/pyspider_sit_resultdb?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2b8
username: pyspider_user
password: ND0328qSywfre
prd:
url: jdbc:mysql://47.103.55.230:3306/pyspider_resultdb?characterEncoding=utf8&autoReconnect=true&useUnicode=true&useSSL=false
username: pyspider
password: strzsJQWp%uw9oKB


+ 21
- 0
helper/src/main/resources/bootstrap.yml View File

@@ -0,0 +1,21 @@
server:
port: 18001
spring:
application:
name: helper
cloud:
nacos:
config:
enabled: true
server-addr: https://nacos-sit.deepq.tech
file-extension: yml
namespace: std-news-process-dev
shared-configs:
- data-id: common.yml
refresh: true
discovery:
enabled: ${spring.cloud.nacos.config.enabled}
server-addr: ${spring.cloud.nacos.config.server-addr}
namespace: ${spring.cloud.nacos.config.namespace}
username: nacos
password: SKaIsixbMZ

+ 3
- 9
pom.xml View File

@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
@@ -28,6 +29,7 @@
<module>source</module>
<module>transformer</module>
<module>sink</module>
<module>helper</module>
</modules>

<properties>
@@ -70,14 +72,6 @@
</goals>
</execution>
</executions>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>


+ 13
- 9
scheduler/src/main/java/work/xuye/scheduler/service/IssueService.java View File

@@ -13,9 +13,9 @@ import work.xuye.common.constant.BindingConstants;
import work.xuye.common.constant.MessageConstants;
import work.xuye.common.constant.StageConstants;
import work.xuye.common.db.entity.Task;
import work.xuye.common.db.entity.vo.TaskMiniVO;
import work.xuye.common.db.service.TaskManager;
import work.xuye.common.db.service.TaskService;
import work.xuye.common.dto.HttpRequestParams;
import work.xuye.common.dto.TaskVO;
import work.xuye.common.enums.ProcessMode;
import work.xuye.common.utils.ID;
@@ -41,24 +41,28 @@ public class IssueService implements ApplicationRunner {

public TaskVO issueTask(Integer id) {
TaskVO taskVO = taskManager.getTaskInfoByTaskId(id);
HttpRequestParams requestParams = taskVO.getTask().getRequestParams();
Message<HttpRequestParams> message = MessageBuilder
.withPayload(requestParams)
Task task = taskVO.getTask();
TaskMiniVO taskMiniVO = TaskMiniVO.builder()
.id(task.getId())
.name(task.getName())
.build();
Message<TaskMiniVO> message = MessageBuilder
.withPayload(taskMiniVO)
.setHeader(MessageConstants.CURRENT_STAGE, StageConstants.SCHEDULER)
.setHeader(MessageConstants.TASK_TRACE_ID, ID.generate())
.setHeader(MessageConstants.TASK_ID, taskVO.getTask().getId())
.setHeader(MessageConstants.TASK_NAME, taskVO.getTask().getName())
.setHeader(MessageConstants.TASK_ID, task.getId())
.setHeader(MessageConstants.TASK_NAME, task.getName())
.build();
boolean send = streamBridge.send(BindingConstants.TASK_OUT, message);
if (!send) {
throw new RuntimeException("send message failed");
}
ProcessMode processMode = taskVO.getTask().getProcessMode();
ProcessMode processMode = task.getProcessMode();
String emoji = processMode.equals(ProcessMode.DEBUG) ? "\uD83E\uDD16" : "";
log.info("\uD83D\uDCE4 {} {}-{}",
emoji,
taskVO.getTask().getId(),
taskVO.getTask().getName());
task.getId(),
task.getName());
return taskVO;
}



+ 2
- 2
scheduler/src/main/resources/bootstrap.yml View File

@@ -5,9 +5,9 @@ spring:
nacos:
config:
enabled: true
server-addr: https://nacos-test.deepq.tech
server-addr: https://nacos-sit.deepq.tech
file-extension: yml
namespace: std-news-process-sit
namespace: std-news-process-dev
shared-configs:
- data-id: common.yml
refresh: true


+ 34
- 7
sink/src/main/java/work/xuye/sink/controller/OpenController.java View File

@@ -1,19 +1,46 @@
package work.xuye.sink.controller;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.*;
import work.xuye.common.properties.CommonProperties;
import work.xuye.sink.yueke.*;

import java.util.List;

/**
* @author xuye
* @since 2023/7/4 15:16
**/
@Slf4j
@Api(tags = "外部请求")
@RestController
@RequestMapping("/open")
@RequestMapping("/open/api")
@RequiredArgsConstructor
public class OpenController {

@GetMapping
public String open() {
return "hello ~";
private final YueKeRevokeService yueKeRevokeService;
private final CommonProperties commonProperties;

@PostMapping("/revoke")
@ApiOperation(value = "阅客资讯-下架")
public YueKeResponse revoke(@RequestBody YueKeRevokeNewsIDsVO ids, @RequestHeader(name = "metaData") YueKeRequestModel headerModel) {
log.info("body: {}, metaData: {}", ids, headerModel);
yueKeRevokeService.handleRevokeRequest(ids, headerModel);
return YueKeResponse.of(YueKeResEnum.COMMON_SUCCESS);
}

@GetMapping("/debug/{id}")
@ApiOperation(value = "测试-阅客资讯-下架")
public YueKeResponse debug(@PathVariable String id) {
if (commonProperties.isProduction()) {
throw new UnsupportedOperationException("生产环境不支持该操作");
}
yueKeRevokeService.revoke(List.of(id));
return YueKeResponse.of(YueKeResEnum.COMMON_SUCCESS);
}


}

+ 10
- 3
sink/src/main/java/work/xuye/sink/handler/DisappearHandler.java View File

@@ -10,6 +10,7 @@ import work.xuye.common.constant.RegexConstants;
import work.xuye.common.db.entity.Sql;
import work.xuye.common.db.entity.Task;
import work.xuye.common.db.entity.vo.SinkParams;
import work.xuye.common.db.entity.vo.TableTemplate;
import work.xuye.common.db.service.SqlManager;
import work.xuye.common.db.service.TaskManager;
import work.xuye.common.dto.TaskVO;
@@ -18,6 +19,7 @@ import work.xuye.common.enums.MessageType;
import work.xuye.common.enums.Status;
import work.xuye.common.properties.AlertProperties;
import work.xuye.common.service.MessageService;
import work.xuye.common.spel.SpringExpressionLanguageEvaluator;
import work.xuye.common.utils.DebugUtil;
import work.xuye.sink.service.JdbcService;
import work.xuye.sink.service.SqlExecutor;
@@ -43,6 +45,7 @@ public class DisappearHandler {
private final SqlManager sqlManager;
private final AlertProperties alertProperties;
private final SqlExecutor sqlExecutor;
private final SpringExpressionLanguageEvaluator evaluator;

public void handle(Message<Set<String>> message) {

@@ -62,11 +65,15 @@ public class DisappearHandler {
return;
}
String condition = delete.getDisappearConfig().getCondition();
String tableName = task.getSinkParams().getTableName();
String dataSourceName = task.getSinkParams().getDataSourceName();

String templateNameExpression = taskVO.getTask().getSinkParams().getTemplateNameExpression();
String templateName = evaluator.evaluate(message.getPayload(), templateNameExpression, String.class).toString();
TableTemplate template = taskManager.getTemplateByName(templateName).getTableTemplate();
String tableName = template.getTableName();
String dataSourceName = template.getDatasourceName();
List<Map<String, Object>> itemList = jdbcService.getForListMap(dataSourceName,
SqlGenerator.generateGivenFieldSql(tableName, condition, taskName));
String uniqueField = taskVO.getTemplate().getTableTemplate().getUniqueField().getFieldName();
String uniqueField = template.getUniqueField().getFieldName();
List<Map<String, Object>> removed = itemList.stream().filter(item -> !message.getPayload().contains(item.get(uniqueField).toString())).collect(Collectors.toList());
if (ObjectUtils.isEmpty(removed)) {
return;


+ 39
- 23
sink/src/main/java/work/xuye/sink/handler/ItemHandler.java View File

@@ -79,10 +79,15 @@ public class ItemHandler {
}
//相关字段放这里
String md5Digest = DigestUtils.md5DigestAsHex(message.getPayload().getBytes());
TableTemplate template = taskVO.getTemplate().getTableTemplate();


String templateNameExpression = taskVO.getTask().getSinkParams().getTemplateNameExpression();
String templateName = evaluator.evaluate(message.getPayload(), templateNameExpression, String.class).toString();
TableTemplate template = taskManager.getTemplateByName(templateName).getTableTemplate();

SinkParams.CheckDelete checkDeleteConfig = taskVO.getTask().getSinkParams().getCheckDelete();
// 根据消息体,和映射关系,使用SpEL表达式,得到计算结果
HashMap<String, Object> expressionResultMap = evaluator.evaluate(taskVO.getMapping().getTableMapping(), message);
HashMap<String, Object> expressionResultMap = evaluator.evaluate(taskVO.getTemplateMappingMap().get(templateName).getTableMapping(), message);
// 如果不能通过校验,则抛出异常
this.validate(expressionResultMap, template);
// 将计算结果合并到模板中,此刻的模板是拥有值的
@@ -116,11 +121,11 @@ public class ItemHandler {
this.putToCache(template, md5Digest);
return;
}
Map<String, Object> dbItem = this.getExistedItem(taskVO, template);
Map<String, Object> dbItem = this.getExistedItem(template);
this.tryUpdate(taskVO, dbItem, expressionResultMap, md5Digest, template, reason);
//如果不存在,就插入
} else {
this.tryInsert(taskVO.getTask(), expressionResultMap, template, reason);
this.tryInsert(message.getPayload(), taskVO, expressionResultMap, template, reason);
}
this.putToCache(template, md5Digest);
}
@@ -166,7 +171,7 @@ public class ItemHandler {
private boolean tryDelete(TaskVO taskVO, TableTemplate template, HashMap<String, Object> newItem, SinkParams.CheckDelete checkDeleteConfig, ProcessReason reason) {
// 如果不是 是固定值检测删除并且状态是开启,说明无需处理,直接返回
if (
!(checkDeleteConfig.getStatus().equals(Status.on) && checkDeleteConfig.getMode().equals(DeleteCheckMode.fixedValue))
!(Status.on.equals(checkDeleteConfig.getStatus()) && DeleteCheckMode.fixedValue.equals(checkDeleteConfig.getMode()))
) {
return false;
}
@@ -175,15 +180,15 @@ public class ItemHandler {
// 满足断言条件,才能继续处理
boolean satisfied = this.isSatisfied(taskVO.getTask(), newItem, template, predicates, "删除");
if (satisfied) {
return doDelete(taskVO, template.getUniqueField(), reason);
return doDelete(taskVO, template, reason);
}
return false;
}

private boolean doDelete(TaskVO taskVO, TableTemplate.Field uniqueField, ProcessReason reason) {
String tableName = taskVO.getTask().getSinkParams().getTableName();
public boolean doDelete(TaskVO taskVO, TableTemplate template, ProcessReason reason) {
TableTemplate.Field uniqueField = template.getUniqueField();
String tableName = template.getTableName();
Object value = uniqueField.getValue();
TableTemplate template = taskVO.getTemplate().getTableTemplate();
TableTemplate.LogicDeleteField logicDeleteField = template.getLogicDeleteField();
boolean isLogicDelete = !ObjectUtils.isEmpty(logicDeleteField);
String deleteSql;
@@ -192,7 +197,7 @@ public class ItemHandler {
} else {
deleteSql = SqlGenerator.generateLogicDeleteUpdateSql(tableName, template);
}
int rows = jdbcService.executeSql(taskVO.getTask().getSinkParams().getDataSourceName(), deleteSql);
int rows = jdbcService.executeSql(template.getDatasourceName(), deleteSql);
log.info("[{}][\uD83D\uDDD1️已{}删除{}条][{} {}] " + "{}: {}",
taskVO.getTask().getName(),
isLogicDelete ? "逻辑" : "",
@@ -201,6 +206,9 @@ public class ItemHandler {
reason.name(),
uniqueField.getFieldName(),
value);
if (ProcessReason.external_condition.equals(reason)) {
return rows > 0;
}
if (rows != 1) {
throw new RuntimeException("期望删除一条,实际删除" + rows + "条数据,请尽快处理");
}
@@ -255,11 +263,6 @@ public class ItemHandler {
field.setValue(value);
}
}

// 以任务的表名为准,覆盖模板的表名
if (!ObjectUtils.isEmpty(taskVO.getTask().getSinkParams().getTableName())) {
template.setTableName(taskVO.getTask().getSinkParams().getTableName());
}
}


@@ -289,12 +292,12 @@ public class ItemHandler {

private boolean itemExist(SinkParams sinkParams, TableTemplate tableTemplate) {
String selectExist = SqlGenerator.generateExistedSql(tableTemplate);
return jdbcService.isExist(sinkParams.getDataSourceName(), selectExist);
return jdbcService.isExist(tableTemplate.getDatasourceName(), selectExist);
}

private Map<String, Object> getExistedItem(TaskVO taskVO, TableTemplate template) {
String dataSourceName = taskVO.getTask().getSinkParams().getDataSourceName();
return jdbcService.getOne(dataSourceName, SqlGenerator.generateFetchSql(template));
private Map<String, Object> getExistedItem(TableTemplate template) {
String datasourceName = template.getDatasourceName();
return jdbcService.getOne(datasourceName, SqlGenerator.generateFetchSql(template));
}


@@ -303,7 +306,7 @@ public class ItemHandler {
String taskName = taskVO.getTask().getName();
boolean updateConfigEnabled = sinkParams.getCheckUpdate().getStatus().equals(Status.on);
if (updateConfigEnabled) {
String dataSourceName = sinkParams.getDataSourceName();
String dataSourceName = tableTemplate.getDatasourceName();
Assert.notNull(dataSourceName, "dataSourceName is null");
SinkParams.CheckUpdate checkUpdateConfig = sinkParams.getCheckUpdate();
String fieldName = checkUpdateConfig.getFieldName();
@@ -349,9 +352,13 @@ public class ItemHandler {
}


private void tryInsert(Task task, Map<String, Object> newItem, TableTemplate template, ProcessReason reason) {
if (this.isSatisfied(task, newItem, template, task.getSinkParams().getInsertConfig().getPredicates(), "保存")) {
jdbcService.update(task.getSinkParams().getDataSourceName(), SqlGenerator.generateInsertSql(template));
private void tryInsert(String message, TaskVO taskVO, Map<String, Object> newItem, TableTemplate template, ProcessReason reason) {
Task task = taskVO.getTask();
// 执行前置检查
if (!this.isSatisfied(task, newItem, template, task.getSinkParams().getInsertConfig().getPredicates(), "保存")) {
return;
} else {
jdbcService.update(template.getDatasourceName(), SqlGenerator.generateInsertSql(template));
log.info("[{}][\uD83D\uDCBE 已保存][{} {}] " + "{}: {}",
task.getName(),
reason.getEmoji(),
@@ -359,6 +366,15 @@ public class ItemHandler {
template.getUniqueField().getFieldName(),
template.getUniqueField().getValue());
}

// 执行后置动作
List<String> postActions = task.getSinkParams().getInsertConfig().getPostActions();
if (ObjectUtils.isEmpty(postActions)) {
return;
}
for (String expression : postActions) {
evaluator.evaluate(message, expression, Void.class);
}
}

}

+ 14
- 0
sink/src/main/java/work/xuye/sink/yueke/YueKeConstants.java View File

@@ -0,0 +1,14 @@
package work.xuye.sink.yueke;

/**
* @author xuye
* @since 2023/7/10 09:56
**/
public class YueKeConstants {

//请求时间验证3分钟有效(毫秒)
public static final Long VALID_MS = 1000 * 60 * 3L;
public static final Integer REQ_ID_MIN_LENGTH = 10;
public static final Integer REQ_ID_MAX_LENGTH = 50;

}

+ 23
- 0
sink/src/main/java/work/xuye/sink/yueke/YueKeException.java View File

@@ -0,0 +1,23 @@
package work.xuye.sink.yueke;

import lombok.Data;
import lombok.EqualsAndHashCode;

/**
* @author xuye
* @since 2023/7/10 10:19
**/
@Data
@EqualsAndHashCode(callSuper = true)
public class YueKeException extends RuntimeException {

private String requestId;

private YueKeResEnum yueKeResEnum;


public YueKeException(String requestId, YueKeResEnum yueKeResEnum) {
this.requestId = requestId;
this.yueKeResEnum = yueKeResEnum;
}
}

+ 20
- 0
sink/src/main/java/work/xuye/sink/yueke/YueKeExceptionControllerAdvice.java View File

@@ -0,0 +1,20 @@
package work.xuye.sink.yueke;

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;

/**
* @author xuye
* @since 2022/5/17 18:26
**/
@Slf4j
@RestControllerAdvice
public class YueKeExceptionControllerAdvice {

@ExceptionHandler(YueKeException.class)
public YueKeResponse handleThrowable(YueKeException e) {
return YueKeResponse.of(e.getYueKeResEnum());
}

}

+ 23
- 0
sink/src/main/java/work/xuye/sink/yueke/YueKeRequestJsonConverter.java View File

@@ -0,0 +1,23 @@
package work.xuye.sink.yueke;

import com.google.gson.Gson;
import lombok.RequiredArgsConstructor;
import org.jetbrains.annotations.NotNull;
import org.springframework.core.convert.converter.Converter;
import org.springframework.stereotype.Component;

/**
* @author xuye
* @since 2023/7/10 18:06
**/
@Component
@RequiredArgsConstructor
public class YueKeRequestJsonConverter implements Converter<String, YueKeRequestModel> {

private final Gson gson;

@Override
public YueKeRequestModel convert(@NotNull String jsonSource) {
return gson.fromJson(jsonSource, YueKeRequestModel.class);
}
}

+ 18
- 0
sink/src/main/java/work/xuye/sink/yueke/YueKeRequestModel.java View File

@@ -0,0 +1,18 @@
package work.xuye.sink.yueke;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* @author xuye
* @since 2023/7/10 17:57
**/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class YueKeRequestModel {
private String reqId;
private Long timestamp;
private String sign;
}

+ 54
- 0
sink/src/main/java/work/xuye/sink/yueke/YueKeResEnum.java View File

@@ -0,0 +1,54 @@
package work.xuye.sink.yueke;

import java.io.Serializable;

/**
* @author xuye
* @since 2023/7/10 09:57
**/
public enum YueKeResEnum implements Serializable {

//一般成功,
COMMON_SUCCESS("10000", "success"),
//一般失败
COMMON_ERROR("50000", "error"),

//11000开头的是权限验证用
SECURITY_ERROR_11001("11001", "metadata is empty"),
SECURITY_ERROR_11002("11002", "some params in metadata is empty"),
SECURITY_ERROR_11003("11003", "timestamp valid time is " + YueKeConstants.VALID_MS / 1000 / 60 + " minutes"),

SECURITY_ERROR_11006("11006", "sign is not valid"),
SECURITY_ERROR_11007("11007", "reqId is too short, min length is " + YueKeConstants.REQ_ID_MIN_LENGTH),
SECURITY_ERROR_11008("11008", "reqId is too long, max length is " + YueKeConstants.REQ_ID_MAX_LENGTH),

SECURITY_ERROR_11009("11009", "refer is empty"),
SECURITY_ERROR_11010("11010", "refer is not valid"),

SECURITY_ERROR_11501("11501", "other error");


private final String code;
private final String msg;

YueKeResEnum(String code, String msg) {
this.code = code;
this.msg = msg;
}

public String getCode() {
return code;
}

public String getMsg() {
return msg;
}

@Override
public String toString() {
return "APIResponse{" +
"code='" + code + '\'' +
", msg='" + msg + '\'' +
'}';
}
}

+ 33
- 0
sink/src/main/java/work/xuye/sink/yueke/YueKeResponse.java View File

@@ -0,0 +1,33 @@
package work.xuye.sink.yueke;

import lombok.Data;

import java.util.UUID;

/**
* @author xuye
* @since 2023/7/10 09:59
**/
@Data
public class YueKeResponse {

private String requestId;
private Long requestStartTime;
private Long requestEndTime;
private String code;
private String msg;

public YueKeResponse() {
this.requestId = UUID.randomUUID().toString().replaceAll("-", "");
this.requestStartTime = System.currentTimeMillis();
}

public static YueKeResponse of(YueKeResEnum yueKeResEnum) {
YueKeResponse response = new YueKeResponse();
response.setCode(yueKeResEnum.getCode());
response.setMsg(yueKeResEnum.getMsg());
response.setRequestEndTime(System.currentTimeMillis());
return response;
}

}

+ 25
- 0
sink/src/main/java/work/xuye/sink/yueke/YueKeRevokeNewsIDsVO.java View File

@@ -0,0 +1,25 @@
package work.xuye.sink.yueke;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.ArrayList;
import java.util.List;

/**
* @author xuye
* @since 2023/7/10 17:50
**/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class YueKeRevokeNewsIDsVO {

private Data data = new Data();

@lombok.Data
public static class Data {
private final List<String> messageIdList = new ArrayList<>();
}
}

+ 147
- 0
sink/src/main/java/work/xuye/sink/yueke/YueKeRevokeService.java View File

@@ -0,0 +1,147 @@
package work.xuye.sink.yueke;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import work.xuye.common.db.entity.Mapping;
import work.xuye.common.db.entity.Task;
import work.xuye.common.db.entity.Template;
import work.xuye.common.db.entity.vo.TableTemplate;
import work.xuye.common.db.service.TaskManager;
import work.xuye.common.dto.TaskVO;
import work.xuye.common.enums.ProcessMode;
import work.xuye.common.enums.ProcessReason;
import work.xuye.common.properties.YueKeProperties;
import work.xuye.common.store.NsKVMapStore;
import work.xuye.sink.handler.ItemHandler;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
* @author xuye
* @since 2023/7/10 10:15
**/
@Slf4j
@Service
@RequiredArgsConstructor
public class YueKeRevokeService {

private final YueKeProperties yueKeProperties;
private final TaskManager taskManager;
private final NsKVMapStore mapStore;
private final ItemHandler itemHandler;


private static String getSign(Long timestamp, String reqId, String secret) {
String params = timestamp + reqId + secret;
return DigestUtils.md5Hex(params);
}

public void handleRevokeRequest(YueKeRevokeNewsIDsVO ids, YueKeRequestModel requestModel) {
// 根据沟通,阅客对同一篇资讯会下发两次id,一次是普通的id,一次是带有:的id,这里需要过滤掉带:的id
if (this.afterClean(ids).isEmpty()) {
log.info("revoke ids is empty, ignore");
return;
}
log.info("待下架的资讯id列表: {}", ids);
// 确保是阅客的请求
this.checkRequest(requestModel);
log.info("请求合法,开始下架");
// 执行下架
this.revoke(ids.getData().getMessageIdList());
}

public void revoke(List<String> idList) {
// 前提条件:阅客的任务名包含 yueke,并且只有一条记录
Task task = taskManager.getTaskService().lambdaQuery()
.eq(Task::getName, yueKeProperties.getTaskName())
.eq(Task::getProcessMode, ProcessMode.NORMAL)
.select(Task::getId)
.orderBy(true, true, Task::getCreateTime)
.last("limit 1")
.one();
if (ObjectUtils.isEmpty(task)) {
log.warn("接收到阅客的下架请求,但是库中没有此任务,跳过处理");
return;
}
TaskVO taskVO = taskManager.getTaskInfoByTaskId(task.getId());

for (String id : idList) {
String url = mapStore.getValue(yueKeProperties.getNamespaceOfIdUrlMap(), id);
if (ObjectUtils.isEmpty(url)) {
log.warn("接收到阅客的下架请求,但是库中没有此记录,跳过处理");
continue;
}
List<Template> templateList = new ArrayList<>();
for (String templateName : taskVO.getTemplateMappingMap().values().stream().map(Mapping::getTemplate).collect(Collectors.toList())) {
Template templateByName = taskManager.getTemplateByName(templateName);
templateList.add(templateByName);
}
for (Template template : templateList) {
TableTemplate tableTemplate = template.getTableTemplate();
tableTemplate.getUniqueField().setValue(url);
if (itemHandler.doDelete(taskVO, tableTemplate, ProcessReason.external_condition)) {
log.info("阅客下架成功,资讯id: {}, url: {}", id, url);
} else {
log.info("阅客下架失败,资讯id: {}, url: {}", id, url);
}
}
}
}

private List<String> afterClean(YueKeRevokeNewsIDsVO idsVO) {
List<String> messageIdList = idsVO.getData().getMessageIdList();
messageIdList.removeAll(
messageIdList.stream().filter(id -> id.contains(":")).collect(Collectors.toList())
);
return messageIdList;
}

private void checkRequest(YueKeRequestModel requestModel) {

try {
if (ObjectUtils.isEmpty(requestModel)) {
throw new YueKeException(null, YueKeResEnum.SECURITY_ERROR_11001);
}
Long requestTimeStamp = requestModel.getTimestamp();
String requestId = requestModel.getReqId();
String requestSign = requestModel.getSign();

//参数完整性校验
if (ObjectUtils.isEmpty(requestId) || ObjectUtils.isEmpty(requestSign) || null == requestTimeStamp) {
throw new YueKeException(requestId, YueKeResEnum.SECURITY_ERROR_11002);
}
int requestIdLength = requestId.length();
if (requestIdLength < YueKeConstants.REQ_ID_MIN_LENGTH) {
throw new YueKeException(requestId, YueKeResEnum.SECURITY_ERROR_11007);
} else if (requestIdLength > YueKeConstants.REQ_ID_MAX_LENGTH) {
throw new YueKeException(requestId, YueKeResEnum.SECURITY_ERROR_11008);
}
//时间有效期验证
Long nowTime = System.currentTimeMillis();
long minValidTime = nowTime - YueKeConstants.VALID_MS;
long maxValidTime = nowTime + YueKeConstants.VALID_MS;
if (!(requestTimeStamp >= minValidTime && requestTimeStamp <= maxValidTime)) {
throw new YueKeException(requestId, YueKeResEnum.SECURITY_ERROR_11003);
}
//sign验证
String sign = getSign(requestTimeStamp, requestId, yueKeProperties.getSecret());
if (!requestSign.equals(sign)) {
throw new YueKeException(requestId, YueKeResEnum.SECURITY_ERROR_11006);
}
} catch (YueKeException yueKeException) {
log.warn("check revoke param error: ", yueKeException);
throw yueKeException;
} catch (Exception ex) {
log.error("other exception : {}", ex.getMessage(), ex);
throw new YueKeException(null, YueKeResEnum.SECURITY_ERROR_11501);
}

}


}

+ 2
- 2
sink/src/main/resources/bootstrap.yml View File

@@ -5,9 +5,9 @@ spring:
nacos:
config:
enabled: true
server-addr: https://nacos-test.deepq.tech
server-addr: https://nacos-sit.deepq.tech
file-extension: yml
namespace: std-news-process-sit
namespace: std-news-process-dev
shared-configs:
- data-id: common.yml
refresh: true


+ 114
- 59
source/src/main/java/work/xuye/source/handler/SourceHandler.java View File

@@ -3,27 +3,32 @@ package work.xuye.source.handler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.DigestUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StopWatch;
import work.xuye.common.constant.*;
import work.xuye.common.db.entity.Task;
import work.xuye.common.db.entity.vo.TaskMiniVO;
import work.xuye.common.db.service.TaskManager;
import work.xuye.common.dto.HttpRequestParams;
import work.xuye.common.dto.HttpRes;
import work.xuye.common.dto.TaskVO;
import work.xuye.common.enums.ProcessMode;
import work.xuye.common.enums.RequestMode;
import work.xuye.common.enums.ResourceStatus;
import work.xuye.common.service.UrlMD5Service;
import work.xuye.common.utils.DebugUtil;
import work.xuye.common.utils.ID;
import work.xuye.source.request.RequestClient;
import work.xuye.source.spel.CustomRequest;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;

@@ -36,80 +41,130 @@ import java.util.regex.Matcher;
@RequiredArgsConstructor
public class SourceHandler {

public final CustomRequest cr;
private final SpelExpressionParser parser;
private final TaskManager taskManager;
private final RequestClient requestClient;
private final UrlMD5Service urlMD5Service;
private final StreamBridge streamBridge;
private final TaskManager taskManager;


public Message<HttpRes> handle(Message<HttpRequestParams> message) {
TaskVO taskVO = taskManager.getTaskInfoByTaskId((Integer) message.getHeaders().get(MessageConstants.TASK_ID));
String taskName = taskVO.getTask().getName();
if (DebugUtil.isSkip(taskVO)) {
public List<Message<HttpRes>> handle(Message<TaskMiniVO> message) {
TaskMiniVO taskMiniVO = message.getPayload();
if (DebugUtil.isSkip(taskMiniVO)) {
return null;
}
Task task = taskManager.getTaskInfoByTaskId(taskMiniVO.getId()).getTask();
if (DebugUtil.isSkip(task)) {
return null;
}
HttpRequestParams req = task.getRequestParams();
String taskName = task.getName();
HttpRes res = null;
HttpRequestParams requestParams = message.getPayload();
StopWatch stopWatch = new StopWatch();
try {
stopWatch.start();
res = this.request(requestParams);
stopWatch.stop();
String md5Digest = DigestUtils.md5DigestAsHex(res.getBody().getBytes());
ResourceStatus resourceStatus = urlMD5Service.getResourceStatus(requestParams.getUrl(), md5Digest);
urlMD5Service.put(requestParams.getUrl(), md5Digest);
res.setResourceStatus(resourceStatus);
} catch (Exception e) {
log.error("[{}] request error, request params:[{}], error message:[{}]", taskName, requestParams, e.getMessage());
List<HttpRes> result = null;
if (RequestMode.normal.equals(req.getMode())) {
res = this.request(req);
result = this.handleRes(task, res);
} else if (RequestMode.SpEL.equals(req.getMode())) {
StandardEvaluationContext context = new StandardEvaluationContext();
Expression expression = parser.parseExpression(task.getRequestParams().getSpELConfig().getExpression());
List<HttpRes> value = expression.getValue(context, this, List.class);
result = this.handleRes(task, value);
}
if (ObjectUtils.isEmpty(result)) {
return null;
}
if (!res.getStatus().is2xxSuccessful()) {
log.warn("[{}] response status code is not 2xx, request params: [{}], response: [{}]", taskName, requestParams, res);
throw new RuntimeException("[" + taskName + "]" + "unexpected status code: " + res.getStatus());
}
ProcessMode processMode = taskManager
.getTaskInfoByTaskId(
(Integer) message.getHeaders().get(MessageConstants.TASK_ID))
.getTask()
.getProcessMode();
this.log(message, res, processMode, stopWatch.getLastTaskTimeMillis(), taskVO.getTask());
if (res.getResourceStatus().equals(ResourceStatus.UNCHANGED) && !ProcessMode.DEBUG.equals(processMode)) {
if (ProcessMode.WATCH.equals(task.getProcessMode())) {
for (HttpRes r : result) {
Message<String> watchMessage = MessageBuilder
.withPayload(r.getBody())
.setHeader(MessageConstants.CURRENT_STAGE, StageConstants.SOURCE_WATCH)
.setHeader(MessageConstants.TASK_TRACE_ID, message.getHeaders().get(MessageConstants.TASK_TRACE_ID))
.setHeader(MessageConstants.SOURCE_TRACE_ID, ID.generate())
.setHeader(MessageConstants.TASK_ID, message.getHeaders().get(MessageConstants.TASK_ID))
.setHeader(MessageConstants.TASK_NAME, message.getHeaders().get(MessageConstants.TASK_NAME))
.setHeader(SnapshotConstants.URL, req.getUrl())
.setHeader(SnapshotConstants.RESOURCE_STATUS, r.getResourceStatus())
.setHeader(SnapshotConstants.STATUS, r.getStatus())
.build();

boolean send = streamBridge.send(taskName + BindingConstants.SNAPSHOT_OUT_POSTFIX, watchMessage);
if (!send) {
throw new RuntimeException("send message failed");
}
}
return null;
}

if (ProcessMode.WATCH.equals(processMode)) {
Message<String> watchMessage = MessageBuilder
.withPayload(res.getBody())
.setHeader(MessageConstants.CURRENT_STAGE, StageConstants.SOURCE_WATCH)
.setHeader(MessageConstants.TASK_TRACE_ID, message.getHeaders().get(MessageConstants.TASK_TRACE_ID))
.setHeader(MessageConstants.SOURCE_TRACE_ID, ID.generate())
.setHeader(MessageConstants.TASK_ID, message.getHeaders().get(MessageConstants.TASK_ID))
.setHeader(MessageConstants.TASK_NAME, message.getHeaders().get(MessageConstants.TASK_NAME))
.setHeader(SnapshotConstants.URL, requestParams.getUrl())
.setHeader(SnapshotConstants.RESOURCE_STATUS, res.getResourceStatus())
.setHeader(SnapshotConstants.STATUS, res.getStatus())
.build();

boolean send = streamBridge.send(taskName + BindingConstants.SNAPSHOT_OUT_POSTFIX, watchMessage);
if (!send) {
throw new RuntimeException("send message failed");
ArrayList<Message<HttpRes>> messageList = new ArrayList<>();
result.forEach(
item -> {
messageList.add(MessageBuilder
.withPayload(item)
.setHeader(MessageConstants.CURRENT_STAGE, StageConstants.SOURCE)
.setHeader(MessageConstants.TASK_TRACE_ID, message.getHeaders().get(MessageConstants.TASK_TRACE_ID))
.setHeader(MessageConstants.SOURCE_TRACE_ID, ID.generate())
.setHeader(MessageConstants.TASK_ID, message.getHeaders().get(MessageConstants.TASK_ID))
.setHeader(MessageConstants.TASK_NAME, message.getHeaders().get(MessageConstants.TASK_NAME))
.setHeader(MessageConstants.SEED_URL, item.getRequestParams().getUrl())
.build());
}
);
return messageList;


}


private void setResourceStatus(HttpRes res) {
String md5Digest = DigestUtils.md5DigestAsHex(res.getBody().getBytes());
ResourceStatus resourceStatus = urlMD5Service.getResourceStatus(res.getRequestParams().getUrl(), md5Digest);
urlMD5Service.put(res.getRequestParams().getUrl(), md5Digest);
res.setResourceStatus(resourceStatus);
}


private List<HttpRes> handleRes(Task task, List<HttpRes> resList) {
if (ObjectUtils.isEmpty(resList)) {
return null;
}
List<HttpRes> out = new ArrayList<>();
for (HttpRes res : resList) {
if (!res.getStatus().is2xxSuccessful()) {
log.warn("request failed, url: {}, status: {}", res.getRequestParams().getUrl(), res.getStatus());
continue;
}
this.setResourceStatus(res);
this.log(res, task);
if (res.getResourceStatus() != ResourceStatus.UNCHANGED || ProcessMode.DEBUG == task.getProcessMode()) {
out.add(res);
}
}
return out;

}

private List<HttpRes> handleRes(Task task, HttpRes res) {
List<HttpRes> out = new ArrayList<>();
if (!res.getStatus().is2xxSuccessful()) {
log.info("log for debug:{}", res);
log.warn("request failed, url: {}, status: {}", res.getRequestParams().getUrl(), res.getStatus());
return null;
}
return MessageBuilder
.withPayload(res)
.setHeader(MessageConstants.CURRENT_STAGE, StageConstants.SOURCE_WATCH)
.setHeader(MessageConstants.TASK_TRACE_ID, message.getHeaders().get(MessageConstants.TASK_TRACE_ID))
.setHeader(MessageConstants.SOURCE_TRACE_ID, ID.generate())
.setHeader(MessageConstants.TASK_ID, message.getHeaders().get(MessageConstants.TASK_ID))
.setHeader(MessageConstants.TASK_NAME, message.getHeaders().get(MessageConstants.TASK_NAME))
.setHeader(MessageConstants.SEED_URL, requestParams.getUrl())
.build();
this.setResourceStatus(res);
this.log(res, task);
if (res.getResourceStatus().equals(ResourceStatus.UNCHANGED) && !ProcessMode.DEBUG.equals(task.getProcessMode())) {
return null;
} else {
out.add(res);
return out;
}
}

private void log(Message<HttpRequestParams> message, HttpRes res, ProcessMode processMode, long ms, Task task) {
HttpRequestParams req = message.getPayload();

private void log(HttpRes res, Task task) {
ProcessMode processMode = task.getProcessMode();
HttpRequestParams req = res.getRequestParams();
String emoji = "";
if (processMode.equals(ProcessMode.NORMAL)) {
emoji = "";
@@ -121,7 +176,7 @@ public class SourceHandler {
if (res.getStatus().is2xxSuccessful()) {
log.info("[{}-{}] [{}{} {}] [{} {} {}ms] {} ", task.getId(), task.getName(),
res.getResourceStatus().getEmoji(), emoji,
res.getResourceStatus().getName(), req.getMethod(), res.getStatus(), ms, req.getUrl());
res.getResourceStatus().getName(), req.getMethod(), res.getStatus(), res.getCostTimeMillis(), req.getUrl());
} else {
log.warn("@@ method: [{}], status: [{}], url: [{}], request: [{}], response: [{}]",
req.getMethod(), res.getStatus(), req.getUrl(), req.getBody(), res.getBody());


+ 3
- 2
source/src/main/java/work/xuye/source/processor/MessageConsumer.java View File

@@ -5,10 +5,11 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import work.xuye.common.dto.HttpRequestParams;
import work.xuye.common.db.entity.vo.TaskMiniVO;
import work.xuye.common.dto.HttpRes;
import work.xuye.source.handler.SourceHandler;

import java.util.List;
import java.util.function.Function;

/**
@@ -23,7 +24,7 @@ public class MessageConsumer {
private final SourceHandler sourceHandler;

@Bean
public Function<Message<HttpRequestParams>, Message<HttpRes>> source() {
public Function<Message<TaskMiniVO>, List<Message<HttpRes>>> source() {
return sourceHandler::handle;
}



+ 23
- 11
source/src/main/java/work/xuye/source/request/OkHttpRequestClient.java View File

@@ -10,12 +10,13 @@ import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StopWatch;
import work.xuye.common.dto.HttpRequestParams;
import work.xuye.common.dto.HttpRes;
import work.xuye.source.properties.SourceProperties;
import work.xuye.source.util.CharsetUtil;

import java.io.IOException;
import java.net.SocketTimeoutException;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -53,6 +54,8 @@ public class OkHttpRequestClient implements RequestClient {

@Override
public HttpRes execute(HttpRequestParams request) {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
Request.Builder requestBuilder = new Request.Builder();
// request url
requestBuilder.url(request.getUrl());
@@ -67,7 +70,9 @@ public class OkHttpRequestClient implements RequestClient {
String type = request.getMediaType();
RequestBody requestBody = null;
//如果不是GET请求,并且请求体不为空才能构建请求体
if (!request.getMethod().equalsIgnoreCase(HttpMethod.GET.name()) && !ObjectUtils.isEmpty(request.getBody().keySet())) {
if (!request.getMethod().equalsIgnoreCase(HttpMethod.GET.name())
&& !ObjectUtils.isEmpty(request.getBody())
&& !ObjectUtils.isEmpty(request.getBody().keySet())) {
// form url
if (org.springframework.http.MediaType.APPLICATION_FORM_URLENCODED_VALUE.equals(type)) {
FormBody.Builder formBodyBuilder = new FormBody.Builder();
@@ -82,28 +87,35 @@ public class OkHttpRequestClient implements RequestClient {
throw new RuntimeException("暂不支持的请求类型");
}
}
if (!ObjectUtils.isEmpty(requestBody)) {
requestBuilder.method(request.getMethod(), requestBody);

//如果非get请求,并且请求体仍然为空,需要构建一个空的请求体,因为okhttp不支持null请求体
if (!request.getMethod().equalsIgnoreCase(HttpMethod.GET.name()) && ObjectUtils.isEmpty(requestBody)) {
requestBody = new FormBody.Builder().build();
}
requestBuilder.method(request.getMethod(), requestBody);


// res
HttpRes httpRes = new HttpRes();
Request httpRequest = requestBuilder.build();
try (Response response = client.newCall(httpRequest).execute()) {
httpRes.setUrl(request.getUrl());
httpRes.setStatus(HttpStatus.valueOf(response.code()));
httpRes.requestParams(request).headers(response.headers().toMultimap()).status(HttpStatus.valueOf(response.code()));
if (!ObjectUtils.isEmpty(response.body())) {
BufferedSource source = response.body().source();
Charset charset = CharsetUtil.getCharsetByName(request.getCharset());
String responseBody = source.readByteString().string(charset);
httpRes.setBody(responseBody);
httpRes.body(responseBody);
source.close();
}
httpRes.headers(response.headers().toMultimap());
} catch (IOException e) {
throw new RuntimeException(e);
} catch (SocketTimeoutException e) {
httpRes.status(HttpStatus.REQUEST_TIMEOUT);
} catch (Exception e) {
log.error("execute request error+", e);
httpRes.status(HttpStatus.INTERNAL_SERVER_ERROR);
}
return httpRes;
stopWatch.stop();
return httpRes.costTimeMillis(stopWatch.getTotalTimeMillis());

}

}

+ 25
- 0
source/src/main/java/work/xuye/source/spel/CustomRequest.java View File

@@ -0,0 +1,25 @@
package work.xuye.source.spel;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import work.xuye.common.dto.HttpRes;

import java.util.List;

/**
* @author xuye
* @since 2023/3/8 12:36
**/
@Slf4j
@Component
@RequiredArgsConstructor
public class CustomRequest {

private final YueKeRequest yueKeRequest;

public List<HttpRes> yueKe(String channelUrlPrefix, String listPageUrlPrefix, String env, String appId, String appSecret, boolean dup) {
return yueKeRequest.run(channelUrlPrefix, listPageUrlPrefix, env, appId, appSecret, dup);
}

}

+ 111
- 0
source/src/main/java/work/xuye/source/spel/YueKeRequest.java View File

@@ -0,0 +1,111 @@
package work.xuye.source.spel;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.minidev.json.JSONObject;
import org.apache.commons.codec.digest.DigestUtils;
import org.springframework.stereotype.Component;
import org.springframework.web.util.UriComponentsBuilder;
import work.xuye.common.dto.HttpRequestParams;
import work.xuye.common.dto.HttpRes;
import work.xuye.common.properties.YueKeProperties;
import work.xuye.common.store.NsKVMapStore;
import work.xuye.source.request.RequestClient;

import java.util.*;

/**
* @author xuye
* @since 2023/7/7 16:29
**/
@Slf4j
@Component
@RequiredArgsConstructor
public class YueKeRequest {


private final YueKeProperties yueKeProperties;
private final RequestClient requestClient;
private final NsKVMapStore mapStore;

private final Map<Integer, String> resCodeMap = new HashMap<>();

{
resCodeMap.put(50000, "error");
resCodeMap.put(11001, "metadata is empty");
resCodeMap.put(11002, "some params in metadata is empty");
resCodeMap.put(11003, "timestamp valid time is 3 min");
resCodeMap.put(11004, "reqId is used");
resCodeMap.put(11006, "sign is not valid");
resCodeMap.put(11007, "reqId is too short, min length is 10");
resCodeMap.put(11008, "reqId is too long, max length is 50");
resCodeMap.put(11501, "other error");
}

public List<HttpRes> run(String channelUrlPrefix, String listPageUrlPrefix, String env, String appId, String appSecret, boolean dup) {
List<HttpRes> resList = new ArrayList<>();
String channelUrl = UriComponentsBuilder.
fromUriString(channelUrlPrefix)
.queryParam("appId", appId).toUriString();
HashMap<String, String> channelMap = new HashMap<>();
JsonParser.parseString(this.get(channelUrl, appId, appSecret).getBody())
.getAsJsonObject().get("data").getAsJsonArray()
.forEach(
item -> {
String channelId = item.getAsJsonObject().get("id").getAsString();
String channelName = item.getAsJsonObject().get("name").getAsString();
channelMap.put(channelId, channelName);
String value = mapStore.getValue(yueKeProperties.getNamespaceOfChannelIdNameMap(), channelId);
if (!channelName.equals(value)) {
mapStore.upsert(yueKeProperties.getNamespaceOfChannelIdNameMap(), channelId, channelName);
}
}
);
log.info("获取频道列表成功,频道列表:{}", channelMap);

for (String channelId : channelMap.keySet()) {

String url = UriComponentsBuilder.fromUriString(listPageUrlPrefix)
.queryParam("appId", appId)
.queryParam("channelId", channelId)
.queryParam("userId", env)
.queryParam("pageSize", 20)
.queryParam("dup", dup)
.toUriString();
HttpRes httpRes = this.get(url, appId, appSecret);
httpRes.getRequestParams().setUrl(url);
resList.add(httpRes);
log.info("获取频道[{}]下的资讯成功", channelMap.get(channelId));
}
return resList;
}

private HttpRes get(String url, String appId, String appSecret) {
String authKey = this.getAuthKey(appId, appSecret);
HttpRequestParams requestParams = new HttpRequestParams();
requestParams.setUrl(url);
requestParams.setHeaders(Map.of("metadata", authKey));
HttpRes httpRes = requestClient.execute(requestParams);
JsonObject object = JsonParser.parseString(httpRes.getBody()).getAsJsonObject();
int code = object.get("code").getAsInt();
if (code != 10000) {
throw new RuntimeException("请求,失败原因:" + resCodeMap.get(code));
}
return httpRes;
}

private String getAuthKey(String appId, String appSecret) {
Long timestamp_req = System.currentTimeMillis();
String reqId_req = UUID.randomUUID().toString().replaceAll("-", "");
String unencryptedParams = appId + timestamp_req + reqId_req + appSecret;
String sign = DigestUtils.md5Hex(unencryptedParams);
JSONObject jsonObj = new JSONObject();
jsonObj.put("appId", appId);
jsonObj.put("reqId", reqId_req);
jsonObj.put("timestamp", timestamp_req);
jsonObj.put("sign", sign);
return jsonObj.toJSONString();
}
}

+ 2
- 2
source/src/main/resources/bootstrap.yml View File

@@ -7,9 +7,9 @@ spring:
nacos:
config:
enabled: true
server-addr: https://nacos-test.deepq.tech
server-addr: https://nacos-sit.deepq.tech
file-extension: yml
namespace: std-news-process-sit
namespace: std-news-process-dev
shared-configs:
- data-id: common.yml
refresh: true


+ 1
- 3
transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java View File

@@ -196,9 +196,7 @@ public class TransformHandler {
.build();
results.add(itemMessage);
});
if (results.size() > 0) {
log.info("@ [{}][{}{}] 下发转换后的消息,数量:{}", taskVO.getTask().getName(), processReason.getEmoji(), processReason, results.size());
}
log.info("@ [{}][{}{}] 下发转换后的消息,数量:{}", taskVO.getTask().getName(), processReason.getEmoji(), processReason, results.size());
} else {
log.warn("@@ 任务[{}]没有配置itemsPath", taskVO.getTask().getName());
}


+ 2
- 2
transformer/src/main/resources/bootstrap.yml View File

@@ -5,9 +5,9 @@ spring:
nacos:
config:
enabled: true
server-addr: https://nacos-test.deepq.tech
server-addr: https://nacos-sit.deepq.tech
file-extension: yml
namespace: std-news-process-sit
namespace: std-news-process-dev
shared-configs:
- data-id: common.yml
refresh: true


Loading…
Cancel
Save