@@ -1,5 +1,6 @@ | |||
<?xml version="1.0" encoding="UTF-8"?> | |||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> | |||
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" | |||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> | |||
<modelVersion>4.0.0</modelVersion> | |||
<parent> | |||
<groupId>work.xuye</groupId> | |||
@@ -35,7 +36,7 @@ | |||
<dependency> | |||
<groupId>mysql</groupId> | |||
<artifactId>mysql-connector-java</artifactId> | |||
<version>8.0.31</version> | |||
<version>8.0.33</version> | |||
</dependency> | |||
<dependency> | |||
@@ -115,7 +116,11 @@ | |||
<scope>runtime</scope> | |||
<optional>true</optional> | |||
</dependency> | |||
<dependency> | |||
<groupId>tech.deepq</groupId> | |||
<artifactId>sq-sentry</artifactId> | |||
<version>1.1.8</version> | |||
</dependency> | |||
</dependencies> | |||
@@ -1,5 +1,6 @@ | |||
package work.xuye.common.alert; | |||
import io.sentry.Sentry; | |||
import lombok.RequiredArgsConstructor; | |||
import lombok.extern.slf4j.Slf4j; | |||
import org.aspectj.lang.annotation.AfterThrowing; | |||
@@ -24,6 +25,9 @@ public class ExceptionAspect { | |||
@AfterThrowing(pointcut = "execution(* work.xuye..*(..))", throwing = "ex") | |||
public void handleGlobalException(Exception ex) { | |||
Sentry.capture(ex); | |||
List<StackTraceElement> stackTraceElements = new java.util.ArrayList<>(List.of(ex.getStackTrace())); | |||
stackTraceElements.removeIf(stackTraceElement -> !stackTraceElement.getClassName().contains("work.xuye")); | |||
StringBuilder traceString = new StringBuilder(); | |||
@@ -8,7 +8,9 @@ public class BindingConstants { | |||
public static final String TASK_OUT = "task-out"; | |||
public static final String TRANSFORMER_KEYS_OUT = "item-keys-out"; | |||
public static final String SNAPSHOT_OUT_POSTFIX = "-snapshot-out"; | |||
public static final String TRANSFORMER_KEYS_OUT = "transformKeys-out"; | |||
} |
@@ -0,0 +1,16 @@ | |||
package work.xuye.common.constant; | |||
/** | |||
* @author xuye | |||
* @since 2023/5/17 13:23 | |||
**/ | |||
public class SnapshotConstants { | |||
public static final String RESOURCE_STATUS = "resourceStatus"; | |||
public static final String STATUS = "status"; | |||
public static final String URL = "url"; | |||
} |
@@ -8,6 +8,7 @@ public class StageConstants { | |||
public static final String SCHEDULER = "scheduler"; | |||
public static final String SOURCE = "source"; | |||
public static final String SOURCE_WATCH = "source-watch"; | |||
public static final String TRANSFORMER = "transformer"; | |||
public static final String END = "end"; | |||
@@ -4,6 +4,7 @@ import lombok.AllArgsConstructor; | |||
import lombok.Data; | |||
import lombok.NoArgsConstructor; | |||
import org.springframework.http.HttpMethod; | |||
import org.springframework.http.MediaType; | |||
import java.util.Map; | |||
@@ -27,5 +28,7 @@ public class HttpRequestParams { | |||
private Map<String, String> headers = Map.of(); | |||
private Map<String, String> placeholderExpressions = Map.of(); | |||
private String mediaType = MediaType.APPLICATION_JSON_VALUE; | |||
} |
@@ -16,6 +16,9 @@ import work.xuye.common.constant.RawDataFiledKey; | |||
import work.xuye.common.store.NsKVMapStore; | |||
import work.xuye.common.utils.HttpUtil; | |||
import java.time.Instant; | |||
import java.time.LocalDateTime; | |||
import java.time.ZoneId; | |||
import java.time.ZonedDateTime; | |||
import java.time.format.DateTimeFormatter; | |||
import java.util.Arrays; | |||
@@ -137,6 +140,17 @@ public class CustomFunction { | |||
return (String) exp.getValue(); | |||
} | |||
public String warpExclude(String json, String excludeFiled, String... fields) { | |||
JsonObject obj = JsonParser.parseString(json).getAsJsonObject(); | |||
if (obj.has(excludeFiled)) { | |||
obj.remove(excludeFiled); | |||
} else { | |||
log.warn("json warpV2 excludeFiled not exist, excludeFiled: [{}]", excludeFiled); | |||
} | |||
return this.wrap(obj.toString(), fields); | |||
} | |||
/** | |||
* 用itemData包一层的原因:避免下游程序直接读到了有特殊意义的字段,比如type(财联社资讯类型 -1:快讯,0:要闻,1:非财联社资讯) | |||
* <a href="https://app.asana.com/0/1202103682792595/1204211768982825/f">...</a> | |||
@@ -257,4 +271,12 @@ public class CustomFunction { | |||
} | |||
public String epochMilliToLocalDatetime(String json, String path) { | |||
String epochMilli = this.jsonExtract(json, path); | |||
String localDatetime = LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(epochMilli)), ZoneId.of("Asia/Shanghai")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); | |||
log.debug("epochMilliToLocalDatetime handler, json: [{}], path: [{}], epochMilli: [{}], localDatetime: [{}]", json, path, epochMilli, localDatetime); | |||
return localDatetime; | |||
} | |||
} |
@@ -12,12 +12,16 @@ import java.util.Set; | |||
@Slf4j | |||
public class DebugUtil { | |||
private static final boolean DEBUG = false; | |||
private static final Set<Integer> skipTaskIds = Set.of(); | |||
private static final Set<Integer> taskIds = Set.of(5); | |||
public static boolean isSkip(TaskVO task) { | |||
if (!DEBUG) { | |||
return false; | |||
} | |||
Integer taskId = task.getTask().getId(); | |||
if (skipTaskIds.contains(taskId)) { | |||
if (!taskIds.contains(taskId)) { | |||
log.warn("skip task, [{}-{}]", taskId, task.getTask().getName()); | |||
return true; | |||
} | |||
@@ -11,7 +11,7 @@ import org.springframework.expression.spel.standard.SpelExpressionParser; | |||
public class SpEL { | |||
public static void main(String[] args) { | |||
ExpressionParser parser = new SpelExpressionParser(); | |||
Expression exp = parser.parseExpression("T(java.time.LocalDateTime).now().minusHours(1).atZone(T(java.time.ZoneId).systemDefault()).toInstant().atOffset(T(java.time.ZoneOffset).UTC).toInstant().getEpochSecond()"); | |||
Expression exp = parser.parseExpression("T(java.lang.Math).random() < 0.5 ? 1 : 2"); | |||
Object o = exp.getValue(); | |||
System.out.println(o); | |||
} | |||
@@ -68,3 +68,37 @@ where JSON_EXTRACT(v.raw_data, '$.crawler') = 'snp' | |||
order by v.`created_time` desc | |||
limit 50 | |||
``` | |||
## 根据spider查询视频处理状态 | |||
```sql | |||
select v.status, | |||
v.publish_time, | |||
v.if_update, | |||
t.status, | |||
v.title, | |||
v.url, | |||
v.`video_url`, | |||
v.spider, | |||
v.`created_time`, | |||
v.`raw_data`, | |||
t.`status`, | |||
t.`retry_count`, | |||
c.* | |||
from `all_news_video_dycj` as v | |||
left join `task_news` as t on v.`url` = t.`url` | |||
left join `task_news_content` as c on t.`id` = c.id | |||
where v.spider = 'bosera_videos_api' | |||
order by v.`created_time` desc | |||
limit 100 | |||
``` | |||
## 根据spider清表重爬 | |||
```sql | |||
delete v.*,c.*,t.* | |||
from `all_news_video_dycj` as v | |||
inner join `task_news` as t on v.`url` = t.`url` | |||
inner join `task_news_content` as c on t.`id` = c.id | |||
where v.spider = 'bosera_videos_api'; | |||
``` |
@@ -6,7 +6,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; | |||
import org.springframework.scheduling.annotation.EnableScheduling; | |||
@EnableScheduling | |||
@SpringBootApplication(scanBasePackages = "work.xuye.*") | |||
@SpringBootApplication(scanBasePackages = {"work.xuye.*", "tech.deepq.components"}) | |||
@MapperScan(basePackages = "work.xuye.common.db.mapper") | |||
public class SchedulerApplication { | |||
@@ -49,7 +49,10 @@ public class IssueService implements ApplicationRunner { | |||
.setHeader(MessageConstants.TASK_ID, taskVO.getTask().getId()) | |||
.setHeader(MessageConstants.TASK_NAME, taskVO.getTask().getName()) | |||
.build(); | |||
streamBridge.send(BindingConstants.TASK_OUT, message); | |||
boolean send = streamBridge.send(BindingConstants.TASK_OUT, message); | |||
if (!send) { | |||
throw new RuntimeException("send message failed"); | |||
} | |||
ProcessMode processMode = taskVO.getTask().getProcessMode(); | |||
String emoji = processMode.equals(ProcessMode.DEBUG) ? "\uD83E\uDD16" : ""; | |||
log.info("\uD83D\uDCE4 {} {}-{}", | |||
@@ -6,7 +6,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; | |||
import org.springframework.scheduling.annotation.EnableScheduling; | |||
@EnableScheduling | |||
@SpringBootApplication(scanBasePackages = "work.xuye.*") | |||
@SpringBootApplication(scanBasePackages = {"work.xuye.*", "tech.deepq.components"}) | |||
@MapperScan(basePackages = "work.xuye.common.db.mapper") | |||
public class SinkApplication { | |||
@@ -0,0 +1,19 @@ | |||
package work.xuye.sink.controller; | |||
import org.springframework.web.bind.annotation.GetMapping; | |||
import org.springframework.web.bind.annotation.RequestMapping; | |||
import org.springframework.web.bind.annotation.RestController; | |||
/** | |||
* @author xuye | |||
* @since 2023/7/4 15:16 | |||
**/ | |||
@RestController | |||
@RequestMapping("/open") | |||
public class OpenController { | |||
@GetMapping | |||
public String open() { | |||
return "hello ~"; | |||
} | |||
} |
@@ -165,7 +165,9 @@ public class ItemHandler { | |||
private boolean tryDelete(TaskVO taskVO, TableTemplate template, HashMap<String, Object> newItem, SinkParams.CheckDelete checkDeleteConfig, ProcessReason reason) { | |||
// 如果不是 是固定值检测删除并且状态是开启,说明无需处理,直接返回 | |||
if (!(checkDeleteConfig.getMode().equals(DeleteCheckMode.fixedValue) && checkDeleteConfig.getStatus().equals(Status.on))) { | |||
if ( | |||
!(checkDeleteConfig.getStatus().equals(Status.on) && checkDeleteConfig.getMode().equals(DeleteCheckMode.fixedValue)) | |||
) { | |||
return false; | |||
} | |||
SinkParams.FixedValueConfig fixedValueConfig = checkDeleteConfig.getFixedValueConfig(); | |||
@@ -307,7 +309,8 @@ public class ItemHandler { | |||
String fieldName = checkUpdateConfig.getFieldName(); | |||
Object dbValue = dbItem.get(fieldName); | |||
if (ObjectUtils.isEmpty(dbValue)) { | |||
throw new RuntimeException("根据 [" + fieldName + "] 来判断数据是否需要更新,但是数据库不存在该字段"); | |||
log.warn("根据{}来判断数据是否需要更新,但是数据库不存在该字段", fieldName); | |||
return; | |||
} | |||
Object nowValue = resultMap.get(fieldName); | |||
if (!ObjectUtils.isEmpty(checkUpdateConfig.getJsonPath())) { | |||
@@ -4,7 +4,7 @@ import org.mybatis.spring.annotation.MapperScan; | |||
import org.springframework.boot.SpringApplication; | |||
import org.springframework.boot.autoconfigure.SpringBootApplication; | |||
@SpringBootApplication(scanBasePackages = "work.xuye.*") | |||
@SpringBootApplication(scanBasePackages = {"work.xuye.*", "tech.deepq.components"}) | |||
@MapperScan(basePackages = "work.xuye.common.db.mapper") | |||
public class SourceApplication { | |||
@@ -2,6 +2,7 @@ package work.xuye.source.handler; | |||
import lombok.RequiredArgsConstructor; | |||
import lombok.extern.slf4j.Slf4j; | |||
import org.springframework.cloud.stream.function.StreamBridge; | |||
import org.springframework.expression.spel.standard.SpelExpressionParser; | |||
import org.springframework.messaging.Message; | |||
import org.springframework.messaging.support.MessageBuilder; | |||
@@ -9,9 +10,7 @@ import org.springframework.stereotype.Component; | |||
import org.springframework.util.DigestUtils; | |||
import org.springframework.util.ObjectUtils; | |||
import org.springframework.util.StopWatch; | |||
import work.xuye.common.constant.MessageConstants; | |||
import work.xuye.common.constant.RegexConstants; | |||
import work.xuye.common.constant.StageConstants; | |||
import work.xuye.common.constant.*; | |||
import work.xuye.common.db.entity.Task; | |||
import work.xuye.common.db.service.TaskManager; | |||
import work.xuye.common.dto.HttpRequestParams; | |||
@@ -41,10 +40,12 @@ public class SourceHandler { | |||
private final TaskManager taskManager; | |||
private final RequestClient requestClient; | |||
private final UrlMD5Service urlMD5Service; | |||
private final StreamBridge streamBridge; | |||
public Message<HttpRes> handle(Message<HttpRequestParams> message) { | |||
TaskVO taskVO = taskManager.getTaskInfoByTaskId((Integer) message.getHeaders().get(MessageConstants.TASK_ID)); | |||
String taskName = taskVO.getTask().getName(); | |||
if (DebugUtil.isSkip(taskVO)) { | |||
return null; | |||
} | |||
@@ -60,12 +61,12 @@ public class SourceHandler { | |||
urlMD5Service.put(requestParams.getUrl(), md5Digest); | |||
res.setResourceStatus(resourceStatus); | |||
} catch (Exception e) { | |||
log.error("[{}] request error, request params:[{}], error message:[{}]", taskVO.getTask().getName(), requestParams, e.getMessage()); | |||
log.error("[{}] request error, request params:[{}], error message:[{}]", taskName, requestParams, e.getMessage()); | |||
return null; | |||
} | |||
if (!res.getStatus().is2xxSuccessful()) { | |||
log.warn("[{}] response status code is not 2xx, request params: [{}], response: [{}]", taskVO.getTask().getName(), requestParams, res); | |||
throw new RuntimeException("[" + taskVO.getTask().getName() + "]" + "unexpected status code: " + res.getStatus()); | |||
log.warn("[{}] response status code is not 2xx, request params: [{}], response: [{}]", taskName, requestParams, res); | |||
throw new RuntimeException("[" + taskName + "]" + "unexpected status code: " + res.getStatus()); | |||
} | |||
ProcessMode processMode = taskManager | |||
.getTaskInfoByTaskId( | |||
@@ -76,9 +77,29 @@ public class SourceHandler { | |||
if (res.getResourceStatus().equals(ResourceStatus.UNCHANGED) && !ProcessMode.DEBUG.equals(processMode)) { | |||
return null; | |||
} | |||
if (ProcessMode.WATCH.equals(processMode)) { | |||
Message<String> watchMessage = MessageBuilder | |||
.withPayload(res.getBody()) | |||
.setHeader(MessageConstants.CURRENT_STAGE, StageConstants.SOURCE_WATCH) | |||
.setHeader(MessageConstants.TASK_TRACE_ID, message.getHeaders().get(MessageConstants.TASK_TRACE_ID)) | |||
.setHeader(MessageConstants.SOURCE_TRACE_ID, ID.generate()) | |||
.setHeader(MessageConstants.TASK_ID, message.getHeaders().get(MessageConstants.TASK_ID)) | |||
.setHeader(MessageConstants.TASK_NAME, message.getHeaders().get(MessageConstants.TASK_NAME)) | |||
.setHeader(SnapshotConstants.URL, requestParams.getUrl()) | |||
.setHeader(SnapshotConstants.RESOURCE_STATUS, res.getResourceStatus()) | |||
.setHeader(SnapshotConstants.STATUS, res.getStatus()) | |||
.build(); | |||
boolean send = streamBridge.send(taskName + BindingConstants.SNAPSHOT_OUT_POSTFIX, watchMessage); | |||
if (!send) { | |||
throw new RuntimeException("send message failed"); | |||
} | |||
return null; | |||
} | |||
return MessageBuilder | |||
.withPayload(res) | |||
.setHeader(MessageConstants.CURRENT_STAGE, StageConstants.SOURCE) | |||
.setHeader(MessageConstants.CURRENT_STAGE, StageConstants.SOURCE_WATCH) | |||
.setHeader(MessageConstants.TASK_TRACE_ID, message.getHeaders().get(MessageConstants.TASK_TRACE_ID)) | |||
.setHeader(MessageConstants.SOURCE_TRACE_ID, ID.generate()) | |||
.setHeader(MessageConstants.TASK_ID, message.getHeaders().get(MessageConstants.TASK_ID)) | |||
@@ -6,6 +6,7 @@ import okhttp3.*; | |||
import okio.BufferedSource; | |||
import org.springframework.beans.factory.annotation.Autowired; | |||
import org.springframework.context.annotation.Primary; | |||
import org.springframework.http.HttpMethod; | |||
import org.springframework.http.HttpStatus; | |||
import org.springframework.stereotype.Component; | |||
import org.springframework.util.ObjectUtils; | |||
@@ -16,6 +17,7 @@ import work.xuye.source.util.CharsetUtil; | |||
import java.io.IOException; | |||
import java.nio.charset.Charset; | |||
import java.util.Map; | |||
import java.util.concurrent.TimeUnit; | |||
/** | |||
@@ -52,15 +54,39 @@ public class OkHttpRequestClient implements RequestClient { | |||
@Override | |||
public HttpRes execute(HttpRequestParams request) { | |||
Request.Builder requestBuilder = new Request.Builder(); | |||
// request url | |||
requestBuilder.url(request.getUrl()); | |||
// request header | |||
if (!ObjectUtils.isEmpty(request.getHeaders())) { | |||
requestBuilder.headers(Headers.Companion.of(request.getHeaders())); | |||
} | |||
if (!ObjectUtils.isEmpty(request.getBody())) { | |||
MediaType mediaType = MediaType.Companion.parse("application/json;charset=utf-8"); | |||
RequestBody requestBody = RequestBody.Companion.create(gson.toJson(request.getBody()), mediaType); | |||
// request body | |||
Map<String, Object> body = request.getBody(); | |||
String type = request.getMediaType(); | |||
RequestBody requestBody = null; | |||
//如果不是GET请求,并且请求体不为空才能构建请求体 | |||
if (!request.getMethod().equalsIgnoreCase(HttpMethod.GET.name()) && !ObjectUtils.isEmpty(request.getBody().keySet())) { | |||
// form url | |||
if (org.springframework.http.MediaType.APPLICATION_FORM_URLENCODED_VALUE.equals(type)) { | |||
FormBody.Builder formBodyBuilder = new FormBody.Builder(); | |||
for (Map.Entry<String, Object> entry : request.getBody().entrySet()) { | |||
formBodyBuilder.add(entry.getKey(), entry.getValue().toString()); | |||
} | |||
requestBody = formBodyBuilder.build(); | |||
} else if (org.springframework.http.MediaType.APPLICATION_JSON_VALUE.equals(type)) { | |||
MediaType mediaType = MediaType.Companion.parse(org.springframework.http.MediaType.APPLICATION_JSON_VALUE); | |||
requestBody = RequestBody.Companion.create(gson.toJson(body), mediaType); | |||
} else { | |||
throw new RuntimeException("暂不支持的请求类型"); | |||
} | |||
} | |||
if (!ObjectUtils.isEmpty(requestBody)) { | |||
requestBuilder.method(request.getMethod(), requestBody); | |||
} | |||
// res | |||
HttpRes httpRes = new HttpRes(); | |||
Request httpRequest = requestBuilder.build(); | |||
try (Response response = client.newCall(httpRequest).execute()) { | |||
@@ -4,7 +4,7 @@ import org.mybatis.spring.annotation.MapperScan; | |||
import org.springframework.boot.SpringApplication; | |||
import org.springframework.boot.autoconfigure.SpringBootApplication; | |||
@SpringBootApplication(scanBasePackages = "work.xuye.*") | |||
@SpringBootApplication(scanBasePackages = {"work.xuye.*", "tech.deepq.components"}) | |||
@MapperScan(basePackages = "work.xuye.common.db.mapper") | |||
public class TransformerApplication { | |||
@@ -73,6 +73,11 @@ public class TransformHandler { | |||
// 将结果根据配置的transformer进行转换 | |||
this.transformResult(message, taskVO); | |||
if (ObjectUtils.isEmpty(message.getPayload().getBody())) { | |||
log.warn("@@ 转换后的结果为空,不继续处理"); | |||
return null; | |||
} | |||
// 消失模式处理 | |||
this.handleDisappear(message, taskVO); | |||
@@ -154,7 +159,10 @@ public class TransformHandler { | |||
.setHeader(MessageConstants.CURRENT_STAGE, StageConstants.TRANSFORMER) | |||
.setHeader(MessageConstants.TRANSFORMER_TRACE_ID, ID.generate()) | |||
.build(); | |||
streamBridge.send(BindingConstants.TRANSFORMER_KEYS_OUT, keysMsg); | |||
boolean send = streamBridge.send(BindingConstants.TRANSFORMER_KEYS_OUT, keysMsg); | |||
if (!send) { | |||
throw new RuntimeException("send message failed"); | |||
} | |||
} | |||
} | |||
@@ -14,22 +14,17 @@ import org.springframework.stereotype.Component; | |||
@Component("resData") | |||
@RequiredArgsConstructor | |||
public class ResDataTransformer implements MessageTransformer { | |||
private int threshold = 300; | |||
@Override | |||
public String transform(String json, String seedUrl) { | |||
JsonObject res = JsonParser.parseString(json).getAsJsonObject(); | |||
boolean hasData = res.has("data"); | |||
if (hasData) { | |||
threshold = 300; | |||
return res.get("data").getAsString(); | |||
} else { | |||
threshold--; | |||
log.warn("resData transform failed, res not has data, res: {}", res); | |||
} | |||
if (threshold <= 0) { | |||
throw new RuntimeException("res no data, seedUrl: " + seedUrl); | |||
} | |||
return res.get("data").getAsString(); | |||
return null; | |||
} | |||
} |
@@ -5,6 +5,7 @@ import org.json.JSONException; | |||
import org.json.JSONObject; | |||
import org.json.XML; | |||
import org.springframework.stereotype.Component; | |||
import org.springframework.util.ObjectUtils; | |||
/** | |||
* @author xuye | |||
@@ -13,10 +14,12 @@ import org.springframework.stereotype.Component; | |||
@Slf4j | |||
@Component("xml2json") | |||
public class XmlToJsonTransformer implements MessageTransformer { | |||
@Override | |||
public String transform(String xml, String seedUrl) { | |||
if (ObjectUtils.isEmpty(xml)) { | |||
return null; | |||
} | |||
JSONObject jsonObject = null; | |||
try { | |||
jsonObject = XML.toJSONObject(xml); | |||