diff --git a/common/pom.xml b/common/pom.xml index 7b1112e..8ad961f 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -1,5 +1,6 @@ - + 4.0.0 work.xuye @@ -35,7 +36,7 @@ mysql mysql-connector-java - 8.0.31 + 8.0.33 @@ -115,7 +116,11 @@ runtime true - + + tech.deepq + sq-sentry + 1.1.8 + diff --git a/common/src/main/java/work/xuye/common/alert/ExceptionAspect.java b/common/src/main/java/work/xuye/common/alert/ExceptionAspect.java index 05e2a19..9f81df3 100644 --- a/common/src/main/java/work/xuye/common/alert/ExceptionAspect.java +++ b/common/src/main/java/work/xuye/common/alert/ExceptionAspect.java @@ -1,5 +1,6 @@ package work.xuye.common.alert; +import io.sentry.Sentry; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.aspectj.lang.annotation.AfterThrowing; @@ -24,6 +25,9 @@ public class ExceptionAspect { @AfterThrowing(pointcut = "execution(* work.xuye..*(..))", throwing = "ex") public void handleGlobalException(Exception ex) { + + Sentry.capture(ex); + List stackTraceElements = new java.util.ArrayList<>(List.of(ex.getStackTrace())); stackTraceElements.removeIf(stackTraceElement -> !stackTraceElement.getClassName().contains("work.xuye")); StringBuilder traceString = new StringBuilder(); diff --git a/common/src/main/java/work/xuye/common/constant/BindingConstants.java b/common/src/main/java/work/xuye/common/constant/BindingConstants.java index 768e21f..63d8fb6 100644 --- a/common/src/main/java/work/xuye/common/constant/BindingConstants.java +++ b/common/src/main/java/work/xuye/common/constant/BindingConstants.java @@ -8,7 +8,9 @@ public class BindingConstants { public static final String TASK_OUT = "task-out"; - public static final String TRANSFORMER_KEYS_OUT = "item-keys-out"; + public static final String SNAPSHOT_OUT_POSTFIX = "-snapshot-out"; + + public static final String TRANSFORMER_KEYS_OUT = "transformKeys-out"; } diff --git a/common/src/main/java/work/xuye/common/constant/SnapshotConstants.java b/common/src/main/java/work/xuye/common/constant/SnapshotConstants.java new file mode 100644 index 0000000..21f4952 --- /dev/null +++ b/common/src/main/java/work/xuye/common/constant/SnapshotConstants.java @@ -0,0 +1,16 @@ +package work.xuye.common.constant; + +/** + * @author xuye + * @since 2023/5/17 13:23 + **/ +public class SnapshotConstants { + + + public static final String RESOURCE_STATUS = "resourceStatus"; + public static final String STATUS = "status"; + + public static final String URL = "url"; + + +} diff --git a/common/src/main/java/work/xuye/common/constant/StageConstants.java b/common/src/main/java/work/xuye/common/constant/StageConstants.java index bacc7fc..1a29ea5 100644 --- a/common/src/main/java/work/xuye/common/constant/StageConstants.java +++ b/common/src/main/java/work/xuye/common/constant/StageConstants.java @@ -8,6 +8,7 @@ public class StageConstants { public static final String SCHEDULER = "scheduler"; public static final String SOURCE = "source"; + public static final String SOURCE_WATCH = "source-watch"; public static final String TRANSFORMER = "transformer"; public static final String END = "end"; diff --git a/common/src/main/java/work/xuye/common/dto/HttpRequestParams.java b/common/src/main/java/work/xuye/common/dto/HttpRequestParams.java index 2661b57..e5a8acf 100644 --- a/common/src/main/java/work/xuye/common/dto/HttpRequestParams.java +++ b/common/src/main/java/work/xuye/common/dto/HttpRequestParams.java @@ -4,6 +4,7 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.springframework.http.HttpMethod; +import org.springframework.http.MediaType; import java.util.Map; @@ -27,5 +28,7 @@ public class HttpRequestParams { private Map headers = Map.of(); private Map placeholderExpressions = Map.of(); + + private String mediaType = MediaType.APPLICATION_JSON_VALUE; } diff --git a/common/src/main/java/work/xuye/common/spel/CustomFunction.java b/common/src/main/java/work/xuye/common/spel/CustomFunction.java index d210303..3e9a7d1 100644 --- a/common/src/main/java/work/xuye/common/spel/CustomFunction.java +++ b/common/src/main/java/work/xuye/common/spel/CustomFunction.java @@ -16,6 +16,9 @@ import work.xuye.common.constant.RawDataFiledKey; import work.xuye.common.store.NsKVMapStore; import work.xuye.common.utils.HttpUtil; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.Arrays; @@ -137,6 +140,17 @@ public class CustomFunction { return (String) exp.getValue(); } + + public String warpExclude(String json, String excludeFiled, String... fields) { + JsonObject obj = JsonParser.parseString(json).getAsJsonObject(); + if (obj.has(excludeFiled)) { + obj.remove(excludeFiled); + } else { + log.warn("json warpV2 excludeFiled not exist, excludeFiled: [{}]", excludeFiled); + } + return this.wrap(obj.toString(), fields); + } + /** * 用itemData包一层的原因:避免下游程序直接读到了有特殊意义的字段,比如type(财联社资讯类型 -1:快讯,0:要闻,1:非财联社资讯) * ... @@ -257,4 +271,12 @@ public class CustomFunction { } + public String epochMilliToLocalDatetime(String json, String path) { + String epochMilli = this.jsonExtract(json, path); + String localDatetime = LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(epochMilli)), ZoneId.of("Asia/Shanghai")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); + log.debug("epochMilliToLocalDatetime handler, json: [{}], path: [{}], epochMilli: [{}], localDatetime: [{}]", json, path, epochMilli, localDatetime); + return localDatetime; + } + + } diff --git a/common/src/main/java/work/xuye/common/utils/DebugUtil.java b/common/src/main/java/work/xuye/common/utils/DebugUtil.java index b4de59e..7e248c7 100644 --- a/common/src/main/java/work/xuye/common/utils/DebugUtil.java +++ b/common/src/main/java/work/xuye/common/utils/DebugUtil.java @@ -12,12 +12,16 @@ import java.util.Set; @Slf4j public class DebugUtil { + private static final boolean DEBUG = false; - private static final Set skipTaskIds = Set.of(); + private static final Set taskIds = Set.of(5); public static boolean isSkip(TaskVO task) { + if (!DEBUG) { + return false; + } Integer taskId = task.getTask().getId(); - if (skipTaskIds.contains(taskId)) { + if (!taskIds.contains(taskId)) { log.warn("skip task, [{}-{}]", taskId, task.getTask().getName()); return true; } diff --git a/common/src/main/java/work/xuye/common/utils/SpEL.java b/common/src/main/java/work/xuye/common/utils/SpEL.java index 8331bd1..81eeabd 100644 --- a/common/src/main/java/work/xuye/common/utils/SpEL.java +++ b/common/src/main/java/work/xuye/common/utils/SpEL.java @@ -11,7 +11,7 @@ import org.springframework.expression.spel.standard.SpelExpressionParser; public class SpEL { public static void main(String[] args) { ExpressionParser parser = new SpelExpressionParser(); - Expression exp = parser.parseExpression("T(java.time.LocalDateTime).now().minusHours(1).atZone(T(java.time.ZoneId).systemDefault()).toInstant().atOffset(T(java.time.ZoneOffset).UTC).toInstant().getEpochSecond()"); + Expression exp = parser.parseExpression("T(java.lang.Math).random() < 0.5 ? 1 : 2"); Object o = exp.getValue(); System.out.println(o); } diff --git a/common/src/main/resources/markdown/sql.md b/common/src/main/resources/markdown/sql.md index e9ca7a5..365ba75 100644 --- a/common/src/main/resources/markdown/sql.md +++ b/common/src/main/resources/markdown/sql.md @@ -68,3 +68,37 @@ where JSON_EXTRACT(v.raw_data, '$.crawler') = 'snp' order by v.`created_time` desc limit 50 ``` + +## 根据spider查询视频处理状态 + +```sql +select v.status, + v.publish_time, + v.if_update, + t.status, + v.title, + v.url, + v.`video_url`, + v.spider, + v.`created_time`, + v.`raw_data`, + t.`status`, + t.`retry_count`, + c.* +from `all_news_video_dycj` as v + left join `task_news` as t on v.`url` = t.`url` + left join `task_news_content` as c on t.`id` = c.id +where v.spider = 'bosera_videos_api' +order by v.`created_time` desc +limit 100 +``` + +## 根据spider清表重爬 + +```sql +delete v.*,c.*,t.* +from `all_news_video_dycj` as v + inner join `task_news` as t on v.`url` = t.`url` + inner join `task_news_content` as c on t.`id` = c.id +where v.spider = 'bosera_videos_api'; +``` \ No newline at end of file diff --git a/scheduler/src/main/java/work/xuye/scheduler/SchedulerApplication.java b/scheduler/src/main/java/work/xuye/scheduler/SchedulerApplication.java index f2e3831..bb31679 100644 --- a/scheduler/src/main/java/work/xuye/scheduler/SchedulerApplication.java +++ b/scheduler/src/main/java/work/xuye/scheduler/SchedulerApplication.java @@ -6,7 +6,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling; @EnableScheduling -@SpringBootApplication(scanBasePackages = "work.xuye.*") +@SpringBootApplication(scanBasePackages = {"work.xuye.*", "tech.deepq.components"}) @MapperScan(basePackages = "work.xuye.common.db.mapper") public class SchedulerApplication { diff --git a/scheduler/src/main/java/work/xuye/scheduler/service/IssueService.java b/scheduler/src/main/java/work/xuye/scheduler/service/IssueService.java index c968e23..cd7be70 100644 --- a/scheduler/src/main/java/work/xuye/scheduler/service/IssueService.java +++ b/scheduler/src/main/java/work/xuye/scheduler/service/IssueService.java @@ -49,7 +49,10 @@ public class IssueService implements ApplicationRunner { .setHeader(MessageConstants.TASK_ID, taskVO.getTask().getId()) .setHeader(MessageConstants.TASK_NAME, taskVO.getTask().getName()) .build(); - streamBridge.send(BindingConstants.TASK_OUT, message); + boolean send = streamBridge.send(BindingConstants.TASK_OUT, message); + if (!send) { + throw new RuntimeException("send message failed"); + } ProcessMode processMode = taskVO.getTask().getProcessMode(); String emoji = processMode.equals(ProcessMode.DEBUG) ? "\uD83E\uDD16" : ""; log.info("\uD83D\uDCE4 {} {}-{}", diff --git a/sink/src/main/java/work/xuye/sink/SinkApplication.java b/sink/src/main/java/work/xuye/sink/SinkApplication.java index 61676bc..3e73fee 100644 --- a/sink/src/main/java/work/xuye/sink/SinkApplication.java +++ b/sink/src/main/java/work/xuye/sink/SinkApplication.java @@ -6,7 +6,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling; @EnableScheduling -@SpringBootApplication(scanBasePackages = "work.xuye.*") +@SpringBootApplication(scanBasePackages = {"work.xuye.*", "tech.deepq.components"}) @MapperScan(basePackages = "work.xuye.common.db.mapper") public class SinkApplication { diff --git a/sink/src/main/java/work/xuye/sink/controller/OpenController.java b/sink/src/main/java/work/xuye/sink/controller/OpenController.java new file mode 100644 index 0000000..0849279 --- /dev/null +++ b/sink/src/main/java/work/xuye/sink/controller/OpenController.java @@ -0,0 +1,19 @@ +package work.xuye.sink.controller; + +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * @author xuye + * @since 2023/7/4 15:16 + **/ +@RestController +@RequestMapping("/open") +public class OpenController { + + @GetMapping + public String open() { + return "hello ~"; + } +} diff --git a/sink/src/main/java/work/xuye/sink/handler/ItemHandler.java b/sink/src/main/java/work/xuye/sink/handler/ItemHandler.java index 4b24748..aadc774 100644 --- a/sink/src/main/java/work/xuye/sink/handler/ItemHandler.java +++ b/sink/src/main/java/work/xuye/sink/handler/ItemHandler.java @@ -165,7 +165,9 @@ public class ItemHandler { private boolean tryDelete(TaskVO taskVO, TableTemplate template, HashMap newItem, SinkParams.CheckDelete checkDeleteConfig, ProcessReason reason) { // 如果不是 是固定值检测删除并且状态是开启,说明无需处理,直接返回 - if (!(checkDeleteConfig.getMode().equals(DeleteCheckMode.fixedValue) && checkDeleteConfig.getStatus().equals(Status.on))) { + if ( + !(checkDeleteConfig.getStatus().equals(Status.on) && checkDeleteConfig.getMode().equals(DeleteCheckMode.fixedValue)) + ) { return false; } SinkParams.FixedValueConfig fixedValueConfig = checkDeleteConfig.getFixedValueConfig(); @@ -307,7 +309,8 @@ public class ItemHandler { String fieldName = checkUpdateConfig.getFieldName(); Object dbValue = dbItem.get(fieldName); if (ObjectUtils.isEmpty(dbValue)) { - throw new RuntimeException("根据 [" + fieldName + "] 来判断数据是否需要更新,但是数据库不存在该字段"); + log.warn("根据{}来判断数据是否需要更新,但是数据库不存在该字段", fieldName); + return; } Object nowValue = resultMap.get(fieldName); if (!ObjectUtils.isEmpty(checkUpdateConfig.getJsonPath())) { diff --git a/source/src/main/java/work/xuye/source/SourceApplication.java b/source/src/main/java/work/xuye/source/SourceApplication.java index ffcf856..c41d0e1 100644 --- a/source/src/main/java/work/xuye/source/SourceApplication.java +++ b/source/src/main/java/work/xuye/source/SourceApplication.java @@ -4,7 +4,7 @@ import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -@SpringBootApplication(scanBasePackages = "work.xuye.*") +@SpringBootApplication(scanBasePackages = {"work.xuye.*", "tech.deepq.components"}) @MapperScan(basePackages = "work.xuye.common.db.mapper") public class SourceApplication { diff --git a/source/src/main/java/work/xuye/source/handler/SourceHandler.java b/source/src/main/java/work/xuye/source/handler/SourceHandler.java index 4fbed81..bb12d41 100644 --- a/source/src/main/java/work/xuye/source/handler/SourceHandler.java +++ b/source/src/main/java/work/xuye/source/handler/SourceHandler.java @@ -2,6 +2,7 @@ package work.xuye.source.handler; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; @@ -9,9 +10,7 @@ import org.springframework.stereotype.Component; import org.springframework.util.DigestUtils; import org.springframework.util.ObjectUtils; import org.springframework.util.StopWatch; -import work.xuye.common.constant.MessageConstants; -import work.xuye.common.constant.RegexConstants; -import work.xuye.common.constant.StageConstants; +import work.xuye.common.constant.*; import work.xuye.common.db.entity.Task; import work.xuye.common.db.service.TaskManager; import work.xuye.common.dto.HttpRequestParams; @@ -41,10 +40,12 @@ public class SourceHandler { private final TaskManager taskManager; private final RequestClient requestClient; private final UrlMD5Service urlMD5Service; + private final StreamBridge streamBridge; public Message handle(Message message) { TaskVO taskVO = taskManager.getTaskInfoByTaskId((Integer) message.getHeaders().get(MessageConstants.TASK_ID)); + String taskName = taskVO.getTask().getName(); if (DebugUtil.isSkip(taskVO)) { return null; } @@ -60,12 +61,12 @@ public class SourceHandler { urlMD5Service.put(requestParams.getUrl(), md5Digest); res.setResourceStatus(resourceStatus); } catch (Exception e) { - log.error("[{}] request error, request params:[{}], error message:[{}]", taskVO.getTask().getName(), requestParams, e.getMessage()); + log.error("[{}] request error, request params:[{}], error message:[{}]", taskName, requestParams, e.getMessage()); return null; } if (!res.getStatus().is2xxSuccessful()) { - log.warn("[{}] response status code is not 2xx, request params: [{}], response: [{}]", taskVO.getTask().getName(), requestParams, res); - throw new RuntimeException("[" + taskVO.getTask().getName() + "]" + "unexpected status code: " + res.getStatus()); + log.warn("[{}] response status code is not 2xx, request params: [{}], response: [{}]", taskName, requestParams, res); + throw new RuntimeException("[" + taskName + "]" + "unexpected status code: " + res.getStatus()); } ProcessMode processMode = taskManager .getTaskInfoByTaskId( @@ -76,9 +77,29 @@ public class SourceHandler { if (res.getResourceStatus().equals(ResourceStatus.UNCHANGED) && !ProcessMode.DEBUG.equals(processMode)) { return null; } + + if (ProcessMode.WATCH.equals(processMode)) { + Message watchMessage = MessageBuilder + .withPayload(res.getBody()) + .setHeader(MessageConstants.CURRENT_STAGE, StageConstants.SOURCE_WATCH) + .setHeader(MessageConstants.TASK_TRACE_ID, message.getHeaders().get(MessageConstants.TASK_TRACE_ID)) + .setHeader(MessageConstants.SOURCE_TRACE_ID, ID.generate()) + .setHeader(MessageConstants.TASK_ID, message.getHeaders().get(MessageConstants.TASK_ID)) + .setHeader(MessageConstants.TASK_NAME, message.getHeaders().get(MessageConstants.TASK_NAME)) + .setHeader(SnapshotConstants.URL, requestParams.getUrl()) + .setHeader(SnapshotConstants.RESOURCE_STATUS, res.getResourceStatus()) + .setHeader(SnapshotConstants.STATUS, res.getStatus()) + .build(); + + boolean send = streamBridge.send(taskName + BindingConstants.SNAPSHOT_OUT_POSTFIX, watchMessage); + if (!send) { + throw new RuntimeException("send message failed"); + } + return null; + } return MessageBuilder .withPayload(res) - .setHeader(MessageConstants.CURRENT_STAGE, StageConstants.SOURCE) + .setHeader(MessageConstants.CURRENT_STAGE, StageConstants.SOURCE_WATCH) .setHeader(MessageConstants.TASK_TRACE_ID, message.getHeaders().get(MessageConstants.TASK_TRACE_ID)) .setHeader(MessageConstants.SOURCE_TRACE_ID, ID.generate()) .setHeader(MessageConstants.TASK_ID, message.getHeaders().get(MessageConstants.TASK_ID)) diff --git a/source/src/main/java/work/xuye/source/request/OkHttpRequestClient.java b/source/src/main/java/work/xuye/source/request/OkHttpRequestClient.java index b01fb9c..dc3c224 100644 --- a/source/src/main/java/work/xuye/source/request/OkHttpRequestClient.java +++ b/source/src/main/java/work/xuye/source/request/OkHttpRequestClient.java @@ -6,6 +6,7 @@ import okhttp3.*; import okio.BufferedSource; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Primary; +import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; @@ -16,6 +17,7 @@ import work.xuye.source.util.CharsetUtil; import java.io.IOException; import java.nio.charset.Charset; +import java.util.Map; import java.util.concurrent.TimeUnit; /** @@ -52,15 +54,39 @@ public class OkHttpRequestClient implements RequestClient { @Override public HttpRes execute(HttpRequestParams request) { Request.Builder requestBuilder = new Request.Builder(); + // request url requestBuilder.url(request.getUrl()); + + // request header if (!ObjectUtils.isEmpty(request.getHeaders())) { requestBuilder.headers(Headers.Companion.of(request.getHeaders())); } - if (!ObjectUtils.isEmpty(request.getBody())) { - MediaType mediaType = MediaType.Companion.parse("application/json;charset=utf-8"); - RequestBody requestBody = RequestBody.Companion.create(gson.toJson(request.getBody()), mediaType); + + // request body + Map body = request.getBody(); + String type = request.getMediaType(); + RequestBody requestBody = null; + //如果不是GET请求,并且请求体不为空才能构建请求体 + if (!request.getMethod().equalsIgnoreCase(HttpMethod.GET.name()) && !ObjectUtils.isEmpty(request.getBody().keySet())) { + // form url + if (org.springframework.http.MediaType.APPLICATION_FORM_URLENCODED_VALUE.equals(type)) { + FormBody.Builder formBodyBuilder = new FormBody.Builder(); + for (Map.Entry entry : request.getBody().entrySet()) { + formBodyBuilder.add(entry.getKey(), entry.getValue().toString()); + } + requestBody = formBodyBuilder.build(); + } else if (org.springframework.http.MediaType.APPLICATION_JSON_VALUE.equals(type)) { + MediaType mediaType = MediaType.Companion.parse(org.springframework.http.MediaType.APPLICATION_JSON_VALUE); + requestBody = RequestBody.Companion.create(gson.toJson(body), mediaType); + } else { + throw new RuntimeException("暂不支持的请求类型"); + } + } + if (!ObjectUtils.isEmpty(requestBody)) { requestBuilder.method(request.getMethod(), requestBody); } + + // res HttpRes httpRes = new HttpRes(); Request httpRequest = requestBuilder.build(); try (Response response = client.newCall(httpRequest).execute()) { diff --git a/transformer/src/main/java/work/xuye/transformer/TransformerApplication.java b/transformer/src/main/java/work/xuye/transformer/TransformerApplication.java index a012b99..95fee01 100644 --- a/transformer/src/main/java/work/xuye/transformer/TransformerApplication.java +++ b/transformer/src/main/java/work/xuye/transformer/TransformerApplication.java @@ -4,7 +4,7 @@ import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -@SpringBootApplication(scanBasePackages = "work.xuye.*") +@SpringBootApplication(scanBasePackages = {"work.xuye.*", "tech.deepq.components"}) @MapperScan(basePackages = "work.xuye.common.db.mapper") public class TransformerApplication { diff --git a/transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java b/transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java index 4e87b7e..f59a7b7 100644 --- a/transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java +++ b/transformer/src/main/java/work/xuye/transformer/handler/TransformHandler.java @@ -73,6 +73,11 @@ public class TransformHandler { // 将结果根据配置的transformer进行转换 this.transformResult(message, taskVO); + if (ObjectUtils.isEmpty(message.getPayload().getBody())) { + log.warn("@@ 转换后的结果为空,不继续处理"); + return null; + } + // 消失模式处理 this.handleDisappear(message, taskVO); @@ -154,7 +159,10 @@ public class TransformHandler { .setHeader(MessageConstants.CURRENT_STAGE, StageConstants.TRANSFORMER) .setHeader(MessageConstants.TRANSFORMER_TRACE_ID, ID.generate()) .build(); - streamBridge.send(BindingConstants.TRANSFORMER_KEYS_OUT, keysMsg); + boolean send = streamBridge.send(BindingConstants.TRANSFORMER_KEYS_OUT, keysMsg); + if (!send) { + throw new RuntimeException("send message failed"); + } } } diff --git a/transformer/src/main/java/work/xuye/transformer/transformer/ResDataTransformer.java b/transformer/src/main/java/work/xuye/transformer/transformer/ResDataTransformer.java index 70f34e8..4b342c5 100644 --- a/transformer/src/main/java/work/xuye/transformer/transformer/ResDataTransformer.java +++ b/transformer/src/main/java/work/xuye/transformer/transformer/ResDataTransformer.java @@ -14,22 +14,17 @@ import org.springframework.stereotype.Component; @Component("resData") @RequiredArgsConstructor public class ResDataTransformer implements MessageTransformer { - - private int threshold = 300; @Override public String transform(String json, String seedUrl) { JsonObject res = JsonParser.parseString(json).getAsJsonObject(); boolean hasData = res.has("data"); if (hasData) { - threshold = 300; + return res.get("data").getAsString(); } else { - threshold--; + log.warn("resData transform failed, res not has data, res: {}", res); } - if (threshold <= 0) { - throw new RuntimeException("res no data, seedUrl: " + seedUrl); - } - return res.get("data").getAsString(); + return null; } } diff --git a/transformer/src/main/java/work/xuye/transformer/transformer/XmlToJsonTransformer.java b/transformer/src/main/java/work/xuye/transformer/transformer/XmlToJsonTransformer.java index 3c7fb77..2200a64 100644 --- a/transformer/src/main/java/work/xuye/transformer/transformer/XmlToJsonTransformer.java +++ b/transformer/src/main/java/work/xuye/transformer/transformer/XmlToJsonTransformer.java @@ -5,6 +5,7 @@ import org.json.JSONException; import org.json.JSONObject; import org.json.XML; import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; /** * @author xuye @@ -13,10 +14,12 @@ import org.springframework.stereotype.Component; @Slf4j @Component("xml2json") public class XmlToJsonTransformer implements MessageTransformer { - - + @Override public String transform(String xml, String seedUrl) { + if (ObjectUtils.isEmpty(xml)) { + return null; + } JSONObject jsonObject = null; try { jsonObject = XML.toJSONObject(xml);