v23.7.0爬虫上线 — See merge request fhl/std-news-process!4 (master)
@@ -1,5 +1,6 @@ | |||||
<?xml version="1.0" encoding="UTF-8"?> | <?xml version="1.0" encoding="UTF-8"?> | ||||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> | |||||
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" | |||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> | |||||
<modelVersion>4.0.0</modelVersion> | <modelVersion>4.0.0</modelVersion> | ||||
<parent> | <parent> | ||||
<groupId>work.xuye</groupId> | <groupId>work.xuye</groupId> | ||||
@@ -35,7 +36,7 @@ | |||||
<dependency> | <dependency> | ||||
<groupId>mysql</groupId> | <groupId>mysql</groupId> | ||||
<artifactId>mysql-connector-java</artifactId> | <artifactId>mysql-connector-java</artifactId> | ||||
<version>8.0.31</version> | |||||
<version>8.0.33</version> | |||||
</dependency> | </dependency> | ||||
<dependency> | <dependency> | ||||
@@ -115,7 +116,11 @@ | |||||
<scope>runtime</scope> | <scope>runtime</scope> | ||||
<optional>true</optional> | <optional>true</optional> | ||||
</dependency> | </dependency> | ||||
<dependency> | |||||
<groupId>tech.deepq</groupId> | |||||
<artifactId>sq-sentry</artifactId> | |||||
<version>1.1.8</version> | |||||
</dependency> | |||||
</dependencies> | </dependencies> | ||||
@@ -1,5 +1,6 @@ | |||||
package work.xuye.common.alert; | package work.xuye.common.alert; | ||||
import io.sentry.Sentry; | |||||
import lombok.RequiredArgsConstructor; | import lombok.RequiredArgsConstructor; | ||||
import lombok.extern.slf4j.Slf4j; | import lombok.extern.slf4j.Slf4j; | ||||
import org.aspectj.lang.annotation.AfterThrowing; | import org.aspectj.lang.annotation.AfterThrowing; | ||||
@@ -24,6 +25,9 @@ public class ExceptionAspect { | |||||
@AfterThrowing(pointcut = "execution(* work.xuye..*(..))", throwing = "ex") | @AfterThrowing(pointcut = "execution(* work.xuye..*(..))", throwing = "ex") | ||||
public void handleGlobalException(Exception ex) { | public void handleGlobalException(Exception ex) { | ||||
Sentry.capture(ex); | |||||
List<StackTraceElement> stackTraceElements = new java.util.ArrayList<>(List.of(ex.getStackTrace())); | List<StackTraceElement> stackTraceElements = new java.util.ArrayList<>(List.of(ex.getStackTrace())); | ||||
stackTraceElements.removeIf(stackTraceElement -> !stackTraceElement.getClassName().contains("work.xuye")); | stackTraceElements.removeIf(stackTraceElement -> !stackTraceElement.getClassName().contains("work.xuye")); | ||||
StringBuilder traceString = new StringBuilder(); | StringBuilder traceString = new StringBuilder(); | ||||
@@ -8,7 +8,9 @@ public class BindingConstants { | |||||
public static final String TASK_OUT = "task-out"; | public static final String TASK_OUT = "task-out"; | ||||
public static final String TRANSFORMER_KEYS_OUT = "item-keys-out"; | |||||
public static final String SNAPSHOT_OUT_POSTFIX = "-snapshot-out"; | |||||
public static final String TRANSFORMER_KEYS_OUT = "transformKeys-out"; | |||||
} | } |
@@ -0,0 +1,16 @@ | |||||
package work.xuye.common.constant; | |||||
/**
 * String keys used when building snapshot messages (resource status, HTTP status, URL).
 *
 * <p>Pure constants holder: the class is {@code final} and has a private constructor so it
 * can be neither instantiated nor subclassed.
 *
 * @author xuye
 * @since 2023/5/17 13:23
 **/
public final class SnapshotConstants {

    /** Key for the resource status value. */
    public static final String RESOURCE_STATUS = "resourceStatus";

    /** Key for the response status value. */
    public static final String STATUS = "status";

    /** Key for the requested URL. */
    public static final String URL = "url";

    private SnapshotConstants() {
        // constants only, no instances
    }
}
@@ -8,6 +8,7 @@ public class StageConstants { | |||||
public static final String SCHEDULER = "scheduler"; | public static final String SCHEDULER = "scheduler"; | ||||
public static final String SOURCE = "source"; | public static final String SOURCE = "source"; | ||||
public static final String SOURCE_WATCH = "source-watch"; | |||||
public static final String TRANSFORMER = "transformer"; | public static final String TRANSFORMER = "transformer"; | ||||
public static final String END = "end"; | public static final String END = "end"; | ||||
@@ -4,6 +4,7 @@ import lombok.AllArgsConstructor; | |||||
import lombok.Data; | import lombok.Data; | ||||
import lombok.NoArgsConstructor; | import lombok.NoArgsConstructor; | ||||
import org.springframework.http.HttpMethod; | import org.springframework.http.HttpMethod; | ||||
import org.springframework.http.MediaType; | |||||
import java.util.Map; | import java.util.Map; | ||||
@@ -27,5 +28,7 @@ public class HttpRequestParams { | |||||
private Map<String, String> headers = Map.of(); | private Map<String, String> headers = Map.of(); | ||||
private Map<String, String> placeholderExpressions = Map.of(); | private Map<String, String> placeholderExpressions = Map.of(); | ||||
private String mediaType = MediaType.APPLICATION_JSON_VALUE; | |||||
} | } |
@@ -16,6 +16,9 @@ import work.xuye.common.constant.RawDataFiledKey; | |||||
import work.xuye.common.store.NsKVMapStore; | import work.xuye.common.store.NsKVMapStore; | ||||
import work.xuye.common.utils.HttpUtil; | import work.xuye.common.utils.HttpUtil; | ||||
import java.time.Instant; | |||||
import java.time.LocalDateTime; | |||||
import java.time.ZoneId; | |||||
import java.time.ZonedDateTime; | import java.time.ZonedDateTime; | ||||
import java.time.format.DateTimeFormatter; | import java.time.format.DateTimeFormatter; | ||||
import java.util.Arrays; | import java.util.Arrays; | ||||
@@ -137,6 +140,17 @@ public class CustomFunction { | |||||
return (String) exp.getValue(); | return (String) exp.getValue(); | ||||
} | } | ||||
public String warpExclude(String json, String excludeFiled, String... fields) { | |||||
JsonObject obj = JsonParser.parseString(json).getAsJsonObject(); | |||||
if (obj.has(excludeFiled)) { | |||||
obj.remove(excludeFiled); | |||||
} else { | |||||
log.warn("json warpV2 excludeFiled not exist, excludeFiled: [{}]", excludeFiled); | |||||
} | |||||
return this.wrap(obj.toString(), fields); | |||||
} | |||||
/** | /** | ||||
* 用itemData包一层的原因:避免下游程序直接读到了有特殊意义的字段,比如type(财联社资讯类型 -1:快讯,0:要闻,1:非财联社资讯) | * 用itemData包一层的原因:避免下游程序直接读到了有特殊意义的字段,比如type(财联社资讯类型 -1:快讯,0:要闻,1:非财联社资讯) | ||||
* <a href="https://app.asana.com/0/1202103682792595/1204211768982825/f">...</a> | * <a href="https://app.asana.com/0/1202103682792595/1204211768982825/f">...</a> | ||||
@@ -257,4 +271,12 @@ public class CustomFunction { | |||||
} | } | ||||
public String epochMilliToLocalDatetime(String json, String path) { | |||||
String epochMilli = this.jsonExtract(json, path); | |||||
String localDatetime = LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(epochMilli)), ZoneId.of("Asia/Shanghai")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); | |||||
log.debug("epochMilliToLocalDatetime handler, json: [{}], path: [{}], epochMilli: [{}], localDatetime: [{}]", json, path, epochMilli, localDatetime); | |||||
return localDatetime; | |||||
} | |||||
} | } |
@@ -12,12 +12,16 @@ import java.util.Set; | |||||
@Slf4j | @Slf4j | ||||
public class DebugUtil { | public class DebugUtil { | ||||
private static final boolean DEBUG = false; | |||||
private static final Set<Integer> skipTaskIds = Set.of(); | |||||
private static final Set<Integer> taskIds = Set.of(5); | |||||
public static boolean isSkip(TaskVO task) { | public static boolean isSkip(TaskVO task) { | ||||
if (!DEBUG) { | |||||
return false; | |||||
} | |||||
Integer taskId = task.getTask().getId(); | Integer taskId = task.getTask().getId(); | ||||
if (skipTaskIds.contains(taskId)) { | |||||
if (!taskIds.contains(taskId)) { | |||||
log.warn("skip task, [{}-{}]", taskId, task.getTask().getName()); | log.warn("skip task, [{}-{}]", taskId, task.getTask().getName()); | ||||
return true; | return true; | ||||
} | } | ||||
@@ -11,7 +11,7 @@ import org.springframework.expression.spel.standard.SpelExpressionParser; | |||||
public class SpEL { | public class SpEL { | ||||
public static void main(String[] args) { | public static void main(String[] args) { | ||||
ExpressionParser parser = new SpelExpressionParser(); | ExpressionParser parser = new SpelExpressionParser(); | ||||
Expression exp = parser.parseExpression("T(java.time.LocalDateTime).now().minusHours(1).atZone(T(java.time.ZoneId).systemDefault()).toInstant().atOffset(T(java.time.ZoneOffset).UTC).toInstant().getEpochSecond()"); | |||||
Expression exp = parser.parseExpression("T(java.lang.Math).random() < 0.5 ? 1 : 2"); | |||||
Object o = exp.getValue(); | Object o = exp.getValue(); | ||||
System.out.println(o); | System.out.println(o); | ||||
} | } | ||||
@@ -68,3 +68,37 @@ where JSON_EXTRACT(v.raw_data, '$.crawler') = 'snp' | |||||
order by v.`created_time` desc | order by v.`created_time` desc | ||||
limit 50 | limit 50 | ||||
``` | ``` | ||||
## 根据spider查询视频处理状态 | |||||
```sql | |||||
select v.status, | |||||
v.publish_time, | |||||
v.if_update, | |||||
t.status, | |||||
v.title, | |||||
v.url, | |||||
v.`video_url`, | |||||
v.spider, | |||||
v.`created_time`, | |||||
v.`raw_data`, | |||||
t.`status`, | |||||
t.`retry_count`, | |||||
c.* | |||||
from `all_news_video_dycj` as v | |||||
left join `task_news` as t on v.`url` = t.`url` | |||||
left join `task_news_content` as c on t.`id` = c.id | |||||
where v.spider = 'bosera_videos_api' | |||||
order by v.`created_time` desc | |||||
limit 100 | |||||
``` | |||||
## 根据spider清表重爬 | |||||
```sql | |||||
delete v.*,c.*,t.* | |||||
from `all_news_video_dycj` as v | |||||
inner join `task_news` as t on v.`url` = t.`url` | |||||
inner join `task_news_content` as c on t.`id` = c.id | |||||
where v.spider = 'bosera_videos_api'; | |||||
``` |
@@ -6,7 +6,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; | |||||
import org.springframework.scheduling.annotation.EnableScheduling; | import org.springframework.scheduling.annotation.EnableScheduling; | ||||
@EnableScheduling | @EnableScheduling | ||||
@SpringBootApplication(scanBasePackages = "work.xuye.*") | |||||
@SpringBootApplication(scanBasePackages = {"work.xuye.*", "tech.deepq.components"}) | |||||
@MapperScan(basePackages = "work.xuye.common.db.mapper") | @MapperScan(basePackages = "work.xuye.common.db.mapper") | ||||
public class SchedulerApplication { | public class SchedulerApplication { | ||||
@@ -49,7 +49,10 @@ public class IssueService implements ApplicationRunner { | |||||
.setHeader(MessageConstants.TASK_ID, taskVO.getTask().getId()) | .setHeader(MessageConstants.TASK_ID, taskVO.getTask().getId()) | ||||
.setHeader(MessageConstants.TASK_NAME, taskVO.getTask().getName()) | .setHeader(MessageConstants.TASK_NAME, taskVO.getTask().getName()) | ||||
.build(); | .build(); | ||||
streamBridge.send(BindingConstants.TASK_OUT, message); | |||||
boolean send = streamBridge.send(BindingConstants.TASK_OUT, message); | |||||
if (!send) { | |||||
throw new RuntimeException("send message failed"); | |||||
} | |||||
ProcessMode processMode = taskVO.getTask().getProcessMode(); | ProcessMode processMode = taskVO.getTask().getProcessMode(); | ||||
String emoji = processMode.equals(ProcessMode.DEBUG) ? "\uD83E\uDD16" : ""; | String emoji = processMode.equals(ProcessMode.DEBUG) ? "\uD83E\uDD16" : ""; | ||||
log.info("\uD83D\uDCE4 {} {}-{}", | log.info("\uD83D\uDCE4 {} {}-{}", | ||||
@@ -6,7 +6,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; | |||||
import org.springframework.scheduling.annotation.EnableScheduling; | import org.springframework.scheduling.annotation.EnableScheduling; | ||||
@EnableScheduling | @EnableScheduling | ||||
@SpringBootApplication(scanBasePackages = "work.xuye.*") | |||||
@SpringBootApplication(scanBasePackages = {"work.xuye.*", "tech.deepq.components"}) | |||||
@MapperScan(basePackages = "work.xuye.common.db.mapper") | @MapperScan(basePackages = "work.xuye.common.db.mapper") | ||||
public class SinkApplication { | public class SinkApplication { | ||||
@@ -0,0 +1,19 @@ | |||||
package work.xuye.sink.controller; | |||||
import org.springframework.web.bind.annotation.GetMapping; | |||||
import org.springframework.web.bind.annotation.RequestMapping; | |||||
import org.springframework.web.bind.annotation.RestController; | |||||
/** | |||||
* @author xuye | |||||
* @since 2023/7/4 15:16 | |||||
**/ | |||||
@RestController | |||||
@RequestMapping("/open") | |||||
public class OpenController { | |||||
@GetMapping | |||||
public String open() { | |||||
return "hello ~"; | |||||
} | |||||
} |
@@ -165,7 +165,9 @@ public class ItemHandler { | |||||
private boolean tryDelete(TaskVO taskVO, TableTemplate template, HashMap<String, Object> newItem, SinkParams.CheckDelete checkDeleteConfig, ProcessReason reason) { | private boolean tryDelete(TaskVO taskVO, TableTemplate template, HashMap<String, Object> newItem, SinkParams.CheckDelete checkDeleteConfig, ProcessReason reason) { | ||||
// 如果不是 是固定值检测删除并且状态是开启,说明无需处理,直接返回 | // 如果不是 是固定值检测删除并且状态是开启,说明无需处理,直接返回 | ||||
if (!(checkDeleteConfig.getMode().equals(DeleteCheckMode.fixedValue) && checkDeleteConfig.getStatus().equals(Status.on))) { | |||||
if ( | |||||
!(checkDeleteConfig.getStatus().equals(Status.on) && checkDeleteConfig.getMode().equals(DeleteCheckMode.fixedValue)) | |||||
) { | |||||
return false; | return false; | ||||
} | } | ||||
SinkParams.FixedValueConfig fixedValueConfig = checkDeleteConfig.getFixedValueConfig(); | SinkParams.FixedValueConfig fixedValueConfig = checkDeleteConfig.getFixedValueConfig(); | ||||
@@ -307,7 +309,8 @@ public class ItemHandler { | |||||
String fieldName = checkUpdateConfig.getFieldName(); | String fieldName = checkUpdateConfig.getFieldName(); | ||||
Object dbValue = dbItem.get(fieldName); | Object dbValue = dbItem.get(fieldName); | ||||
if (ObjectUtils.isEmpty(dbValue)) { | if (ObjectUtils.isEmpty(dbValue)) { | ||||
throw new RuntimeException("根据 [" + fieldName + "] 来判断数据是否需要更新,但是数据库不存在该字段"); | |||||
log.warn("根据{}来判断数据是否需要更新,但是数据库不存在该字段", fieldName); | |||||
return; | |||||
} | } | ||||
Object nowValue = resultMap.get(fieldName); | Object nowValue = resultMap.get(fieldName); | ||||
if (!ObjectUtils.isEmpty(checkUpdateConfig.getJsonPath())) { | if (!ObjectUtils.isEmpty(checkUpdateConfig.getJsonPath())) { | ||||
@@ -4,7 +4,7 @@ import org.mybatis.spring.annotation.MapperScan; | |||||
import org.springframework.boot.SpringApplication; | import org.springframework.boot.SpringApplication; | ||||
import org.springframework.boot.autoconfigure.SpringBootApplication; | import org.springframework.boot.autoconfigure.SpringBootApplication; | ||||
@SpringBootApplication(scanBasePackages = "work.xuye.*") | |||||
@SpringBootApplication(scanBasePackages = {"work.xuye.*", "tech.deepq.components"}) | |||||
@MapperScan(basePackages = "work.xuye.common.db.mapper") | @MapperScan(basePackages = "work.xuye.common.db.mapper") | ||||
public class SourceApplication { | public class SourceApplication { | ||||
@@ -2,6 +2,7 @@ package work.xuye.source.handler; | |||||
import lombok.RequiredArgsConstructor; | import lombok.RequiredArgsConstructor; | ||||
import lombok.extern.slf4j.Slf4j; | import lombok.extern.slf4j.Slf4j; | ||||
import org.springframework.cloud.stream.function.StreamBridge; | |||||
import org.springframework.expression.spel.standard.SpelExpressionParser; | import org.springframework.expression.spel.standard.SpelExpressionParser; | ||||
import org.springframework.messaging.Message; | import org.springframework.messaging.Message; | ||||
import org.springframework.messaging.support.MessageBuilder; | import org.springframework.messaging.support.MessageBuilder; | ||||
@@ -9,9 +10,7 @@ import org.springframework.stereotype.Component; | |||||
import org.springframework.util.DigestUtils; | import org.springframework.util.DigestUtils; | ||||
import org.springframework.util.ObjectUtils; | import org.springframework.util.ObjectUtils; | ||||
import org.springframework.util.StopWatch; | import org.springframework.util.StopWatch; | ||||
import work.xuye.common.constant.MessageConstants; | |||||
import work.xuye.common.constant.RegexConstants; | |||||
import work.xuye.common.constant.StageConstants; | |||||
import work.xuye.common.constant.*; | |||||
import work.xuye.common.db.entity.Task; | import work.xuye.common.db.entity.Task; | ||||
import work.xuye.common.db.service.TaskManager; | import work.xuye.common.db.service.TaskManager; | ||||
import work.xuye.common.dto.HttpRequestParams; | import work.xuye.common.dto.HttpRequestParams; | ||||
@@ -41,10 +40,12 @@ public class SourceHandler { | |||||
private final TaskManager taskManager; | private final TaskManager taskManager; | ||||
private final RequestClient requestClient; | private final RequestClient requestClient; | ||||
private final UrlMD5Service urlMD5Service; | private final UrlMD5Service urlMD5Service; | ||||
private final StreamBridge streamBridge; | |||||
public Message<HttpRes> handle(Message<HttpRequestParams> message) { | public Message<HttpRes> handle(Message<HttpRequestParams> message) { | ||||
TaskVO taskVO = taskManager.getTaskInfoByTaskId((Integer) message.getHeaders().get(MessageConstants.TASK_ID)); | TaskVO taskVO = taskManager.getTaskInfoByTaskId((Integer) message.getHeaders().get(MessageConstants.TASK_ID)); | ||||
String taskName = taskVO.getTask().getName(); | |||||
if (DebugUtil.isSkip(taskVO)) { | if (DebugUtil.isSkip(taskVO)) { | ||||
return null; | return null; | ||||
} | } | ||||
@@ -60,12 +61,12 @@ public class SourceHandler { | |||||
urlMD5Service.put(requestParams.getUrl(), md5Digest); | urlMD5Service.put(requestParams.getUrl(), md5Digest); | ||||
res.setResourceStatus(resourceStatus); | res.setResourceStatus(resourceStatus); | ||||
} catch (Exception e) { | } catch (Exception e) { | ||||
log.error("[{}] request error, request params:[{}], error message:[{}]", taskVO.getTask().getName(), requestParams, e.getMessage()); | |||||
log.error("[{}] request error, request params:[{}], error message:[{}]", taskName, requestParams, e.getMessage()); | |||||
return null; | return null; | ||||
} | } | ||||
if (!res.getStatus().is2xxSuccessful()) { | if (!res.getStatus().is2xxSuccessful()) { | ||||
log.warn("[{}] response status code is not 2xx, request params: [{}], response: [{}]", taskVO.getTask().getName(), requestParams, res); | |||||
throw new RuntimeException("[" + taskVO.getTask().getName() + "]" + "unexpected status code: " + res.getStatus()); | |||||
log.warn("[{}] response status code is not 2xx, request params: [{}], response: [{}]", taskName, requestParams, res); | |||||
throw new RuntimeException("[" + taskName + "]" + "unexpected status code: " + res.getStatus()); | |||||
} | } | ||||
ProcessMode processMode = taskManager | ProcessMode processMode = taskManager | ||||
.getTaskInfoByTaskId( | .getTaskInfoByTaskId( | ||||
@@ -76,9 +77,29 @@ public class SourceHandler { | |||||
if (res.getResourceStatus().equals(ResourceStatus.UNCHANGED) && !ProcessMode.DEBUG.equals(processMode)) { | if (res.getResourceStatus().equals(ResourceStatus.UNCHANGED) && !ProcessMode.DEBUG.equals(processMode)) { | ||||
return null; | return null; | ||||
} | } | ||||
if (ProcessMode.WATCH.equals(processMode)) { | |||||
Message<String> watchMessage = MessageBuilder | |||||
.withPayload(res.getBody()) | |||||
.setHeader(MessageConstants.CURRENT_STAGE, StageConstants.SOURCE_WATCH) | |||||
.setHeader(MessageConstants.TASK_TRACE_ID, message.getHeaders().get(MessageConstants.TASK_TRACE_ID)) | |||||
.setHeader(MessageConstants.SOURCE_TRACE_ID, ID.generate()) | |||||
.setHeader(MessageConstants.TASK_ID, message.getHeaders().get(MessageConstants.TASK_ID)) | |||||
.setHeader(MessageConstants.TASK_NAME, message.getHeaders().get(MessageConstants.TASK_NAME)) | |||||
.setHeader(SnapshotConstants.URL, requestParams.getUrl()) | |||||
.setHeader(SnapshotConstants.RESOURCE_STATUS, res.getResourceStatus()) | |||||
.setHeader(SnapshotConstants.STATUS, res.getStatus()) | |||||
.build(); | |||||
boolean send = streamBridge.send(taskName + BindingConstants.SNAPSHOT_OUT_POSTFIX, watchMessage); | |||||
if (!send) { | |||||
throw new RuntimeException("send message failed"); | |||||
} | |||||
return null; | |||||
} | |||||
return MessageBuilder | return MessageBuilder | ||||
.withPayload(res) | .withPayload(res) | ||||
.setHeader(MessageConstants.CURRENT_STAGE, StageConstants.SOURCE) | |||||
.setHeader(MessageConstants.CURRENT_STAGE, StageConstants.SOURCE_WATCH) | |||||
.setHeader(MessageConstants.TASK_TRACE_ID, message.getHeaders().get(MessageConstants.TASK_TRACE_ID)) | .setHeader(MessageConstants.TASK_TRACE_ID, message.getHeaders().get(MessageConstants.TASK_TRACE_ID)) | ||||
.setHeader(MessageConstants.SOURCE_TRACE_ID, ID.generate()) | .setHeader(MessageConstants.SOURCE_TRACE_ID, ID.generate()) | ||||
.setHeader(MessageConstants.TASK_ID, message.getHeaders().get(MessageConstants.TASK_ID)) | .setHeader(MessageConstants.TASK_ID, message.getHeaders().get(MessageConstants.TASK_ID)) | ||||
@@ -6,6 +6,7 @@ import okhttp3.*; | |||||
import okio.BufferedSource; | import okio.BufferedSource; | ||||
import org.springframework.beans.factory.annotation.Autowired; | import org.springframework.beans.factory.annotation.Autowired; | ||||
import org.springframework.context.annotation.Primary; | import org.springframework.context.annotation.Primary; | ||||
import org.springframework.http.HttpMethod; | |||||
import org.springframework.http.HttpStatus; | import org.springframework.http.HttpStatus; | ||||
import org.springframework.stereotype.Component; | import org.springframework.stereotype.Component; | ||||
import org.springframework.util.ObjectUtils; | import org.springframework.util.ObjectUtils; | ||||
@@ -16,6 +17,7 @@ import work.xuye.source.util.CharsetUtil; | |||||
import java.io.IOException; | import java.io.IOException; | ||||
import java.nio.charset.Charset; | import java.nio.charset.Charset; | ||||
import java.util.Map; | |||||
import java.util.concurrent.TimeUnit; | import java.util.concurrent.TimeUnit; | ||||
/** | /** | ||||
@@ -52,15 +54,39 @@ public class OkHttpRequestClient implements RequestClient { | |||||
@Override | @Override | ||||
public HttpRes execute(HttpRequestParams request) { | public HttpRes execute(HttpRequestParams request) { | ||||
Request.Builder requestBuilder = new Request.Builder(); | Request.Builder requestBuilder = new Request.Builder(); | ||||
// request url | |||||
requestBuilder.url(request.getUrl()); | requestBuilder.url(request.getUrl()); | ||||
// request header | |||||
if (!ObjectUtils.isEmpty(request.getHeaders())) { | if (!ObjectUtils.isEmpty(request.getHeaders())) { | ||||
requestBuilder.headers(Headers.Companion.of(request.getHeaders())); | requestBuilder.headers(Headers.Companion.of(request.getHeaders())); | ||||
} | } | ||||
if (!ObjectUtils.isEmpty(request.getBody())) { | |||||
MediaType mediaType = MediaType.Companion.parse("application/json;charset=utf-8"); | |||||
RequestBody requestBody = RequestBody.Companion.create(gson.toJson(request.getBody()), mediaType); | |||||
// request body | |||||
Map<String, Object> body = request.getBody(); | |||||
String type = request.getMediaType(); | |||||
RequestBody requestBody = null; | |||||
//如果不是GET请求,并且请求体不为空才能构建请求体 | |||||
if (!request.getMethod().equalsIgnoreCase(HttpMethod.GET.name()) && !ObjectUtils.isEmpty(request.getBody().keySet())) { | |||||
// form url | |||||
if (org.springframework.http.MediaType.APPLICATION_FORM_URLENCODED_VALUE.equals(type)) { | |||||
FormBody.Builder formBodyBuilder = new FormBody.Builder(); | |||||
for (Map.Entry<String, Object> entry : request.getBody().entrySet()) { | |||||
formBodyBuilder.add(entry.getKey(), entry.getValue().toString()); | |||||
} | |||||
requestBody = formBodyBuilder.build(); | |||||
} else if (org.springframework.http.MediaType.APPLICATION_JSON_VALUE.equals(type)) { | |||||
MediaType mediaType = MediaType.Companion.parse(org.springframework.http.MediaType.APPLICATION_JSON_VALUE); | |||||
requestBody = RequestBody.Companion.create(gson.toJson(body), mediaType); | |||||
} else { | |||||
throw new RuntimeException("暂不支持的请求类型"); | |||||
} | |||||
} | |||||
if (!ObjectUtils.isEmpty(requestBody)) { | |||||
requestBuilder.method(request.getMethod(), requestBody); | requestBuilder.method(request.getMethod(), requestBody); | ||||
} | } | ||||
// res | |||||
HttpRes httpRes = new HttpRes(); | HttpRes httpRes = new HttpRes(); | ||||
Request httpRequest = requestBuilder.build(); | Request httpRequest = requestBuilder.build(); | ||||
try (Response response = client.newCall(httpRequest).execute()) { | try (Response response = client.newCall(httpRequest).execute()) { | ||||
@@ -4,7 +4,7 @@ import org.mybatis.spring.annotation.MapperScan; | |||||
import org.springframework.boot.SpringApplication; | import org.springframework.boot.SpringApplication; | ||||
import org.springframework.boot.autoconfigure.SpringBootApplication; | import org.springframework.boot.autoconfigure.SpringBootApplication; | ||||
@SpringBootApplication(scanBasePackages = "work.xuye.*") | |||||
@SpringBootApplication(scanBasePackages = {"work.xuye.*", "tech.deepq.components"}) | |||||
@MapperScan(basePackages = "work.xuye.common.db.mapper") | @MapperScan(basePackages = "work.xuye.common.db.mapper") | ||||
public class TransformerApplication { | public class TransformerApplication { | ||||
@@ -73,6 +73,11 @@ public class TransformHandler { | |||||
// 将结果根据配置的transformer进行转换 | // 将结果根据配置的transformer进行转换 | ||||
this.transformResult(message, taskVO); | this.transformResult(message, taskVO); | ||||
if (ObjectUtils.isEmpty(message.getPayload().getBody())) { | |||||
log.warn("@@ 转换后的结果为空,不继续处理"); | |||||
return null; | |||||
} | |||||
// 消失模式处理 | // 消失模式处理 | ||||
this.handleDisappear(message, taskVO); | this.handleDisappear(message, taskVO); | ||||
@@ -154,7 +159,10 @@ public class TransformHandler { | |||||
.setHeader(MessageConstants.CURRENT_STAGE, StageConstants.TRANSFORMER) | .setHeader(MessageConstants.CURRENT_STAGE, StageConstants.TRANSFORMER) | ||||
.setHeader(MessageConstants.TRANSFORMER_TRACE_ID, ID.generate()) | .setHeader(MessageConstants.TRANSFORMER_TRACE_ID, ID.generate()) | ||||
.build(); | .build(); | ||||
streamBridge.send(BindingConstants.TRANSFORMER_KEYS_OUT, keysMsg); | |||||
boolean send = streamBridge.send(BindingConstants.TRANSFORMER_KEYS_OUT, keysMsg); | |||||
if (!send) { | |||||
throw new RuntimeException("send message failed"); | |||||
} | |||||
} | } | ||||
} | } | ||||
@@ -14,22 +14,17 @@ import org.springframework.stereotype.Component; | |||||
@Component("resData") | @Component("resData") | ||||
@RequiredArgsConstructor | @RequiredArgsConstructor | ||||
public class ResDataTransformer implements MessageTransformer { | public class ResDataTransformer implements MessageTransformer { | ||||
private int threshold = 300; | |||||
@Override | @Override | ||||
public String transform(String json, String seedUrl) { | public String transform(String json, String seedUrl) { | ||||
JsonObject res = JsonParser.parseString(json).getAsJsonObject(); | JsonObject res = JsonParser.parseString(json).getAsJsonObject(); | ||||
boolean hasData = res.has("data"); | boolean hasData = res.has("data"); | ||||
if (hasData) { | if (hasData) { | ||||
threshold = 300; | |||||
return res.get("data").getAsString(); | |||||
} else { | } else { | ||||
threshold--; | |||||
log.warn("resData transform failed, res not has data, res: {}", res); | |||||
} | } | ||||
if (threshold <= 0) { | |||||
throw new RuntimeException("res no data, seedUrl: " + seedUrl); | |||||
} | |||||
return res.get("data").getAsString(); | |||||
return null; | |||||
} | } | ||||
} | } |
@@ -5,6 +5,7 @@ import org.json.JSONException; | |||||
import org.json.JSONObject; | import org.json.JSONObject; | ||||
import org.json.XML; | import org.json.XML; | ||||
import org.springframework.stereotype.Component; | import org.springframework.stereotype.Component; | ||||
import org.springframework.util.ObjectUtils; | |||||
/** | /** | ||||
* @author xuye | * @author xuye | ||||
@@ -13,10 +14,12 @@ import org.springframework.stereotype.Component; | |||||
@Slf4j | @Slf4j | ||||
@Component("xml2json") | @Component("xml2json") | ||||
public class XmlToJsonTransformer implements MessageTransformer { | public class XmlToJsonTransformer implements MessageTransformer { | ||||
@Override | @Override | ||||
public String transform(String xml, String seedUrl) { | public String transform(String xml, String seedUrl) { | ||||
if (ObjectUtils.isEmpty(xml)) { | |||||
return null; | |||||
} | |||||
JSONObject jsonObject = null; | JSONObject jsonObject = null; | ||||
try { | try { | ||||
jsonObject = XML.toJSONObject(xml); | jsonObject = XML.toJSONObject(xml); | ||||