From 060b85167bc1a9e20dc8df496e8b56fbc72fb3be Mon Sep 17 00:00:00 2001 From: xuye Date: Fri, 7 Jul 2023 15:07:17 +0800 Subject: [PATCH] =?UTF-8?q?=E9=98=85=E5=AE=A2=E8=B5=84=E8=AE=AF=E6=8E=A5?= =?UTF-8?q?=E5=85=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../xuye/common/config/ContainerCustomizer.java | 4 + .../work/xuye/common/constant/CacheConstants.java | 3 +- .../work/xuye/common/constant/CommonConstants.java | 11 ++ .../java/work/xuye/common/db/entity/Mapping.java | 3 + .../work/xuye/common/db/entity/vo/SinkParams.java | 21 ++- .../xuye/common/db/entity/vo/TableTemplate.java | 2 + .../work/xuye/common/db/entity/vo/TaskMiniVO.java | 21 +++ .../work/xuye/common/db/service/StoreManager.java | 18 +++ .../work/xuye/common/db/service/TaskManager.java | 37 +++-- .../work/xuye/common/dto/HttpRequestParams.java | 17 +- .../main/java/work/xuye/common/dto/HttpRes.java | 12 +- .../src/main/java/work/xuye/common/dto/TaskVO.java | 7 +- .../java/work/xuye/common/enums/ProcessReason.java | 5 + .../java/work/xuye/common/enums/RequestMode.java | 18 +++ .../xuye/common/properties/CommonProperties.java | 22 +++ .../xuye/common/properties/YueKeProperties.java | 38 +++++ .../java/work/xuye/common/spel/CustomFunction.java | 80 ++++++++-- .../spel/SpringExpressionLanguageEvaluator.java | 3 +- .../xuye/common/store/LocalMemoryNsKVMapStore.java | 74 --------- .../xuye/common/store/MySQLCachedNsKVMapStore.java | 7 +- .../java/work/xuye/common/store/NsKVMapStore.java | 4 + .../java/work/xuye/common/utils/DebugUtil.java | 44 +++++- .../java/work/xuye/common/utils/JsonPathUtil.java | 8 +- helper/.gitignore | 33 ++++ helper/pom.xml | 50 ++++++ .../java/work/xuye/helper/HelperApplication.java | 16 ++ .../xuye/helper/properties/HelperProperties.java | 28 ++++ .../work/xuye/helper/service/UpdateService.java | 171 ++++++++++++++++++++ helper/src/main/resources/application.yml | 15 ++ helper/src/main/resources/bootstrap.yml | 21 +++ pom.xml | 12 +- .../work/xuye/scheduler/service/IssueService.java | 22 +-- scheduler/src/main/resources/bootstrap.yml | 4 +- .../work/xuye/sink/controller/OpenController.java | 41 ++++- .../work/xuye/sink/handler/DisappearHandler.java | 13 +- .../java/work/xuye/sink/handler/ItemHandler.java | 62 +++++--- .../java/work/xuye/sink/yueke/YueKeConstants.java | 14 ++ .../java/work/xuye/sink/yueke/YueKeException.java | 23 +++ .../sink/yueke/YueKeExceptionControllerAdvice.java | 20 +++ .../xuye/sink/yueke/YueKeRequestJsonConverter.java | 23 +++ .../work/xuye/sink/yueke/YueKeRequestModel.java | 18 +++ .../java/work/xuye/sink/yueke/YueKeResEnum.java | 54 +++++++ .../java/work/xuye/sink/yueke/YueKeResponse.java | 33 ++++ .../work/xuye/sink/yueke/YueKeRevokeNewsIDsVO.java | 25 +++ .../work/xuye/sink/yueke/YueKeRevokeService.java | 147 +++++++++++++++++ sink/src/main/resources/bootstrap.yml | 4 +- .../work/xuye/source/handler/SourceHandler.java | 173 ++++++++++++++------- .../xuye/source/processor/MessageConsumer.java | 5 +- .../xuye/source/request/OkHttpRequestClient.java | 34 ++-- .../java/work/xuye/source/spel/CustomRequest.java | 25 +++ .../java/work/xuye/source/spel/YueKeRequest.java | 111 +++++++++++++ source/src/main/resources/bootstrap.yml | 4 +- .../xuye/transformer/handler/TransformHandler.java | 4 +- transformer/src/main/resources/bootstrap.yml | 4 +- 54 files changed, 1400 insertions(+), 268 deletions(-) create mode 100644 common/src/main/java/work/xuye/common/constant/CommonConstants.java create mode 100644 common/src/main/java/work/xuye/common/db/entity/vo/TaskMiniVO.java create mode 100644 common/src/main/java/work/xuye/common/enums/RequestMode.java create mode 100644 common/src/main/java/work/xuye/common/properties/CommonProperties.java create mode 100644 common/src/main/java/work/xuye/common/properties/YueKeProperties.java delete mode 100644 common/src/main/java/work/xuye/common/store/LocalMemoryNsKVMapStore.java create mode 100644 helper/.gitignore create mode 100644 helper/pom.xml create mode 100644 helper/src/main/java/work/xuye/helper/HelperApplication.java create mode 100644 helper/src/main/java/work/xuye/helper/properties/HelperProperties.java create mode 100644 helper/src/main/java/work/xuye/helper/service/UpdateService.java create mode 100644 helper/src/main/resources/application.yml create mode 100644 helper/src/main/resources/bootstrap.yml create mode 100644 sink/src/main/java/work/xuye/sink/yueke/YueKeConstants.java create mode 100644 sink/src/main/java/work/xuye/sink/yueke/YueKeException.java create mode 100644 sink/src/main/java/work/xuye/sink/yueke/YueKeExceptionControllerAdvice.java create mode 100644 sink/src/main/java/work/xuye/sink/yueke/YueKeRequestJsonConverter.java create mode 100644 sink/src/main/java/work/xuye/sink/yueke/YueKeRequestModel.java create mode 100644 sink/src/main/java/work/xuye/sink/yueke/YueKeResEnum.java create mode 100644 sink/src/main/java/work/xuye/sink/yueke/YueKeResponse.java create mode 100644 sink/src/main/java/work/xuye/sink/yueke/YueKeRevokeNewsIDsVO.java create mode 100644 sink/src/main/java/work/xuye/sink/yueke/YueKeRevokeService.java create mode 100644 source/src/main/java/work/xuye/source/spel/CustomRequest.java create mode 100644 source/src/main/java/work/xuye/source/spel/YueKeRequest.java diff --git a/common/src/main/java/work/xuye/common/config/ContainerCustomizer.java b/common/src/main/java/work/xuye/common/config/ContainerCustomizer.java index ac5f57c..24fb07a 100644 --- a/common/src/main/java/work/xuye/common/config/ContainerCustomizer.java +++ b/common/src/main/java/work/xuye/common/config/ContainerCustomizer.java @@ -1,5 +1,7 @@ package work.xuye.common.config; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.cloud.stream.config.ListenerContainerCustomizer; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.listener.AbstractMessageListenerContainer; @@ -10,7 +12,9 @@ import org.springframework.util.backoff.FixedBackOff; * @author xuye * @since 2023/3/15 23:41 **/ +@Slf4j @Configuration +@ConditionalOnClass(ListenerContainerCustomizer.class) public class ContainerCustomizer implements ListenerContainerCustomizer> { @Override public void configure(AbstractMessageListenerContainer container, String destinationName, String group) { diff --git a/common/src/main/java/work/xuye/common/constant/CacheConstants.java b/common/src/main/java/work/xuye/common/constant/CacheConstants.java index befb3b1..6611c47 100644 --- a/common/src/main/java/work/xuye/common/constant/CacheConstants.java +++ b/common/src/main/java/work/xuye/common/constant/CacheConstants.java @@ -9,5 +9,6 @@ public class CacheConstants { public static final String TASK = "snp_task"; public static final String STORE = "snp_store"; - + + public static final String TEMPLATE = "snp_template"; } diff --git a/common/src/main/java/work/xuye/common/constant/CommonConstants.java b/common/src/main/java/work/xuye/common/constant/CommonConstants.java new file mode 100644 index 0000000..3bfe39c --- /dev/null +++ b/common/src/main/java/work/xuye/common/constant/CommonConstants.java @@ -0,0 +1,11 @@ +package work.xuye.common.constant; + +/** + * @author xuye + * @since 2023/7/11 16:05 + **/ +public class CommonConstants { + + public static final String SEPARATOR = ":"; + +} diff --git a/common/src/main/java/work/xuye/common/db/entity/Mapping.java b/common/src/main/java/work/xuye/common/db/entity/Mapping.java index 26fefe7..c3e43f0 100644 --- a/common/src/main/java/work/xuye/common/db/entity/Mapping.java +++ b/common/src/main/java/work/xuye/common/db/entity/Mapping.java @@ -36,6 +36,9 @@ public class Mapping implements Serializable { @TableField("name") private String name; + @TableField("template") + private String template; + @TableField(value = "table_mapping", typeHandler = JacksonTypeHandler.class) private MessageMapping tableMapping; diff --git a/common/src/main/java/work/xuye/common/db/entity/vo/SinkParams.java b/common/src/main/java/work/xuye/common/db/entity/vo/SinkParams.java index 70c8f87..86482d3 100644 --- a/common/src/main/java/work/xuye/common/db/entity/vo/SinkParams.java +++ b/common/src/main/java/work/xuye/common/db/entity/vo/SinkParams.java @@ -18,9 +18,9 @@ import java.util.List; public class SinkParams { /** - * 模板名称 + * 模板名称表达式 */ - private String templateName; + private String templateNameExpression; /** * 根据哪个字段来判断这条数据是否需要执行update */ @@ -29,21 +29,20 @@ public class SinkParams { * 来判断这条数据是否需要执行delete */ private CheckDelete checkDelete; - /** - * 数据源名字 - */ - private String dataSourceName; - - /** - * 表名 - */ - private String tableName; private InsertConfig insertConfig; @Data public static class InsertConfig { + /** + * 前置检查 + */ private List predicates; + + /** + * 后置行为 + */ + private List postActions; } @Data diff --git a/common/src/main/java/work/xuye/common/db/entity/vo/TableTemplate.java b/common/src/main/java/work/xuye/common/db/entity/vo/TableTemplate.java index 7c20924..2601fe1 100644 --- a/common/src/main/java/work/xuye/common/db/entity/vo/TableTemplate.java +++ b/common/src/main/java/work/xuye/common/db/entity/vo/TableTemplate.java @@ -20,6 +20,8 @@ import java.util.List; public class TableTemplate { private String tableName; + private String datasourceName; + /** * 指的是数据库有默认值或者自增的字段,如果有,则在执行insert语句时,不需要填充该字段 diff --git a/common/src/main/java/work/xuye/common/db/entity/vo/TaskMiniVO.java b/common/src/main/java/work/xuye/common/db/entity/vo/TaskMiniVO.java new file mode 100644 index 0000000..50e2b26 --- /dev/null +++ b/common/src/main/java/work/xuye/common/db/entity/vo/TaskMiniVO.java @@ -0,0 +1,21 @@ +package work.xuye.common.db.entity.vo; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @author xuye + * @since 2023/7/7 21:58 + **/ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class TaskMiniVO { + + private Integer id; + + private String name; +} diff --git a/common/src/main/java/work/xuye/common/db/service/StoreManager.java b/common/src/main/java/work/xuye/common/db/service/StoreManager.java index d104a85..f2be4ea 100644 --- a/common/src/main/java/work/xuye/common/db/service/StoreManager.java +++ b/common/src/main/java/work/xuye/common/db/service/StoreManager.java @@ -3,6 +3,7 @@ package work.xuye.common.db.service; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.cache.annotation.CacheEvict; import org.springframework.cache.annotation.Cacheable; import org.springframework.stereotype.Service; import work.xuye.common.constant.CacheConstants; @@ -28,4 +29,21 @@ public class StoreManager { .one(); } + + @CacheEvict(value = CacheConstants.STORE, key = "#namespace + ':' + #key") + public Store upsert(String namespace, String key, String value) { + Store store = this.getValueFromNsByKey(namespace, key); + if (store == null) { + store = new Store(); + store.setNamespace(namespace); + store.setKey(key); + store.setValue(value); + storeService.save(store); + } else { + store.setValue(value); + storeService.updateById(store); + } + return store; + } + } diff --git a/common/src/main/java/work/xuye/common/db/service/TaskManager.java b/common/src/main/java/work/xuye/common/db/service/TaskManager.java index 06677c5..64db9d2 100644 --- a/common/src/main/java/work/xuye/common/db/service/TaskManager.java +++ b/common/src/main/java/work/xuye/common/db/service/TaskManager.java @@ -1,5 +1,6 @@ package work.xuye.common.db.service; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.cache.annotation.Cacheable; @@ -11,6 +12,11 @@ import work.xuye.common.db.entity.Task; import work.xuye.common.db.entity.Template; import work.xuye.common.db.entity.vo.SinkParams; import work.xuye.common.dto.TaskVO; +import work.xuye.common.spel.CustomFunction; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; /** * @author xuye @@ -21,11 +27,13 @@ import work.xuye.common.dto.TaskVO; @RequiredArgsConstructor public class TaskManager { + public final CustomFunction cf; + + @Getter private final TaskService taskService; private final TemplateService templateService; private final MappingService mappingService; - @Cacheable(value = CacheConstants.TASK, key = "#taskId", unless = "#result == null") public TaskVO getTaskInfoByTaskId(Integer taskId) { Task task = taskService.lambdaQuery() @@ -35,23 +43,24 @@ public class TaskManager { SinkParams sinkParams = task.getSinkParams(); if (sinkParams != null) { - String templateName = sinkParams.getTemplateName(); - - Template template = templateService.lambdaQuery() - .eq(Template::getName, templateName) - .one(); - Assert.notNull(template, "模板" + templateName + "不存在"); - Mapping mapping = mappingService.lambdaQuery() + List mappingList = mappingService.lambdaQuery() .eq(Mapping::getName, task.getName()) - .one(); - Assert.notNull(mapping, "映射" + task.getName() + "不存在"); - - return new TaskVO(task, template, mapping); + .list(); + Assert.notEmpty(mappingList, "mapping" + task.getName() + "不存在"); + Map map = mappingList.stream().collect(Collectors.toMap(Mapping::getTemplate, mapping -> mapping)); + return new TaskVO(task, map); } else { - return new TaskVO(task, null, null); + return new TaskVO(task, null); } - } + @Cacheable(value = CacheConstants.TEMPLATE, key = "#templateName", unless = "#result == null") + public Template getTemplateByName(String templateName) { + Template template = templateService.lambdaQuery() + .eq(Template::getName, templateName) + .one(); + Assert.notNull(template, "模板" + templateName + "不存在"); + return template; + } } diff --git a/common/src/main/java/work/xuye/common/dto/HttpRequestParams.java b/common/src/main/java/work/xuye/common/dto/HttpRequestParams.java index e5a8acf..bfc66ff 100644 --- a/common/src/main/java/work/xuye/common/dto/HttpRequestParams.java +++ b/common/src/main/java/work/xuye/common/dto/HttpRequestParams.java @@ -5,6 +5,7 @@ import lombok.Data; import lombok.NoArgsConstructor; import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; +import work.xuye.common.enums.RequestMode; import java.util.Map; @@ -17,18 +18,22 @@ import java.util.Map; @AllArgsConstructor public class HttpRequestParams { + private RequestMode mode = RequestMode.normal; + private SpELConfig spELConfig; private String method = HttpMethod.GET.name(); - private String url; - private Map body = Map.of(); - private String charset; - private Map headers = Map.of(); - private Map placeholderExpressions = Map.of(); - private String mediaType = MediaType.APPLICATION_JSON_VALUE; + @Data + @NoArgsConstructor + @AllArgsConstructor + public static class SpELConfig { + private String expression; + + } + } diff --git a/common/src/main/java/work/xuye/common/dto/HttpRes.java b/common/src/main/java/work/xuye/common/dto/HttpRes.java index f04cc6c..e2f8c1d 100644 --- a/common/src/main/java/work/xuye/common/dto/HttpRes.java +++ b/common/src/main/java/work/xuye/common/dto/HttpRes.java @@ -18,12 +18,12 @@ import java.util.Map; @AllArgsConstructor public class HttpRes { - private String url; - + private HttpRequestParams requestParams; private ResourceStatus resourceStatus; private HttpStatus status = HttpStatus.OK; private String body; private Map> headers = Map.of(); + private Long costTimeMillis; public static HttpRes build() { return new HttpRes(); @@ -54,10 +54,14 @@ public class HttpRes { return this; } - public HttpRes url(String url) { - this.url = url; + public HttpRes costTimeMillis(Long costTimeMillis) { + this.costTimeMillis = costTimeMillis; return this; } + public HttpRes requestParams(HttpRequestParams requestParams) { + this.requestParams = requestParams; + return this; + } } diff --git a/common/src/main/java/work/xuye/common/dto/TaskVO.java b/common/src/main/java/work/xuye/common/dto/TaskVO.java index f75432c..ebec53b 100644 --- a/common/src/main/java/work/xuye/common/dto/TaskVO.java +++ b/common/src/main/java/work/xuye/common/dto/TaskVO.java @@ -6,7 +6,8 @@ import lombok.Data; import lombok.NoArgsConstructor; import work.xuye.common.db.entity.Mapping; import work.xuye.common.db.entity.Task; -import work.xuye.common.db.entity.Template; + +import java.util.Map; /** * @author xuye @@ -20,8 +21,6 @@ public class TaskVO { private Task task; - private Template template; - - private Mapping mapping; + private Map templateMappingMap; } diff --git a/common/src/main/java/work/xuye/common/enums/ProcessReason.java b/common/src/main/java/work/xuye/common/enums/ProcessReason.java index b6189e8..a1b9144 100644 --- a/common/src/main/java/work/xuye/common/enums/ProcessReason.java +++ b/common/src/main/java/work/xuye/common/enums/ProcessReason.java @@ -26,6 +26,11 @@ public enum ProcessReason { resource_changed("\uD83D\uDD04", "缓存中存在该记录,但与最新状态不一致"), /** + * 外部条件满足 + */ + external_condition("✅⚠️", "外部条件引发处理"), + + /** * 结束执行 */ end("\uD83D\uDD1A", "结束执行"), diff --git a/common/src/main/java/work/xuye/common/enums/RequestMode.java b/common/src/main/java/work/xuye/common/enums/RequestMode.java new file mode 100644 index 0000000..d864cc8 --- /dev/null +++ b/common/src/main/java/work/xuye/common/enums/RequestMode.java @@ -0,0 +1,18 @@ +package work.xuye.common.enums; + +/** + * @author xuye + * @since 2023/3/9 16:24 + **/ +public enum RequestMode { + + /** + * 常规的根据参数发起http请求 + */ + normal, + + /** + * 执行表达式 + */ + SpEL +} diff --git a/common/src/main/java/work/xuye/common/properties/CommonProperties.java b/common/src/main/java/work/xuye/common/properties/CommonProperties.java new file mode 100644 index 0000000..726036f --- /dev/null +++ b/common/src/main/java/work/xuye/common/properties/CommonProperties.java @@ -0,0 +1,22 @@ +package work.xuye.common.properties; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.cloud.context.config.annotation.RefreshScope; +import org.springframework.stereotype.Component; + +/** + * @author xuye + * @since 2023/3/6 17:35 + **/ +@Data +@Component +@RefreshScope +@ConfigurationProperties(prefix = CommonProperties.PROPERTIES_PREFIX) +public class CommonProperties { + + public static final String PROPERTIES_PREFIX = "common"; + + private boolean production = true; + +} diff --git a/common/src/main/java/work/xuye/common/properties/YueKeProperties.java b/common/src/main/java/work/xuye/common/properties/YueKeProperties.java new file mode 100644 index 0000000..f4d8a48 --- /dev/null +++ b/common/src/main/java/work/xuye/common/properties/YueKeProperties.java @@ -0,0 +1,38 @@ +package work.xuye.common.properties; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.cloud.context.config.annotation.RefreshScope; +import org.springframework.stereotype.Component; +import work.xuye.common.constant.CommonConstants; + +/** + * @author xuye + * @since 2023/3/6 17:35 + **/ +@Data +@Component +@RefreshScope +@ConfigurationProperties(prefix = YueKeProperties.PROPERTIES_PREFIX) +public class YueKeProperties { + + public static final String PROPERTIES_PREFIX = "yueke"; + + + private String taskName = "yueke_json"; + private String idUrlMapName = "id_url_map"; + private String channelIdNameMap = "channel_id_name_map"; + + private String secret = "Tk1xa4xNDPj8atGD5eGeXad91xQhKY"; + + + public String getNamespaceOfIdUrlMap() { + return this.taskName + CommonConstants.SEPARATOR + this.idUrlMapName; + } + + public String getNamespaceOfChannelIdNameMap() { + return this.taskName + CommonConstants.SEPARATOR + this.channelIdNameMap; + } + + +} diff --git a/common/src/main/java/work/xuye/common/spel/CustomFunction.java b/common/src/main/java/work/xuye/common/spel/CustomFunction.java index 3e9a7d1..43e09f0 100644 --- a/common/src/main/java/work/xuye/common/spel/CustomFunction.java +++ b/common/src/main/java/work/xuye/common/spel/CustomFunction.java @@ -10,11 +10,14 @@ import org.springframework.expression.Expression; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.stereotype.Component; import org.springframework.util.DigestUtils; +import org.springframework.util.MultiValueMap; import org.springframework.util.ObjectUtils; +import org.springframework.web.util.UriComponentsBuilder; import work.xuye.common.constant.CustomerDateTimeFormatter; import work.xuye.common.constant.RawDataFiledKey; import work.xuye.common.store.NsKVMapStore; import work.xuye.common.utils.HttpUtil; +import work.xuye.common.utils.JsonPathUtil; import java.time.Instant; import java.time.LocalDateTime; @@ -36,7 +39,7 @@ public class CustomFunction { private final SpelExpressionParser parser; - private final NsKVMapStore nsKvMapStore; + private final NsKVMapStore mapStore; public String jsonExtract(String json, String path) { String result = null; @@ -140,7 +143,6 @@ public class CustomFunction { return (String) exp.getValue(); } - public String warpExclude(String json, String excludeFiled, String... fields) { JsonObject obj = JsonParser.parseString(json).getAsJsonObject(); if (obj.has(excludeFiled)) { @@ -174,7 +176,12 @@ public class CustomFunction { for (int i = 0; i < fields.length; i += 2) { String key = fields[i]; String path = fields[i + 1]; - String value = this.jsonExtract(json, path); + String value; + if (JsonPathUtil.isJsonPath(path)) { + value = this.jsonExtract(json, path); + } else { + value = path; + } result.add(key, new JsonPrimitive(value)); log.debug("wrap handler, json: [{}], key: [{}], path: [{}], value: [{}]", json, key, path, value); } @@ -190,7 +197,6 @@ public class CustomFunction { return HttpUtil.isHttpRes2XX(url); } - public boolean urlNotContains(String url, String contains) { return !url.contains(contains); } @@ -199,17 +205,44 @@ public class CustomFunction { * 根据key从命名空间中获取value * * @param namespace 任务名 - * @param json item的json字符串 - * @param path key的jsonpath + * @param key key * @return value */ - public String getValueByKeyFromNs(String namespace, String json, String path) { - String key = this.jsonExtract(json, path); - String value = nsKvMapStore.getValue(namespace, key); + public String getValueByKeyFromNs(String namespace, String key) { + String value = mapStore.getValue(namespace, key); log.debug("getValueByKeyFromNs handler, namespace: [{}], key-in: [{}], value-out: [{}]", namespace, key, value); return value; } + public String getValueByKeyFromNs(String namespace, String json, String jsonPath) { + String key = this.jsonExtract(json, jsonPath); + return getValueByKeyFromNs(namespace, key); + } + + + public void putValueToNs(String namespace, String key, String value) { + mapStore.upsert(namespace, key, value); + log.debug("putValueToNs handler, namespace: [{}], key: [{}], value: [{}]", namespace, key, value); + } + + public void putValueToNs(String namespace, String json, String jsonPath, String value) { + String key = this.jsonExtract(json, jsonPath); + putValueToNs(namespace, key, value); + } + + public String getParamsFromUrl(String json, String jsonPath, String key) { + String url = this.jsonExtract(json, jsonPath); + return getParamsFromUrl(url, key); + } + + public String getParamsFromUrl(String url, String key) { + UriComponentsBuilder builder = UriComponentsBuilder.fromHttpUrl(url); + MultiValueMap queryParams = builder.build().getQueryParams(); + String value = queryParams.getFirst(key); + log.debug("getParamsFromUrl handler, url: [{}], key-in: [{}], value-out: [{}]", url, key, value); + return value; + } + /** * 先从map中获取指定的key,然后使用jsonpath提取json中的数据,判断是否等于期望值 @@ -222,13 +255,27 @@ public class CustomFunction { */ public boolean equals(Map map, String mapFieldKey, String jsonPath, Object value) { Object fieldValue = map.get(mapFieldKey); - log.debug("equals handler2, map: [{}], mapFieldKey: [{}], mapValue: [{}]", map, mapFieldKey, value); + log.debug("equals handler, map: [{}], mapFieldKey: [{}], mapValue: [{}]", map, mapFieldKey, value); String result = this.jsonExtract(fieldValue.toString(), jsonPath); boolean equals = result.equals(value); - log.debug("equals handler1, json: [{}], jsonPath: [{}], value: [{}], result: [{}]", fieldValue, jsonPath, value, equals); + log.debug("equals handler, json: [{}], jsonPath: [{}], value: [{}], result: [{}]", fieldValue, jsonPath, value, equals); return equals; } + public boolean keyNotExistInNs(Map map, String field, String jsonPath, String namespace) { + return !keyExistInNs(map, field, jsonPath, namespace); + } + + public boolean keyExistInNs(Map map, String field, String jsonPath, String namespace) { + Object o = map.get(field); + String v = this.jsonExtract(o.toString(), jsonPath); + String queryResult = mapStore.getValue(namespace, v); + boolean has = !ObjectUtils.isEmpty(queryResult); + log.info("keyExistInNs handler, value: [{}], namespace: [{}], result: [{}]", v, namespace, has); + return has; + } + + /** * 固定返回 true,用于希望断言通过时使用 * @@ -270,7 +317,6 @@ public class CustomFunction { return array.toString(); } - public String epochMilliToLocalDatetime(String json, String path) { String epochMilli = this.jsonExtract(json, path); String localDatetime = LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(epochMilli)), ZoneId.of("Asia/Shanghai")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); @@ -279,4 +325,14 @@ public class CustomFunction { } + public String ifEqualsIgnoreCase(String json, String jsonpath, String dest, String ifValue, String elseValue) { + String value = this.jsonExtract(json, jsonpath); + if (value.equalsIgnoreCase(dest)) { + return ifValue; + } else { + return elseValue; + } + } + + } diff --git a/common/src/main/java/work/xuye/common/spel/SpringExpressionLanguageEvaluator.java b/common/src/main/java/work/xuye/common/spel/SpringExpressionLanguageEvaluator.java index 5b3200f..4ec1ca0 100644 --- a/common/src/main/java/work/xuye/common/spel/SpringExpressionLanguageEvaluator.java +++ b/common/src/main/java/work/xuye/common/spel/SpringExpressionLanguageEvaluator.java @@ -22,11 +22,12 @@ import java.util.List; @Service @RequiredArgsConstructor public class SpringExpressionLanguageEvaluator { - + public final CustomFunction cf; private final SpelExpressionParser parser; + public HashMap evaluate(MessageMapping messageMapping, Message message) { StandardEvaluationContext context = new StandardEvaluationContext(); String seedUrl = (String) message.getHeaders().get(MessageConstants.SEED_URL); diff --git a/common/src/main/java/work/xuye/common/store/LocalMemoryNsKVMapStore.java b/common/src/main/java/work/xuye/common/store/LocalMemoryNsKVMapStore.java deleted file mode 100644 index 8e6110f..0000000 --- a/common/src/main/java/work/xuye/common/store/LocalMemoryNsKVMapStore.java +++ /dev/null @@ -1,74 +0,0 @@ -package work.xuye.common.store; - -import org.springframework.stereotype.Component; - -import javax.annotation.PostConstruct; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * @author xuye - * @since 2023/3/27 11:02 - **/ -@Component -public class LocalMemoryNsKVMapStore implements NsKVMapStore { - - private final static String GLH_V2 = "glh_v2_rss"; - - /** - * key: 命名空间,默认以任务名命名 - * value: key:key - * value:value:value - */ - private Map> store = null; - - - @PostConstruct - public void init() { - if (store != null) { - store.clear(); - } else { - store = new ConcurrentHashMap<>(); - } - store.put(GLH_V2, initGLH()); - } - - public Map initGLH() { - HashMap map = new HashMap<>(); - - map.put("港股", "格隆汇-快讯"); - map.put("A股", "格隆汇-快讯"); - map.put("美股", "格隆汇-快讯"); - map.put("商品外汇", "格隆汇-快讯"); - map.put("楼市", "格隆汇-快讯"); - map.put("基⾦", "格隆汇-快讯"); - map.put("债券", "格隆汇-快讯"); - - map.put("港股公告摘要", "格隆汇-要闻"); - map.put("⼤⾏评级", "格隆汇-要闻"); - map.put("业绩直击", "格隆汇-要闻"); - map.put("港股异动", "格隆汇-要闻"); - map.put("公司信息", "格隆汇-要闻"); - map.put("市场综述", "格隆汇-要闻"); - map.put("新股速递", "格隆汇-要闻"); - map.put("A股公告摘要", "格隆汇-要闻"); - map.put("美股异动", "格隆汇-要闻"); - - return map; - } - - - @Override - public String getValue(String namespace, String key) { - Map map = store.get(namespace); - if (map == null) { - throw new RuntimeException("namespace [" + namespace + "] not found in local store"); - } - Object value = map.getOrDefault(key, null); - if (value == null) { - return null; - } - return value.toString(); - } -} diff --git a/common/src/main/java/work/xuye/common/store/MySQLCachedNsKVMapStore.java b/common/src/main/java/work/xuye/common/store/MySQLCachedNsKVMapStore.java index 5f506ec..7807083 100644 --- a/common/src/main/java/work/xuye/common/store/MySQLCachedNsKVMapStore.java +++ b/common/src/main/java/work/xuye/common/store/MySQLCachedNsKVMapStore.java @@ -22,9 +22,14 @@ public class MySQLCachedNsKVMapStore implements NsKVMapStore { public String getValue(String namespace, String key) { Store store = storeManager.getValueFromNsByKey(namespace, key); if (ObjectUtils.isEmpty(store)) { - throw new RuntimeException("namespace [" + namespace + "] not found in db store"); + return null; } return store.getValue(); } + @Override + public void upsert(String namespace, String key, String value) { + storeManager.upsert(namespace, key, value); + } + } diff --git a/common/src/main/java/work/xuye/common/store/NsKVMapStore.java b/common/src/main/java/work/xuye/common/store/NsKVMapStore.java index 20742b4..4c9c4a0 100644 --- a/common/src/main/java/work/xuye/common/store/NsKVMapStore.java +++ b/common/src/main/java/work/xuye/common/store/NsKVMapStore.java @@ -6,5 +6,9 @@ package work.xuye.common.store; **/ public interface NsKVMapStore { + String getValue(String namespace, String key); + + void upsert(String namespace, String key, String value); + } diff --git a/common/src/main/java/work/xuye/common/utils/DebugUtil.java b/common/src/main/java/work/xuye/common/utils/DebugUtil.java index 7e248c7..fc10481 100644 --- a/common/src/main/java/work/xuye/common/utils/DebugUtil.java +++ b/common/src/main/java/work/xuye/common/utils/DebugUtil.java @@ -1,7 +1,12 @@ package work.xuye.common.utils; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import work.xuye.common.db.entity.Task; +import work.xuye.common.db.entity.vo.TaskMiniVO; import work.xuye.common.dto.TaskVO; +import work.xuye.common.properties.CommonProperties; import java.util.Set; @@ -10,22 +15,51 @@ import java.util.Set; * @since 2023/5/6 10:31 **/ @Slf4j +@Component public class DebugUtil { - private static final boolean DEBUG = false; + private static final Set taskIds = Set.of(17); + private static boolean DEBUG = false; - private static final Set taskIds = Set.of(5); - public static boolean isSkip(TaskVO task) { + @Autowired + public DebugUtil(CommonProperties commonProperties) { + if (commonProperties.isProduction()) { + DEBUG = false; + } + } + + public static boolean isSkip(TaskMiniVO taskMiniVO) { + return isSkip(taskMiniVO.getId(), taskMiniVO.getName()); + } + + public static boolean isSkip(Task task) { + return isSkip(task.getId(), task.getName()); + } + + public static boolean isSkip(TaskVO taskVO) { + return isSkip(taskVO.getTask().getId(), taskVO.getTask().getName()); + } + + public static boolean isSkip(Integer taskId) { if (!DEBUG) { return false; } - Integer taskId = task.getTask().getId(); if (!taskIds.contains(taskId)) { - log.warn("skip task, [{}-{}]", taskId, task.getTask().getName()); + log.warn("skip task, [{}]", taskId); return true; } return false; } + public static boolean isSkip(Integer taskId, String taskName) { + if (!DEBUG) { + return false; + } + if (!taskIds.contains(taskId)) { + log.warn("skip task, [{}-{}]", taskId, taskName); + return true; + } + return false; + } } diff --git a/common/src/main/java/work/xuye/common/utils/JsonPathUtil.java b/common/src/main/java/work/xuye/common/utils/JsonPathUtil.java index 0385f93..35325de 100644 --- a/common/src/main/java/work/xuye/common/utils/JsonPathUtil.java +++ b/common/src/main/java/work/xuye/common/utils/JsonPathUtil.java @@ -32,7 +32,7 @@ public class JsonPathUtil { public static JsonArray extractAsJsonArray(String json, String path, String seedUrl) { boolean exist = JsonPathUtil.exist(json, path); if (!exist) { - log.warn("jsonPath: [{}] from [{}] not exist in json:\n{}", path, seedUrl, json); + log.debug("jsonPath: [{}] from [{}] not exist in json:\n{}", path, seedUrl, json); return new JsonArray(); } Object read = JsonPathUtil.read(json, path); @@ -57,6 +57,12 @@ public class JsonPathUtil { return read; } + public static boolean isJsonPath(String expression) { + // todo 后续可优化成正则判断 + return expression.startsWith("$"); + } + + @Autowired public void setGson(Gson gson) { JsonPathUtil.gson = gson; diff --git a/helper/.gitignore b/helper/.gitignore new file mode 100644 index 0000000..549e00a --- /dev/null +++ b/helper/.gitignore @@ -0,0 +1,33 @@ +HELP.md +target/ +!.mvn/wrapper/maven-wrapper.jar +!**/src/main/**/target/ +!**/src/test/**/target/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ +build/ +!**/src/main/**/build/ +!**/src/test/**/build/ + +### VS Code ### +.vscode/ diff --git a/helper/pom.xml b/helper/pom.xml new file mode 100644 index 0000000..41857d7 --- /dev/null +++ b/helper/pom.xml @@ -0,0 +1,50 @@ + + + 4.0.0 + + work.xuye + std-news-process + 0.0.13-SNAPSHOT + + work.xuye + helper + 0.0.1-SNAPSHOT + helper + helper + + 11 + + + + work.xuye + common + 0.0.13-SNAPSHOT + + + org.springframework.cloud + spring-cloud-stream-binder-kafka + + + + + org.springframework + spring-messaging + + + cn.hutool + hutool-all + 5.8.11 + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + diff --git a/helper/src/main/java/work/xuye/helper/HelperApplication.java b/helper/src/main/java/work/xuye/helper/HelperApplication.java new file mode 100644 index 0000000..4a03640 --- /dev/null +++ b/helper/src/main/java/work/xuye/helper/HelperApplication.java @@ -0,0 +1,16 @@ +package work.xuye.helper; + +import org.mybatis.spring.annotation.MapperScan; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@MapperScan(basePackages = "work.xuye.common.db.mapper") + +@SpringBootApplication(scanBasePackages = {"work.xuye.*"}) +public class HelperApplication { + + public static void main(String[] args) { + SpringApplication.run(HelperApplication.class, args); + } + +} diff --git a/helper/src/main/java/work/xuye/helper/properties/HelperProperties.java b/helper/src/main/java/work/xuye/helper/properties/HelperProperties.java new file mode 100644 index 0000000..e91b50e --- /dev/null +++ b/helper/src/main/java/work/xuye/helper/properties/HelperProperties.java @@ -0,0 +1,28 @@ +package work.xuye.helper.properties; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +import java.util.Map; + +/** + * @author xuye + * @since 2023/1/14 21:50 + **/ +@Data +@Component +@ConfigurationProperties(prefix = HelperProperties.PROPERTIES_PREFIX) +public class HelperProperties { + + public static final String PROPERTIES_PREFIX = "helper"; + + private Map datasourceMap; + + @Data + public static class DataSource { + private String url; + private String username; + private String password; + } +} diff --git a/helper/src/main/java/work/xuye/helper/service/UpdateService.java b/helper/src/main/java/work/xuye/helper/service/UpdateService.java new file mode 100644 index 0000000..5f3b138 --- /dev/null +++ b/helper/src/main/java/work/xuye/helper/service/UpdateService.java @@ -0,0 +1,171 @@ +package work.xuye.helper.service; + +import cn.hutool.db.Db; +import cn.hutool.db.Entity; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.stereotype.Service; +import org.springframework.util.ObjectUtils; +import work.xuye.helper.properties.HelperProperties; + +import javax.annotation.PostConstruct; +import java.sql.SQLException; +import java.util.*; + +/** + * @author xuye + * @since 2023/7/12 14:55 + **/ +@Slf4j +@Service +@RequiredArgsConstructor +public class UpdateService implements ApplicationRunner { + + private final HelperProperties helperProperties; + + private final HashMap dataSourceMap; + + @PostConstruct + public void init() { + + Map datasourceConfigMap = helperProperties.getDatasourceMap(); + for (String key : datasourceConfigMap.keySet()) { + HelperProperties.DataSource source = datasourceConfigMap.get(key); + HikariConfig config = new HikariConfig(); + config.setJdbcUrl(source.getUrl()); + config.setUsername(source.getUsername()); + config.setPassword(source.getPassword()); + HikariDataSource dataSource = new HikariDataSource(config); + dataSourceMap.put(key, dataSource); + + + } + } + + public void start() throws SQLException { + String sql = "select date(createDate) as date, spider, count(*) as count\n" + + "from all_news\n" + + "where JSON_EXTRACT(raw_data, '$.crawler') = 'snp'\n" + + " and createDate > curdate() - interval 1 day\n" + + "group by spider, date\n" + + "order by date desc, count desc;"; + + + Set keys = dataSourceMap.keySet(); + + HashMap> map = new HashMap<>(); + + for (String key : keys) { + HikariDataSource dataSource = dataSourceMap.get(key); + List query = Db.use(dataSourceMap.get(key)).query(sql); + map.put(key, query); + } + + // Compare List between different environments + for (String key1 : keys) { + for (String key2 : keys) { + if (!key1.equals(key2)) { + List list1 = map.get(key1); + List list2 = map.get(key2); + + for (Entity entity1 : list1) { + for (Entity entity2 : list2) { + String date1 = entity1.getStr("date"); + String spider1 = entity1.getStr("spider"); + int count1 = entity1.getInt("count"); + + String date2 = entity2.getStr("date"); + String spider2 = entity2.getStr("spider"); + int count2 = entity2.getInt("count"); + + if (date1.equals(date2) && spider1.equals(spider2) && count1 != count2) { + int diff = Math.abs(count1 - count2); + String more = count1 > count2 ? key1 : key2; + System.out.printf("@@@ [%s] [%s] %s: %d %s: %d, diff abs: %d, more: %s%n", date1, spider1, key1, count1, key2, count2, diff, more); + this.findDIff(spider1, date1); + } + } + } + } + } + // todo 此处比较多个的话有遗漏,后续优化下 + break; + } + } + + + public void findDIff(String spider, String date) throws SQLException { + HashMap> map = new HashMap<>(); + for (String key : dataSourceMap.keySet()) { + HikariDataSource dataSource = dataSourceMap.get(key); + + String sql = "SELECT url FROM all_news WHERE spider = '" + spider + "' AND date(createDate) = '" + date + "';"; + List resultList = Db.use(dataSource).query(sql); + ArrayList list = new ArrayList<>(); + for (Entity entity : resultList) { + String url = entity.getStr("url"); + list.add(url); + } + map.put(key, list); + } + + for (String s : map.keySet()) { + this.compareEnvironments(map, s); + } + + + } + + public void compareEnvironments(HashMap> map, String environment) throws SQLException { + + List environmentList = map.get(environment); + if (environmentList == null) { + return; + } + for (String key : map.keySet()) { + if (key.equals(environment)) { + continue; + } + List list = map.getOrDefault(key, new ArrayList<>()); + List intersection = new ArrayList<>(list); + intersection.retainAll(environmentList); + + List different = new ArrayList<>(list); + different.removeAll(intersection); + + if (!different.isEmpty()) { + System.out.println("@@ 环境 " + environment + " 中,List " + key + " 中独有的元素有" + different.size() + "个"); + for (String s : different) { + getStatusByUrl(s); + } + } + } + } + + public void getStatusByUrl(String url) throws SQLException { + String sql = "select spider from all_news where url='" + url + "';"; + for (String key : dataSourceMap.keySet()) { + HikariDataSource dataSource = dataSourceMap.get(key); + List resultList = Db.use(dataSource).query(sql); + if (ObjectUtils.isEmpty(resultList)) { + System.err.println("⚠️ url: " + url + " not found in " + key); + } + if (resultList.size() == 1) { + System.out.println("url [" + url + "] 被 [" + resultList.get(0).get("spider") + "] 提前抓取了"); + } + if (resultList.size() > 1) { + System.err.println("!!! url: " + url + " found in " + key + " " + resultList.size() + " times"); + } + + } + } + + @Override + public void run(ApplicationArguments args) throws Exception { + this.start(); + } +} diff --git a/helper/src/main/resources/application.yml b/helper/src/main/resources/application.yml new file mode 100644 index 0000000..30d5b4d --- /dev/null +++ b/helper/src/main/resources/application.yml @@ -0,0 +1,15 @@ +knife4j: + enable: true +server: + port: 9300 +helper: + datasourceMap: + sit: + url: jdbc:mysql://47.116.58.10:3306/pyspider_sit_resultdb?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2b8 + username: pyspider_user + password: ND0328qSywfre + prd: + url: jdbc:mysql://47.103.55.230:3306/pyspider_resultdb?characterEncoding=utf8&autoReconnect=true&useUnicode=true&useSSL=false + username: pyspider + password: strzsJQWp%uw9oKB + diff --git a/helper/src/main/resources/bootstrap.yml b/helper/src/main/resources/bootstrap.yml new file mode 100644 index 0000000..9c761cf --- /dev/null +++ b/helper/src/main/resources/bootstrap.yml @@ -0,0 +1,21 @@ +server: + port: 18001 +spring: + application: + name: helper + cloud: + nacos: + config: + enabled: true + server-addr: https://nacos-sit.deepq.tech + file-extension: yml + namespace: std-news-process-dev + shared-configs: + - data-id: common.yml + refresh: true + discovery: + enabled: ${spring.cloud.nacos.config.enabled} + server-addr: ${spring.cloud.nacos.config.server-addr} + namespace: ${spring.cloud.nacos.config.namespace} + username: nacos + password: SKaIsixbMZ \ No newline at end of file diff --git a/pom.xml b/pom.xml index a2f4875..adaf53d 100644 --- a/pom.xml +++ b/pom.xml @@ -1,5 +1,6 @@ - + 4.0.0 @@ -28,6 +29,7 @@ source transformer sink + helper @@ -70,14 +72,6 @@ - - - - org.projectlombok - lombok - - - diff --git a/scheduler/src/main/java/work/xuye/scheduler/service/IssueService.java b/scheduler/src/main/java/work/xuye/scheduler/service/IssueService.java index cd7be70..04e6c4b 100644 --- a/scheduler/src/main/java/work/xuye/scheduler/service/IssueService.java +++ b/scheduler/src/main/java/work/xuye/scheduler/service/IssueService.java @@ -13,9 +13,9 @@ import work.xuye.common.constant.BindingConstants; import work.xuye.common.constant.MessageConstants; import work.xuye.common.constant.StageConstants; import work.xuye.common.db.entity.Task; +import work.xuye.common.db.entity.vo.TaskMiniVO; import work.xuye.common.db.service.TaskManager; import work.xuye.common.db.service.TaskService; -import work.xuye.common.dto.HttpRequestParams; import work.xuye.common.dto.TaskVO; import work.xuye.common.enums.ProcessMode; import work.xuye.common.utils.ID; @@ -41,24 +41,28 @@ public class IssueService implements ApplicationRunner { public TaskVO issueTask(Integer id) { TaskVO taskVO = taskManager.getTaskInfoByTaskId(id); - HttpRequestParams requestParams = taskVO.getTask().getRequestParams(); - Message message = MessageBuilder - .withPayload(requestParams) + Task task = taskVO.getTask(); + TaskMiniVO taskMiniVO = TaskMiniVO.builder() + .id(task.getId()) + .name(task.getName()) + .build(); + Message message = MessageBuilder + .withPayload(taskMiniVO) .setHeader(MessageConstants.CURRENT_STAGE, StageConstants.SCHEDULER) .setHeader(MessageConstants.TASK_TRACE_ID, ID.generate()) - .setHeader(MessageConstants.TASK_ID, taskVO.getTask().getId()) - .setHeader(MessageConstants.TASK_NAME, taskVO.getTask().getName()) + .setHeader(MessageConstants.TASK_ID, task.getId()) + .setHeader(MessageConstants.TASK_NAME, task.getName()) .build(); boolean send = streamBridge.send(BindingConstants.TASK_OUT, message); if (!send) { throw new RuntimeException("send message failed"); } - ProcessMode processMode = taskVO.getTask().getProcessMode(); + ProcessMode processMode = task.getProcessMode(); String emoji = processMode.equals(ProcessMode.DEBUG) ? "\uD83E\uDD16" : ""; log.info("\uD83D\uDCE4 {} {}-{}", emoji, - taskVO.getTask().getId(), - taskVO.getTask().getName()); + task.getId(), + task.getName()); return taskVO; } diff --git a/scheduler/src/main/resources/bootstrap.yml b/scheduler/src/main/resources/bootstrap.yml index 14bf66b..2504965 100644 --- a/scheduler/src/main/resources/bootstrap.yml +++ b/scheduler/src/main/resources/bootstrap.yml @@ -5,9 +5,9 @@ spring: nacos: config: enabled: true - server-addr: https://nacos-test.deepq.tech + server-addr: https://nacos-sit.deepq.tech file-extension: yml - namespace: std-news-process-sit + namespace: std-news-process-dev shared-configs: - data-id: common.yml refresh: true diff --git a/sink/src/main/java/work/xuye/sink/controller/OpenController.java b/sink/src/main/java/work/xuye/sink/controller/OpenController.java index 0849279..191ebfd 100644 --- a/sink/src/main/java/work/xuye/sink/controller/OpenController.java +++ b/sink/src/main/java/work/xuye/sink/controller/OpenController.java @@ -1,19 +1,46 @@ package work.xuye.sink.controller; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.*; +import work.xuye.common.properties.CommonProperties; +import work.xuye.sink.yueke.*; + +import java.util.List; /** * @author xuye * @since 2023/7/4 15:16 **/ +@Slf4j +@Api(tags = "外部请求") @RestController -@RequestMapping("/open") +@RequestMapping("/open/api") +@RequiredArgsConstructor public class OpenController { - @GetMapping - public String open() { - return "hello ~"; + private final YueKeRevokeService yueKeRevokeService; + private final CommonProperties commonProperties; + + @PostMapping("/revoke") + @ApiOperation(value = "阅客资讯-下架") + public YueKeResponse revoke(@RequestBody YueKeRevokeNewsIDsVO ids, @RequestHeader(name = "metaData") YueKeRequestModel headerModel) { + log.info("body: {}, metaData: {}", ids, headerModel); + yueKeRevokeService.handleRevokeRequest(ids, headerModel); + return YueKeResponse.of(YueKeResEnum.COMMON_SUCCESS); + } + + @GetMapping("/debug/{id}") + @ApiOperation(value = "测试-阅客资讯-下架") + public YueKeResponse debug(@PathVariable String id) { + if (commonProperties.isProduction()) { + throw new UnsupportedOperationException("生产环境不支持该操作"); + } + yueKeRevokeService.revoke(List.of(id)); + return YueKeResponse.of(YueKeResEnum.COMMON_SUCCESS); } + + } diff --git a/sink/src/main/java/work/xuye/sink/handler/DisappearHandler.java b/sink/src/main/java/work/xuye/sink/handler/DisappearHandler.java index 6628e1b..3b13aa0 100644 --- a/sink/src/main/java/work/xuye/sink/handler/DisappearHandler.java +++ b/sink/src/main/java/work/xuye/sink/handler/DisappearHandler.java @@ -10,6 +10,7 @@ import work.xuye.common.constant.RegexConstants; import work.xuye.common.db.entity.Sql; import work.xuye.common.db.entity.Task; import work.xuye.common.db.entity.vo.SinkParams; +import work.xuye.common.db.entity.vo.TableTemplate; import work.xuye.common.db.service.SqlManager; import work.xuye.common.db.service.TaskManager; import work.xuye.common.dto.TaskVO; @@ -18,6 +19,7 @@ import work.xuye.common.enums.MessageType; import work.xuye.common.enums.Status; import work.xuye.common.properties.AlertProperties; import work.xuye.common.service.MessageService; +import work.xuye.common.spel.SpringExpressionLanguageEvaluator; import work.xuye.common.utils.DebugUtil; import work.xuye.sink.service.JdbcService; import work.xuye.sink.service.SqlExecutor; @@ -43,6 +45,7 @@ public class DisappearHandler { private final SqlManager sqlManager; private final AlertProperties alertProperties; private final SqlExecutor sqlExecutor; + private final SpringExpressionLanguageEvaluator evaluator; public void handle(Message> message) { @@ -62,11 +65,15 @@ public class DisappearHandler { return; } String condition = delete.getDisappearConfig().getCondition(); - String tableName = task.getSinkParams().getTableName(); - String dataSourceName = task.getSinkParams().getDataSourceName(); + + String templateNameExpression = taskVO.getTask().getSinkParams().getTemplateNameExpression(); + String templateName = evaluator.evaluate(message.getPayload(), templateNameExpression, String.class).toString(); + TableTemplate template = taskManager.getTemplateByName(templateName).getTableTemplate(); + String tableName = template.getTableName(); + String dataSourceName = template.getDatasourceName(); List> itemList = jdbcService.getForListMap(dataSourceName, SqlGenerator.generateGivenFieldSql(tableName, condition, taskName)); - String uniqueField = taskVO.getTemplate().getTableTemplate().getUniqueField().getFieldName(); + String uniqueField = template.getUniqueField().getFieldName(); List> removed = itemList.stream().filter(item -> !message.getPayload().contains(item.get(uniqueField).toString())).collect(Collectors.toList()); if (ObjectUtils.isEmpty(removed)) { return; diff --git a/sink/src/main/java/work/xuye/sink/handler/ItemHandler.java b/sink/src/main/java/work/xuye/sink/handler/ItemHandler.java index aadc774..d339a5d 100644 --- a/sink/src/main/java/work/xuye/sink/handler/ItemHandler.java +++ b/sink/src/main/java/work/xuye/sink/handler/ItemHandler.java @@ -79,10 +79,15 @@ public class ItemHandler { } //相关字段放这里 String md5Digest = DigestUtils.md5DigestAsHex(message.getPayload().getBytes()); - TableTemplate template = taskVO.getTemplate().getTableTemplate(); + + + String templateNameExpression = taskVO.getTask().getSinkParams().getTemplateNameExpression(); + String templateName = evaluator.evaluate(message.getPayload(), templateNameExpression, String.class).toString(); + TableTemplate template = taskManager.getTemplateByName(templateName).getTableTemplate(); + SinkParams.CheckDelete checkDeleteConfig = taskVO.getTask().getSinkParams().getCheckDelete(); // 根据消息体,和映射关系,使用SpEL表达式,得到计算结果 - HashMap expressionResultMap = evaluator.evaluate(taskVO.getMapping().getTableMapping(), message); + HashMap expressionResultMap = evaluator.evaluate(taskVO.getTemplateMappingMap().get(templateName).getTableMapping(), message); // 如果不能通过校验,则抛出异常 this.validate(expressionResultMap, template); // 将计算结果合并到模板中,此刻的模板是拥有值的 @@ -116,11 +121,11 @@ public class ItemHandler { this.putToCache(template, md5Digest); return; } - Map dbItem = this.getExistedItem(taskVO, template); + Map dbItem = this.getExistedItem(template); this.tryUpdate(taskVO, dbItem, expressionResultMap, md5Digest, template, reason); //如果不存在,就插入 } else { - this.tryInsert(taskVO.getTask(), expressionResultMap, template, reason); + this.tryInsert(message.getPayload(), taskVO, expressionResultMap, template, reason); } this.putToCache(template, md5Digest); } @@ -166,7 +171,7 @@ public class ItemHandler { private boolean tryDelete(TaskVO taskVO, TableTemplate template, HashMap newItem, SinkParams.CheckDelete checkDeleteConfig, ProcessReason reason) { // 如果不是 是固定值检测删除并且状态是开启,说明无需处理,直接返回 if ( - !(checkDeleteConfig.getStatus().equals(Status.on) && checkDeleteConfig.getMode().equals(DeleteCheckMode.fixedValue)) + !(Status.on.equals(checkDeleteConfig.getStatus()) && DeleteCheckMode.fixedValue.equals(checkDeleteConfig.getMode())) ) { return false; } @@ -175,15 +180,15 @@ public class ItemHandler { // 满足断言条件,才能继续处理 boolean satisfied = this.isSatisfied(taskVO.getTask(), newItem, template, predicates, "删除"); if (satisfied) { - return doDelete(taskVO, template.getUniqueField(), reason); + return doDelete(taskVO, template, reason); } return false; } - private boolean doDelete(TaskVO taskVO, TableTemplate.Field uniqueField, ProcessReason reason) { - String tableName = taskVO.getTask().getSinkParams().getTableName(); + public boolean doDelete(TaskVO taskVO, TableTemplate template, ProcessReason reason) { + TableTemplate.Field uniqueField = template.getUniqueField(); + String tableName = template.getTableName(); Object value = uniqueField.getValue(); - TableTemplate template = taskVO.getTemplate().getTableTemplate(); TableTemplate.LogicDeleteField logicDeleteField = template.getLogicDeleteField(); boolean isLogicDelete = !ObjectUtils.isEmpty(logicDeleteField); String deleteSql; @@ -192,7 +197,7 @@ public class ItemHandler { } else { deleteSql = SqlGenerator.generateLogicDeleteUpdateSql(tableName, template); } - int rows = jdbcService.executeSql(taskVO.getTask().getSinkParams().getDataSourceName(), deleteSql); + int rows = jdbcService.executeSql(template.getDatasourceName(), deleteSql); log.info("[{}][\uD83D\uDDD1️已{}删除{}条][{} {}] " + "{}: {}", taskVO.getTask().getName(), isLogicDelete ? "逻辑" : "", @@ -201,6 +206,9 @@ public class ItemHandler { reason.name(), uniqueField.getFieldName(), value); + if (ProcessReason.external_condition.equals(reason)) { + return rows > 0; + } if (rows != 1) { throw new RuntimeException("期望删除一条,实际删除" + rows + "条数据,请尽快处理"); } @@ -255,11 +263,6 @@ public class ItemHandler { field.setValue(value); } } - - // 以任务的表名为准,覆盖模板的表名 - if (!ObjectUtils.isEmpty(taskVO.getTask().getSinkParams().getTableName())) { - template.setTableName(taskVO.getTask().getSinkParams().getTableName()); - } } @@ -289,12 +292,12 @@ public class ItemHandler { private boolean itemExist(SinkParams sinkParams, TableTemplate tableTemplate) { String selectExist = SqlGenerator.generateExistedSql(tableTemplate); - return jdbcService.isExist(sinkParams.getDataSourceName(), selectExist); + return jdbcService.isExist(tableTemplate.getDatasourceName(), selectExist); } - private Map getExistedItem(TaskVO taskVO, TableTemplate template) { - String dataSourceName = taskVO.getTask().getSinkParams().getDataSourceName(); - return jdbcService.getOne(dataSourceName, SqlGenerator.generateFetchSql(template)); + private Map getExistedItem(TableTemplate template) { + String datasourceName = template.getDatasourceName(); + return jdbcService.getOne(datasourceName, SqlGenerator.generateFetchSql(template)); } @@ -303,7 +306,7 @@ public class ItemHandler { String taskName = taskVO.getTask().getName(); boolean updateConfigEnabled = sinkParams.getCheckUpdate().getStatus().equals(Status.on); if (updateConfigEnabled) { - String dataSourceName = sinkParams.getDataSourceName(); + String dataSourceName = tableTemplate.getDatasourceName(); Assert.notNull(dataSourceName, "dataSourceName is null"); SinkParams.CheckUpdate checkUpdateConfig = sinkParams.getCheckUpdate(); String fieldName = checkUpdateConfig.getFieldName(); @@ -349,9 +352,13 @@ public class ItemHandler { } - private void tryInsert(Task task, Map newItem, TableTemplate template, ProcessReason reason) { - if (this.isSatisfied(task, newItem, template, task.getSinkParams().getInsertConfig().getPredicates(), "保存")) { - jdbcService.update(task.getSinkParams().getDataSourceName(), SqlGenerator.generateInsertSql(template)); + private void tryInsert(String message, TaskVO taskVO, Map newItem, TableTemplate template, ProcessReason reason) { + Task task = taskVO.getTask(); + // 执行前置检查 + if (!this.isSatisfied(task, newItem, template, task.getSinkParams().getInsertConfig().getPredicates(), "保存")) { + return; + } else { + jdbcService.update(template.getDatasourceName(), SqlGenerator.generateInsertSql(template)); log.info("[{}][\uD83D\uDCBE 已保存][{} {}] " + "{}: {}", task.getName(), reason.getEmoji(), @@ -359,6 +366,15 @@ public class ItemHandler { template.getUniqueField().getFieldName(), template.getUniqueField().getValue()); } + + // 执行后置动作 + List postActions = task.getSinkParams().getInsertConfig().getPostActions(); + if (ObjectUtils.isEmpty(postActions)) { + return; + } + for (String expression : postActions) { + evaluator.evaluate(message, expression, Void.class); + } } } \ No newline at end of file diff --git a/sink/src/main/java/work/xuye/sink/yueke/YueKeConstants.java b/sink/src/main/java/work/xuye/sink/yueke/YueKeConstants.java new file mode 100644 index 0000000..f70529f --- /dev/null +++ b/sink/src/main/java/work/xuye/sink/yueke/YueKeConstants.java @@ -0,0 +1,14 @@ +package work.xuye.sink.yueke; + +/** + * @author xuye + * @since 2023/7/10 09:56 + **/ +public class YueKeConstants { + + //请求时间验证3分钟有效(毫秒) + public static final Long VALID_MS = 1000 * 60 * 3L; + public static final Integer REQ_ID_MIN_LENGTH = 10; + public static final Integer REQ_ID_MAX_LENGTH = 50; + +} \ No newline at end of file diff --git a/sink/src/main/java/work/xuye/sink/yueke/YueKeException.java b/sink/src/main/java/work/xuye/sink/yueke/YueKeException.java new file mode 100644 index 0000000..ac059af --- /dev/null +++ b/sink/src/main/java/work/xuye/sink/yueke/YueKeException.java @@ -0,0 +1,23 @@ +package work.xuye.sink.yueke; + +import lombok.Data; +import lombok.EqualsAndHashCode; + +/** + * @author xuye + * @since 2023/7/10 10:19 + **/ +@Data +@EqualsAndHashCode(callSuper = true) +public class YueKeException extends RuntimeException { + + private String requestId; + + private YueKeResEnum yueKeResEnum; + + + public YueKeException(String requestId, YueKeResEnum yueKeResEnum) { + this.requestId = requestId; + this.yueKeResEnum = yueKeResEnum; + } +} diff --git a/sink/src/main/java/work/xuye/sink/yueke/YueKeExceptionControllerAdvice.java b/sink/src/main/java/work/xuye/sink/yueke/YueKeExceptionControllerAdvice.java new file mode 100644 index 0000000..a108d06 --- /dev/null +++ b/sink/src/main/java/work/xuye/sink/yueke/YueKeExceptionControllerAdvice.java @@ -0,0 +1,20 @@ +package work.xuye.sink.yueke; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.ExceptionHandler; +import org.springframework.web.bind.annotation.RestControllerAdvice; + +/** + * @author xuye + * @since 2022/5/17 18:26 + **/ +@Slf4j +@RestControllerAdvice +public class YueKeExceptionControllerAdvice { + + @ExceptionHandler(YueKeException.class) + public YueKeResponse handleThrowable(YueKeException e) { + return YueKeResponse.of(e.getYueKeResEnum()); + } + +} diff --git a/sink/src/main/java/work/xuye/sink/yueke/YueKeRequestJsonConverter.java b/sink/src/main/java/work/xuye/sink/yueke/YueKeRequestJsonConverter.java new file mode 100644 index 0000000..74b57f7 --- /dev/null +++ b/sink/src/main/java/work/xuye/sink/yueke/YueKeRequestJsonConverter.java @@ -0,0 +1,23 @@ +package work.xuye.sink.yueke; + +import com.google.gson.Gson; +import lombok.RequiredArgsConstructor; +import org.jetbrains.annotations.NotNull; +import org.springframework.core.convert.converter.Converter; +import org.springframework.stereotype.Component; + +/** + * @author xuye + * @since 2023/7/10 18:06 + **/ +@Component +@RequiredArgsConstructor +public class YueKeRequestJsonConverter implements Converter { + + private final Gson gson; + + @Override + public YueKeRequestModel convert(@NotNull String jsonSource) { + return gson.fromJson(jsonSource, YueKeRequestModel.class); + } +} diff --git a/sink/src/main/java/work/xuye/sink/yueke/YueKeRequestModel.java b/sink/src/main/java/work/xuye/sink/yueke/YueKeRequestModel.java new file mode 100644 index 0000000..a99de48 --- /dev/null +++ b/sink/src/main/java/work/xuye/sink/yueke/YueKeRequestModel.java @@ -0,0 +1,18 @@ +package work.xuye.sink.yueke; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * @author xuye + * @since 2023/7/10 17:57 + **/ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class YueKeRequestModel { + private String reqId; + private Long timestamp; + private String sign; +} diff --git a/sink/src/main/java/work/xuye/sink/yueke/YueKeResEnum.java b/sink/src/main/java/work/xuye/sink/yueke/YueKeResEnum.java new file mode 100644 index 0000000..39a0ab1 --- /dev/null +++ b/sink/src/main/java/work/xuye/sink/yueke/YueKeResEnum.java @@ -0,0 +1,54 @@ +package work.xuye.sink.yueke; + +import java.io.Serializable; + +/** + * @author xuye + * @since 2023/7/10 09:57 + **/ +public enum YueKeResEnum implements Serializable { + + //一般成功, + COMMON_SUCCESS("10000", "success"), + //一般失败 + COMMON_ERROR("50000", "error"), + + //11000开头的是权限验证用 + SECURITY_ERROR_11001("11001", "metadata is empty"), + SECURITY_ERROR_11002("11002", "some params in metadata is empty"), + SECURITY_ERROR_11003("11003", "timestamp valid time is " + YueKeConstants.VALID_MS / 1000 / 60 + " minutes"), + + SECURITY_ERROR_11006("11006", "sign is not valid"), + SECURITY_ERROR_11007("11007", "reqId is too short, min length is " + YueKeConstants.REQ_ID_MIN_LENGTH), + SECURITY_ERROR_11008("11008", "reqId is too long, max length is " + YueKeConstants.REQ_ID_MAX_LENGTH), + + SECURITY_ERROR_11009("11009", "refer is empty"), + SECURITY_ERROR_11010("11010", "refer is not valid"), + + SECURITY_ERROR_11501("11501", "other error"); + + + private final String code; + private final String msg; + + YueKeResEnum(String code, String msg) { + this.code = code; + this.msg = msg; + } + + public String getCode() { + return code; + } + + public String getMsg() { + return msg; + } + + @Override + public String toString() { + return "APIResponse{" + + "code='" + code + '\'' + + ", msg='" + msg + '\'' + + '}'; + } +} \ No newline at end of file diff --git a/sink/src/main/java/work/xuye/sink/yueke/YueKeResponse.java b/sink/src/main/java/work/xuye/sink/yueke/YueKeResponse.java new file mode 100644 index 0000000..0d086be --- /dev/null +++ b/sink/src/main/java/work/xuye/sink/yueke/YueKeResponse.java @@ -0,0 +1,33 @@ +package work.xuye.sink.yueke; + +import lombok.Data; + +import java.util.UUID; + +/** + * @author xuye + * @since 2023/7/10 09:59 + **/ +@Data +public class YueKeResponse { + + private String requestId; + private Long requestStartTime; + private Long requestEndTime; + private String code; + private String msg; + + public YueKeResponse() { + this.requestId = UUID.randomUUID().toString().replaceAll("-", ""); + this.requestStartTime = System.currentTimeMillis(); + } + + public static YueKeResponse of(YueKeResEnum yueKeResEnum) { + YueKeResponse response = new YueKeResponse(); + response.setCode(yueKeResEnum.getCode()); + response.setMsg(yueKeResEnum.getMsg()); + response.setRequestEndTime(System.currentTimeMillis()); + return response; + } + +} \ No newline at end of file diff --git a/sink/src/main/java/work/xuye/sink/yueke/YueKeRevokeNewsIDsVO.java b/sink/src/main/java/work/xuye/sink/yueke/YueKeRevokeNewsIDsVO.java new file mode 100644 index 0000000..7ecb4b2 --- /dev/null +++ b/sink/src/main/java/work/xuye/sink/yueke/YueKeRevokeNewsIDsVO.java @@ -0,0 +1,25 @@ +package work.xuye.sink.yueke; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author xuye + * @since 2023/7/10 17:50 + **/ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class YueKeRevokeNewsIDsVO { + + private Data data = new Data(); + + @lombok.Data + public static class Data { + private final List messageIdList = new ArrayList<>(); + } +} diff --git a/sink/src/main/java/work/xuye/sink/yueke/YueKeRevokeService.java b/sink/src/main/java/work/xuye/sink/yueke/YueKeRevokeService.java new file mode 100644 index 0000000..b58f86a --- /dev/null +++ b/sink/src/main/java/work/xuye/sink/yueke/YueKeRevokeService.java @@ -0,0 +1,147 @@ +package work.xuye.sink.yueke; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.codec.digest.DigestUtils; +import org.springframework.stereotype.Service; +import org.springframework.util.ObjectUtils; +import work.xuye.common.db.entity.Mapping; +import work.xuye.common.db.entity.Task; +import work.xuye.common.db.entity.Template; +import work.xuye.common.db.entity.vo.TableTemplate; +import work.xuye.common.db.service.TaskManager; +import work.xuye.common.dto.TaskVO; +import work.xuye.common.enums.ProcessMode; +import work.xuye.common.enums.ProcessReason; +import work.xuye.common.properties.YueKeProperties; +import work.xuye.common.store.NsKVMapStore; +import work.xuye.sink.handler.ItemHandler; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +/** + * @author xuye + * @since 2023/7/10 10:15 + **/ +@Slf4j +@Service +@RequiredArgsConstructor +public class YueKeRevokeService { + + private final YueKeProperties yueKeProperties; + private final TaskManager taskManager; + private final NsKVMapStore mapStore; + private final ItemHandler itemHandler; + + + private static String getSign(Long timestamp, String reqId, String secret) { + String params = timestamp + reqId + secret; + return DigestUtils.md5Hex(params); + } + + public void handleRevokeRequest(YueKeRevokeNewsIDsVO ids, YueKeRequestModel requestModel) { + // 根据沟通,阅客对同一篇资讯会下发两次id,一次是普通的id,一次是带有:的id,这里需要过滤掉带:的id + if (this.afterClean(ids).isEmpty()) { + log.info("revoke ids is empty, ignore"); + return; + } + log.info("待下架的资讯id列表: {}", ids); + // 确保是阅客的请求 + this.checkRequest(requestModel); + log.info("请求合法,开始下架"); + // 执行下架 + this.revoke(ids.getData().getMessageIdList()); + } + + public void revoke(List idList) { + // 前提条件:阅客的任务名包含 yueke,并且只有一条记录 + Task task = taskManager.getTaskService().lambdaQuery() + .eq(Task::getName, yueKeProperties.getTaskName()) + .eq(Task::getProcessMode, ProcessMode.NORMAL) + .select(Task::getId) + .orderBy(true, true, Task::getCreateTime) + .last("limit 1") + .one(); + if (ObjectUtils.isEmpty(task)) { + log.warn("接收到阅客的下架请求,但是库中没有此任务,跳过处理"); + return; + } + TaskVO taskVO = taskManager.getTaskInfoByTaskId(task.getId()); + + for (String id : idList) { + String url = mapStore.getValue(yueKeProperties.getNamespaceOfIdUrlMap(), id); + if (ObjectUtils.isEmpty(url)) { + log.warn("接收到阅客的下架请求,但是库中没有此记录,跳过处理"); + continue; + } + List