diff --git a/common/src/main/java/tech/deepq/common/enums/SignatureEnumGroup.java b/common/src/main/java/tech/deepq/common/enums/SignatureEnumGroup.java index 2aeaddb..5f56a0b 100644 --- a/common/src/main/java/tech/deepq/common/enums/SignatureEnumGroup.java +++ b/common/src/main/java/tech/deepq/common/enums/SignatureEnumGroup.java @@ -10,14 +10,16 @@ public class SignatureEnumGroup { * 加密字符串的位置 */ public static enum SignatureLocation{ - header + header, + url } public static enum KeyAlgorithm{ /** * RSA */ - RSA + RSA, + MD5 } public static enum KeyType{ diff --git a/common/src/main/java/tech/deepq/common/utils/signature/AbstractSignatureService.java b/common/src/main/java/tech/deepq/common/utils/signature/AbstractSignatureService.java index 789ccce..9b00090 100644 --- a/common/src/main/java/tech/deepq/common/utils/signature/AbstractSignatureService.java +++ b/common/src/main/java/tech/deepq/common/utils/signature/AbstractSignatureService.java @@ -15,13 +15,13 @@ public abstract class AbstractSignatureService { protected SignatureContext signatureContext; /** - * 使用私钥加密 + * 加密 * * @param content * @return * @throws GeneralSecurityException */ - public abstract String signatureByPriKey(String content) throws GeneralSecurityException; + public abstract String signature(String content) throws GeneralSecurityException; void init(SignatureConfig signatureConfig) throws GeneralSecurityException { diff --git a/common/src/main/java/tech/deepq/common/utils/signature/Md5SignatureService.java b/common/src/main/java/tech/deepq/common/utils/signature/Md5SignatureService.java new file mode 100644 index 0000000..b3ccc5e --- /dev/null +++ b/common/src/main/java/tech/deepq/common/utils/signature/Md5SignatureService.java @@ -0,0 +1,33 @@ +package tech.deepq.common.utils.signature; + +import org.springframework.util.DigestUtils; + +import java.nio.charset.StandardCharsets; +import java.security.GeneralSecurityException; + +/** + * @author yechuan + * @since 2023/9/7 10:19 + **/ +class Md5SignatureService extends AbstractSignatureService { + + private Md5SignatureService() { + } + + + public static AbstractSignatureService getInstance() throws GeneralSecurityException { + return new Md5SignatureService(); + } + + /** + * md5加密 + * + * @param content + * @return + * @throws GeneralSecurityException + */ + @Override + public String signature(String content) throws GeneralSecurityException { + return DigestUtils.md5DigestAsHex(content.getBytes(StandardCharsets.UTF_8)).toUpperCase(); + } +} diff --git a/common/src/main/java/tech/deepq/common/utils/signature/RSASignatureService.java b/common/src/main/java/tech/deepq/common/utils/signature/RSASignatureService.java index 34791a3..268a5b8 100644 --- a/common/src/main/java/tech/deepq/common/utils/signature/RSASignatureService.java +++ b/common/src/main/java/tech/deepq/common/utils/signature/RSASignatureService.java @@ -32,7 +32,7 @@ class RSASignatureService extends AbstractSignatureService { } @Override - public String signatureByPriKey(String content) throws GeneralSecurityException { + public String signature(String content) throws GeneralSecurityException { if (signatureContext == null) { throw new RuntimeException("未初始化公私钥对信息"); } diff --git a/common/src/main/java/tech/deepq/common/utils/signature/SignatureFactory.java b/common/src/main/java/tech/deepq/common/utils/signature/SignatureFactory.java index 8958469..bcaeee3 100644 --- a/common/src/main/java/tech/deepq/common/utils/signature/SignatureFactory.java +++ b/common/src/main/java/tech/deepq/common/utils/signature/SignatureFactory.java @@ -22,6 +22,9 @@ public class SignatureFactory { case RSA: result = RSASignatureService.getInstance(signatureConfig); break; + case MD5: + result = Md5SignatureService.getInstance(); + break; } if (result == null) { throw new RuntimeException(String.format("不支持的签名方式,signatureConfig:%s", signatureConfig)); diff --git a/source/src/main/java/tech/deepq/source/handler/SourceHandler.java b/source/src/main/java/tech/deepq/source/handler/SourceHandler.java index 5421a62..610c78c 100644 --- a/source/src/main/java/tech/deepq/source/handler/SourceHandler.java +++ b/source/src/main/java/tech/deepq/source/handler/SourceHandler.java @@ -13,6 +13,7 @@ import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; import org.springframework.util.DigestUtils; import org.springframework.util.ObjectUtils; +import org.springframework.util.StringUtils; import tech.deepq.common.constant.*; import tech.deepq.common.db.entity.Task; import tech.deepq.common.db.entity.vo.Dispatcher; @@ -32,6 +33,7 @@ import tech.deepq.common.utils.signature.AbstractSignatureService; import tech.deepq.common.utils.signature.SignatureFactory; import tech.deepq.source.request.RequestClient; import tech.deepq.source.spel.CustomRequest; +import tech.deepq.source.spel.ZhiDeMaiRequest; import java.nio.charset.StandardCharsets; import java.security.GeneralSecurityException; @@ -55,6 +57,7 @@ public class SourceHandler { private final StreamBridge streamBridge; private final TaskManager taskManager; public final Gson gson; + public final ZhiDeMaiRequest zdm; public List> handle(Message message) { TaskMiniVO taskMiniVO = message.getPayload(); @@ -164,9 +167,11 @@ public class SourceHandler { /** * 根据当前任务返回值与当前任务的分支配置信息构建将要发送的消息 + * * @return {@code Map} */ private Map> buildNextNodeMessage(List result, Task task, Message message) { + Dispatcher filter = task.getDispatcher(); Map> resultMap = new HashMap<>(); @@ -204,15 +209,17 @@ public class SourceHandler { httpRes.setTemporary(null); Map map = (Map) temporary; // 是否更新判断 - String uniqueKey = map.get(filter.getUniqueKey()).toString(); - uniqueKey = httpRes.getRequestParams().getUrl() + "?uniqueKey=" + uniqueKey; - String md5Digest = DigestUtils.md5DigestAsHex(gson.toJson(map).getBytes(StandardCharsets.UTF_8)); - ResourceStatus resourceStatus = urlMD5Service.getResourceStatus(uniqueKey, md5Digest); - urlMD5Service.put(uniqueKey, md5Digest); - if (ResourceStatus.UNCHANGED.equals(resourceStatus)) { - return; + String uniqueKey = ""; + if (filter.getUniqueKey() != null) { + uniqueKey = map.get(filter.getUniqueKey()).toString(); + uniqueKey = httpRes.getRequestParams().getUrl() + "?uniqueKey=" + uniqueKey; + String md5Digest = DigestUtils.md5DigestAsHex(gson.toJson(map).getBytes(StandardCharsets.UTF_8)); + ResourceStatus resourceStatus = urlMD5Service.getResourceStatus(uniqueKey, md5Digest); + urlMD5Service.put(uniqueKey, md5Digest); + if (ResourceStatus.UNCHANGED.equals(resourceStatus)) { + return; + } } - // 节点命中 List nextNodes = filter.getNextNodes(); if (ObjectUtils.isEmpty(nextNodes)) { @@ -228,7 +235,9 @@ public class SourceHandler { continue; } Set otherCacheKeySet = new HashSet<>(); - otherCacheKeySet.add(uniqueKey); + if (StringUtils.hasText(uniqueKey)) { + otherCacheKeySet.add(uniqueKey); + } // 命中 String topic = getPlaceholderSpel(nextNode.getTopic(), filter.getPlaceholderExpressions(), context, String.class); Object payload = getPlaceholderSpel(nextNode.getPayload(), filter.getPlaceholderExpressions(), context, Object.class); @@ -248,6 +257,8 @@ public class SourceHandler { resultMap.get(topic).add(nextTaskMessage); if ("break".equals(filter.getMode())) { break; + } else if ("continue".equals(filter.getMode())) { + continue; } } } @@ -343,10 +354,19 @@ public class SourceHandler { AbstractSignatureService signatureService = null; try { signatureService = SignatureFactory.getSignatureService(signatureConfig); - String signature = signatureService.signatureByPriKey(signatureContext); + String signature = signatureService.signature(signatureContext); // 优化 if (SignatureEnumGroup.SignatureLocation.header.equals(signatureConfig.getSignatureLocation())) { requestParams.getHeaders().put(signatureConfig.getSignatureField(), signature); + } else if (SignatureEnumGroup.SignatureLocation.url.equals(signatureConfig.getSignatureLocation())) { + String url = requestParams.getUrl(); + String separator = ""; + if (url.contains("?")) { + separator = "&"; + } else { + separator = "?"; + } + requestParams.setUrl(url + separator + signatureConfig.getSignatureField() + "=" + signature); } } catch (GeneralSecurityException e) { throw new RuntimeException(e); diff --git a/source/src/main/java/tech/deepq/source/spel/ZhiDeMaiRequest.java b/source/src/main/java/tech/deepq/source/spel/ZhiDeMaiRequest.java new file mode 100644 index 0000000..9d26bb8 --- /dev/null +++ b/source/src/main/java/tech/deepq/source/spel/ZhiDeMaiRequest.java @@ -0,0 +1,181 @@ +package tech.deepq.source.spel; + +import cn.hutool.core.collection.CollUtil; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import tech.deepq.common.db.entity.Store; +import tech.deepq.common.db.entity.vo.TaskMiniVO; +import tech.deepq.common.db.service.StoreManager; +import tech.deepq.common.dto.HttpRes; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.HashMap; +import java.util.StringJoiner; + +/** + * @author yechuan + * @since 2023/9/6 22:57 + **/ +@Slf4j +@Component +public class ZhiDeMaiRequest { + + DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + @Autowired + private StoreManager storeManager; + + private static boolean init = false; + + public long getLatestEventId(Long initEventId) { + + if (!init && initEventId != null) { + init = true; + return initEventId; + } + + Store store = storeManager.getValueFromNsByKey("zdm", "latestEventId"); + if (store == null || store.getDeleted().compareTo(1) == 0) { + return 0L; + } + return Long.parseLong(store.getValue()); + } + + + /** + * 判断是否满足下一页event条件 + */ + public boolean hasNextEvent(HttpRes eventRes, JsonObject filterArgs) { + + return CollUtil.isNotEmpty(filterArgs.get("data").getAsJsonArray()); + } + + /** + * 构建下一页event消息内容 + */ + public TaskMiniVO buildNextEventPayload(HttpRes eventRes, JsonObject filterArgs, TaskMiniVO taskMiniVO) { + long maxEventId = Long.parseLong(eventRes.getRequestParams().getPlaceholderExpressionsValueCache().get("#{event_id}").toString()); + + + for (JsonElement eventObject : filterArgs.get("data").getAsJsonArray()) { + JsonObject event = eventObject.getAsJsonObject(); + long eventId = event.get("event_id").getAsLong(); + maxEventId = Math.max(eventId, maxEventId); + } + if (taskMiniVO.getArgs() == null) { + taskMiniVO.setArgs(new HashMap<>()); + } + taskMiniVO.getArgs() + .put("event_id", maxEventId + ""); + + storeManager.upsert("zdm", "latestEventId", maxEventId + ""); + return taskMiniVO; + } + + /** + * 判断是否满足资讯下架事件 + */ + public boolean hasDelArticle(HttpRes eventRes, JsonObject filterArgs) { + LocalDate today = LocalDate.now(); + + + for (JsonElement eventObject : filterArgs.get("data").getAsJsonArray()) { + JsonObject event = eventObject.getAsJsonObject(); + String creationDate = event.get("creation_date").getAsString(); + if (!today.isEqual(LocalDateTime.parse(creationDate, timeFormatter).toLocalDate())) { + continue; + } + + String action = event.get("action").getAsString(); + if ("del".equals(action)) { + return true; + } + } + + return false; + } + + /** + * 构建资讯下架消息内容 + */ + public String buildDelArticlePayload(HttpRes eventRes, JsonObject filterArgs, TaskMiniVO taskMiniVO) { + LocalDate today = LocalDate.now(); + + JsonObject result = new JsonObject(); + JsonArray delDelArticleArray = new JsonArray(); + result.add("data", delDelArticleArray); + for (JsonElement eventObject : filterArgs.get("data").getAsJsonArray()) { + JsonObject event = eventObject.getAsJsonObject(); + String creationDate = event.get("creation_date").getAsString(); + if (!today.isEqual(LocalDateTime.parse(creationDate, timeFormatter).toLocalDate())) { + continue; + } + + String action = event.get("action").getAsString(); + if ("del".equals(action)) { + JsonObject article = new JsonObject(); + article.add("article_id", event.get("main_id")); + article.add("update_time", event.get("creation_date")); + article.add("action", event.get("action")); + delDelArticleArray.add(article); + } + } + return result.getAsString(); + } + + + /** + * 判断是否满足资讯更新事件 + */ + public boolean hasUpdateOrInsertArticle(HttpRes eventRes, JsonObject filterArgs) { + LocalDate today = LocalDate.now(); + + for (JsonElement eventObject : filterArgs.get("data").getAsJsonArray()) { + JsonObject event = eventObject.getAsJsonObject(); + String creationDate = event.get("creation_date").getAsString(); + if (!today.isEqual(LocalDateTime.parse(creationDate, timeFormatter).toLocalDate())) { + continue; + } + + String action = event.get("action").getAsString(); + if ("upd".equals(action) || "pub".equals(action)) { + return true; + } + } + return false; + } + + /** + * 构建资讯更新消息内容 + */ + public TaskMiniVO buildUpdateOrInsertArticlePayload(HttpRes eventRes, JsonObject filterArgs, TaskMiniVO taskMiniVO) { + LocalDate today = LocalDate.now(); + if (taskMiniVO.getArgs() == null) { + taskMiniVO.setArgs(new HashMap<>()); + } + + StringJoiner stringJoiner = new StringJoiner(","); + + for (JsonElement eventObject : filterArgs.get("data").getAsJsonArray()) { + JsonObject event = eventObject.getAsJsonObject(); + String creationDate = event.get("creation_date").getAsString(); + if (!today.isEqual(LocalDateTime.parse(creationDate, timeFormatter).toLocalDate())) { + continue; + } + + String action = event.get("action").getAsString(); + if ("upd".equals(action) || "pub".equals(action)) { + stringJoiner.add(event.get("main_id").getAsString()); + } + } + taskMiniVO.getArgs() + .put("article_ids", stringJoiner.toString()); + return taskMiniVO; + } +}