Commit 8bc64f26 by liuyu

Merge branch 'feature' into 'release'

2023年05/16 任务添加运行中缓存

See merge request !24
parents d4a18b5f f67a8ae2
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
<properties> <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version> <java.version>1.8</java.version>
<automatic.version>1.0.0.1-SNAPSHOT</automatic.version> <automatic.version>1.0.0.2-SNAPSHOT</automatic.version>
</properties> </properties>
<dependencies> <dependencies>
......
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
</parent> </parent>
<artifactId>middleware-automatic-center-client</artifactId> <artifactId>middleware-automatic-center-client</artifactId>
<version>1.0.0.1-SNAPSHOT</version> <version>1.0.0.2-SNAPSHOT</version>
<properties> <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
...@@ -19,10 +19,25 @@ ...@@ -19,10 +19,25 @@
<curator.version>2.12.0</curator.version> <curator.version>2.12.0</curator.version>
<base.version>2.0.0-SNAPSHOT</base.version> <base.version>2.0.0-SNAPSHOT</base.version>
<easyexcel.version>2.1.2</easyexcel.version> <easyexcel.version>2.1.2</easyexcel.version>
<redisson.version>3.17.3</redisson.version>
</properties> </properties>
<dependencies> <dependencies>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>${redisson.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/easyexcel --> <!-- https://mvnrepository.com/artifact/com.alibaba/easyexcel -->
<dependency> <dependency>
<groupId>com.alibaba</groupId> <groupId>com.alibaba</groupId>
......
...@@ -45,5 +45,7 @@ public class GenericAttribute { ...@@ -45,5 +45,7 @@ public class GenericAttribute {
public static final String KEY = "task"; public static final String KEY = "task";
public static final String RUNNING = "running";
public static final String URL = "url"; public static final String URL = "url";
} }
package com.zhiwei.middleware.automatic.server.core;
import com.alibaba.fastjson.JSONObject;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.zhiwei.middleware.automatic.server.common.GenericAttribute;
import com.zhiwei.middleware.automatic.server.pojo.AutoTask;
import com.zhiwei.middleware.automatic.server.pojo.enums.ManagerType;
import com.zhiwei.middleware.automatic.server.util.RedissonUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Strings;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class TaskManager {
private static final Logger log = LogManager.getLogger(TaskManager.class);
/** 定时处理线程 **/
private ScheduledExecutorService TASK_EXECUTOR;
private TaskFunctional taskFunctional;
private ManagerType managerType;
private RedissonUtil redissonUtil;
private TaskManager() {
}
/**
* 获取taskManager
* @return taskManager
*/
public static TaskManager getInstance() {
return TaskManagerHolder.getTaskManager();
}
/**
* 初始化taskManager
* @param managerType 类型
* @param redissonUtil redis工具类
* @param taskFunctional 任务执行 函数式接口
* @param pullSplit 定时间隔事件
* @param timeUnit 时间unit
*/
public static void initManager(ManagerType managerType, RedissonUtil redissonUtil, TaskFunctional taskFunctional, int pullSplit, TimeUnit timeUnit) {
TaskManagerHolder.builderManagerType(managerType);
TaskManagerHolder.builderRedissonUtil(redissonUtil);
TaskManagerHolder.builderTaskFunctional(taskFunctional);
TaskManagerHolder.builderScheduled(pullSplit, timeUnit);
}
/**
* 添加任务
* @param autoTask 任务
*/
public void putTask(AutoTask autoTask) {
// 任务为null,或者不是server端 无法分发任务
if (Objects.isNull(autoTask) || managerType != ManagerType.SERVER) {
return;
}
// 任务间隔过滤
if (autoTask.isSplitFilter() && Objects.nonNull(autoTask.getGroup())) {
String runningKey = concat(GenericAttribute.RUNNING, autoTask.getGroup());
if (Objects.nonNull(redissonUtil.getBucket(runningKey))) {
log.info("有同类型任务正在执行中:{}", autoTask.getGroup());
return;
}
// 正在运行得任务添加缓存
redissonUtil.setBucket(runningKey, "1", 30L, TimeUnit.MINUTES);
}
// 添加任务
redissonUtil.putQueue(GenericAttribute.KEY, JSONObject.toJSONString(autoTask));
}
/**
* 拉取任务
*/
private void pullTask() {
try {
for (String s : redissonUtil.pullQueue(GenericAttribute.KEY, 10)) {
if (Strings.isEmpty(s)) {
continue;
}
AutoTask autoTask = JSONObject.parseObject(s).toJavaObject(AutoTask.class);
// 任务执行阶段
String cacheId = taskFunctional.taskRun(autoTask);
// 删除缓存
if (!Strings.isEmpty(cacheId)) {
redissonUtil.deleteList(autoTask.getParamSource().getString(cacheId));
}
// 删除正在运行得任务
if (autoTask.isSplitFilter()) {
redissonUtil.deleteBucket(concat(GenericAttribute.RUNNING, autoTask.getGroup()));
}
}
} catch (Exception e) {
log.error("{}端任务管理器,任务执行失败:", managerType.name(), e);
}
}
public static String concat(Object... objects) {
StringBuilder sb = new StringBuilder();
for (Object obj : objects) {
sb.append(obj).append(":");
}
String resultStr = sb.toString();
return resultStr.substring(0, resultStr.length() - 1);
}
@FunctionalInterface
public interface TaskFunctional {
String taskRun(AutoTask task);
}
private static class TaskManagerHolder {
private static final TaskManager TASK_MANAGER = new TaskManager();
private static void builderManagerType(ManagerType managerType) {
if (Objects.isNull(TASK_MANAGER.managerType) && Objects.nonNull(managerType)) {
TASK_MANAGER.managerType = managerType;
}
}
private static void builderRedissonUtil(RedissonUtil redissonUtil) {
if (Objects.isNull(TASK_MANAGER.redissonUtil) && Objects.nonNull(redissonUtil)) {
TASK_MANAGER.redissonUtil = redissonUtil;
}
}
private static void builderTaskFunctional(TaskFunctional taskFunctional) {
if (Objects.isNull(TASK_MANAGER.taskFunctional) && Objects.nonNull(taskFunctional)) {
TASK_MANAGER.taskFunctional = taskFunctional;
}
}
private static void builderScheduled(int pullSplit, TimeUnit timeUnit) {
if (pullSplit != 0 || Objects.nonNull(timeUnit)) {
TASK_MANAGER.TASK_EXECUTOR = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("task-manager").build());
TASK_MANAGER.TASK_EXECUTOR.scheduleWithFixedDelay(TASK_MANAGER::pullTask, 5L, pullSplit, timeUnit);
}
}
private static TaskManager getTaskManager() {
if (Objects.isNull(TASK_MANAGER.managerType)) {
log.error("taskManager未成功初始化");
return null;
} else if (TASK_MANAGER.managerType == ManagerType.SON && (Objects.isNull(TASK_MANAGER.redissonUtil) ||
Objects.isNull(TASK_MANAGER.taskFunctional) || Objects.isNull(TASK_MANAGER.TASK_EXECUTOR))) {
log.error("son端taskManager未成功初始化");
} else if (TASK_MANAGER.managerType == ManagerType.SERVER && (Objects.isNull(TASK_MANAGER.redissonUtil))) {
log.error("service端taskManager未成功初始化");
}
return TASK_MANAGER;
}
}
}
...@@ -6,15 +6,29 @@ public class AutoTask { ...@@ -6,15 +6,29 @@ public class AutoTask {
private JSONObject paramSource; private JSONObject paramSource;
private String group;
private boolean splitFilter;
private String type; private String type;
public AutoTask() {} public AutoTask() {}
public AutoTask(String type) { public AutoTask(String type, String group) {
this.type = type;
this.group = group;
this.splitFilter = false;
this.paramSource = new JSONObject();
}
public AutoTask(String type, String group, boolean splitFilter) {
this.type = type; this.type = type;
this.group = group;
this.paramSource = new JSONObject(); this.paramSource = new JSONObject();
this.splitFilter = splitFilter;
} }
public JSONObject getParamSource() { public JSONObject getParamSource() {
return paramSource; return paramSource;
} }
...@@ -30,4 +44,20 @@ public class AutoTask { ...@@ -30,4 +44,20 @@ public class AutoTask {
public void setType(String type) { public void setType(String type) {
this.type = type; this.type = type;
} }
public String getGroup() {
return group;
}
public void setGroup(String group) {
this.group = group;
}
public boolean isSplitFilter() {
return splitFilter;
}
public void setSplitFilter(boolean splitFilter) {
this.splitFilter = splitFilter;
}
} }
package com.zhiwei.middleware.automatic.server.pojo.enums;
public enum ManagerType {
SERVER,
SON;
}
package com.zhiwei.middleware.automatic.son.util; package com.zhiwei.middleware.automatic.server.util;
import org.redisson.api.*; import org.redisson.api.*;
import org.springframework.stereotype.Component;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@Component
public class RedissonUtil { public class RedissonUtil {
private static final String MARK_KEY = "auto:mark:"; private static final String MARK_KEY = "auto:mark:";
private final RedissonClient redissonClient; private final RedissonClient redissonClient;
private RedissonUtil(RedissonClient redissonClient) { public RedissonUtil(RedissonClient redissonClient) {
this.redissonClient = redissonClient; this.redissonClient = redissonClient;
} }
/** /**
* redis队列添加数据
* @param key key
* @param value value
*/
public void putQueue(String key, String value) {
RQueue<String> queue = redissonClient.getQueue(redisKey(key));
queue.add(value);
}
/**
* 拉取redis队列数据 * 拉取redis队列数据
* @param key key * @param key key
* @param limit 条数 * @param limit 条数
...@@ -29,14 +37,34 @@ public class RedissonUtil { ...@@ -29,14 +37,34 @@ public class RedissonUtil {
return queue.poll(limit); return queue.poll(limit);
} }
public String getBucket(String runningKey) {
RBucket<String> bucket = redissonClient.getBucket(redisKey(runningKey));
return bucket.get();
}
public void setBucket(String runningKey, String value, Long timeOut, TimeUnit timeUnit) {
RBucket<String> bucket = redissonClient.getBucket(redisKey(runningKey));
bucket.set(value);
bucket.expire(timeOut, timeUnit);
}
public void deleteBucket(String runningKey) {
RBucket<String> bucket = redissonClient.getBucket(redisKey(runningKey));
bucket.delete();
}
public void putCount(String key, long count) { public void putCount(String key, long count) {
RAtomicLong atomicLong = redissonClient.getAtomicLong(redisKey(key)); RAtomicLong atomicLong = redissonClient.getAtomicLong(redisKey(key));
atomicLong.addAndGet(count); atomicLong.addAndGet(count);
atomicLong.expire(7, TimeUnit.DAYS); atomicLong.expire(7, TimeUnit.DAYS);
} }
public Long getCount(String key) {
return redissonClient.getAtomicLong(redisKey(key)).get();
}
public Map<String, String> getMapValue(String key) { public Map<String, String> getMapValue(String key) {
// return redissonClient.getMap(redisKey(key));
return redissonClient.getMap(key); return redissonClient.getMap(key);
} }
...@@ -55,6 +83,10 @@ public class RedissonUtil { ...@@ -55,6 +83,10 @@ public class RedissonUtil {
return redissonClient.getList(redisKey(redisKey)); return redissonClient.getList(redisKey(redisKey));
} }
public void setList(String key, List<String> value) {
RList<Object> list = redissonClient.getList(redisKey(key));
list.addAll(value);
}
public void deleteList(String key) { public void deleteList(String key) {
RList<Object> list = redissonClient.getList(redisKey(key)); RList<Object> list = redissonClient.getList(redisKey(key));
list.delete(); list.delete();
...@@ -63,5 +95,4 @@ public class RedissonUtil { ...@@ -63,5 +95,4 @@ public class RedissonUtil {
private String redisKey(String key) { private String redisKey(String key) {
return MARK_KEY + key; return MARK_KEY + key;
} }
} }
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
<qbjc-bean.version>1.1.4.1-SNAPSHOT</qbjc-bean.version> <qbjc-bean.version>1.1.4.1-SNAPSHOT</qbjc-bean.version>
<nlp-aggree.version>0.0.5-SNAPSHOT</nlp-aggree.version> <nlp-aggree.version>0.0.5-SNAPSHOT</nlp-aggree.version>
<dubbo-server.version>2.7.4.1</dubbo-server.version> <dubbo-server.version>2.7.4.1</dubbo-server.version>
<automatic.version>1.0.0.1-SNAPSHOT</automatic.version> <automatic.version>1.0.0.2-SNAPSHOT</automatic.version>
<base.version>2.0.0-SNAPSHOT</base.version> <base.version>2.0.0-SNAPSHOT</base.version>
</properties> </properties>
......
package com.zhiwei.middleware.automatic.server.config;
import com.zhiwei.middleware.automatic.server.util.RedissonUtil;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TaskManagerConfig {
@Autowired
private RedissonClient redissonClient;
@Bean
public RedissonUtil redissonUtil() {
return new RedissonUtil(redissonClient);
}
}
...@@ -3,13 +3,14 @@ package com.zhiwei.middleware.automatic.server.dubbo.service.impl; ...@@ -3,13 +3,14 @@ package com.zhiwei.middleware.automatic.server.dubbo.service.impl;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.zhiwei.base.entity.subclass.mark.MarkInfo; import com.zhiwei.base.entity.subclass.mark.MarkInfo;
import com.zhiwei.middleware.automatic.server.common.GenericAttribute; import com.zhiwei.middleware.automatic.server.common.GenericAttribute;
import com.zhiwei.middleware.automatic.server.core.TaskManager;
import com.zhiwei.middleware.automatic.server.dubbo.service.AutoMaticService; import com.zhiwei.middleware.automatic.server.dubbo.service.AutoMaticService;
import com.zhiwei.middleware.automatic.server.pojo.AutoTask; import com.zhiwei.middleware.automatic.server.pojo.AutoTask;
import com.zhiwei.middleware.automatic.server.pojo.MarkInfoMulti; import com.zhiwei.middleware.automatic.server.pojo.MarkInfoMulti;
import com.zhiwei.middleware.automatic.server.pojo.TemplateTitleVo; import com.zhiwei.middleware.automatic.server.pojo.TemplateTitleVo;
import com.zhiwei.middleware.automatic.server.pojo.enums.TaskType; import com.zhiwei.middleware.automatic.server.pojo.enums.TaskType;
import com.zhiwei.middleware.automatic.server.redis.RedissonUtil;
import com.zhiwei.middleware.automatic.server.service.TemplateTitleService; import com.zhiwei.middleware.automatic.server.service.TemplateTitleService;
import com.zhiwei.middleware.automatic.server.util.RedissonUtil;
import com.zhiwei.middleware.automatic.server.util.Tools; import com.zhiwei.middleware.automatic.server.util.Tools;
import org.apache.dubbo.config.annotation.Service; import org.apache.dubbo.config.annotation.Service;
...@@ -32,39 +33,37 @@ public class AutoMaticServiceImpl implements AutoMaticService { ...@@ -32,39 +33,37 @@ public class AutoMaticServiceImpl implements AutoMaticService {
@Override @Override
public void autoMark(List<MarkInfo> markInfos) { public void autoMark(List<MarkInfo> markInfos) {
AutoTask autoTask = new AutoTask(TaskType.COMMON_ONE.getType()); AutoTask autoTask = new AutoTask(TaskType.COMMON_ONE.getType(), null);
String sourceKey = Tools.assembleKey(GenericAttribute.REDIS_QUEUE_ONE_KEY, Tools.randomUUID()); String sourceKey = Tools.assembleKey(GenericAttribute.REDIS_QUEUE_ONE_KEY, Tools.randomUUID());
redissonUtil.setList(sourceKey, markInfos.stream() redissonUtil.setList(sourceKey, markInfos.stream()
.filter(e -> Objects.nonNull(e) && Objects.nonNull(e.getSourceObj())) .filter(e -> Objects.nonNull(e) && Objects.nonNull(e.getSourceObj()))
.map(JSONObject::toJSONString).collect(Collectors.toList())); .map(JSONObject::toJSONString).collect(Collectors.toList()));
autoTask.getParamSource().put(TaskType.COMMON_ONE.getCacheId(), sourceKey); autoTask.getParamSource().put(TaskType.COMMON_ONE.getCacheId(), sourceKey);
redissonUtil.putQueue(GenericAttribute.KEY, JSONObject.toJSONString(autoTask)); TaskManager.getInstance().putTask(autoTask);
} }
@Override @Override
public void autoMarkMulti(List<MarkInfoMulti> markInfoMultis) { public void autoMarkMulti(List<MarkInfoMulti> markInfoMultis) {
AutoTask autoTask = new AutoTask(TaskType.COMMON_TWO.getType()); AutoTask autoTask = new AutoTask(TaskType.COMMON_TWO.getType(), null);
String sourceKey = Tools.assembleKey(GenericAttribute.REDIS_QUEUE_MULTI_KEY, Tools.randomUUID()); String sourceKey = Tools.assembleKey(GenericAttribute.REDIS_QUEUE_MULTI_KEY, Tools.randomUUID());
redissonUtil.setList(sourceKey, markInfoMultis.stream().map(JSONObject::toJSONString).collect(Collectors.toList())); redissonUtil.setList(sourceKey, markInfoMultis.stream().map(JSONObject::toJSONString).collect(Collectors.toList()));
autoTask.getParamSource().put(TaskType.COMMON_TWO.getCacheId(), sourceKey); autoTask.getParamSource().put(TaskType.COMMON_TWO.getCacheId(), sourceKey);
redissonUtil.putQueue(GenericAttribute.KEY, JSONObject.toJSONString(autoTask)); TaskManager.getInstance().putTask(autoTask);
} }
@Override @Override
public void modifyTemplateTitle(String group, String templateTitle, String fixTag) { public void modifyTemplateTitle(String group, String templateTitle, String fixTag) {
AutoTask autoTask = new AutoTask(TaskType.TEMPLATE_MODIFY.getType()); AutoTask autoTask = new AutoTask(TaskType.TEMPLATE_MODIFY.getType(), group);
autoTask.getParamSource().put(GenericAttribute.GROUP_PARAM, group);
autoTask.getParamSource().put(GenericAttribute.TEMPLATE_TITLE, templateTitle); autoTask.getParamSource().put(GenericAttribute.TEMPLATE_TITLE, templateTitle);
autoTask.getParamSource().put(GenericAttribute.FIX_TAG, fixTag); autoTask.getParamSource().put(GenericAttribute.FIX_TAG, fixTag);
redissonUtil.putQueue(GenericAttribute.KEY, JSONObject.toJSONString(autoTask)); TaskManager.getInstance().putTask(autoTask);
} }
@Override @Override
public void resetTemplate(String group, String templateTitle) { public void resetTemplate(String group, String templateTitle) {
AutoTask autoTask = new AutoTask(TaskType.TEMPLATE_RESET.getType()); AutoTask autoTask = new AutoTask(TaskType.TEMPLATE_RESET.getType(), group);
autoTask.getParamSource().put(GenericAttribute.GROUP_PARAM, group);
autoTask.getParamSource().put(GenericAttribute.TEMPLATE_TITLE, templateTitle); autoTask.getParamSource().put(GenericAttribute.TEMPLATE_TITLE, templateTitle);
redissonUtil.putQueue(GenericAttribute.KEY, JSONObject.toJSONString(autoTask)); TaskManager.getInstance().putTask(autoTask);
} }
@Override @Override
......
package com.zhiwei.middleware.automatic.server.mission; package com.zhiwei.middleware.automatic.server.mission;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.middleware.automatic.server.common.GenericAttribute; import com.zhiwei.middleware.automatic.server.common.GenericAttribute;
import com.zhiwei.middleware.automatic.server.core.TaskManager;
import com.zhiwei.middleware.automatic.server.pojo.AutoTask; import com.zhiwei.middleware.automatic.server.pojo.AutoTask;
import com.zhiwei.middleware.automatic.server.pojo.TemplateTitleVo; import com.zhiwei.middleware.automatic.server.pojo.TemplateTitleVo;
import com.zhiwei.middleware.automatic.server.pojo.enums.TaskType; import com.zhiwei.middleware.automatic.server.pojo.enums.TaskType;
import com.zhiwei.middleware.automatic.server.pojo.enums.TemplateStatus; import com.zhiwei.middleware.automatic.server.pojo.enums.TemplateStatus;
import com.zhiwei.middleware.automatic.server.redis.RedissonUtil;
import com.zhiwei.middleware.automatic.server.service.TemplateTitleService; import com.zhiwei.middleware.automatic.server.service.TemplateTitleService;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
...@@ -26,22 +25,18 @@ public class ScheduledMission { ...@@ -26,22 +25,18 @@ public class ScheduledMission {
private static final Long MONTH = 1000L * 60 * 60 * 24 * 30; private static final Long MONTH = 1000L * 60 * 60 * 24 * 30;
private final RedissonUtil redissonUtil;
private final AsyncTask asyncTask; private final AsyncTask asyncTask;
private final TemplateTitleService templateTitleService; private final TemplateTitleService templateTitleService;
public ScheduledMission(RedissonUtil redissonUtil, AsyncTask asyncTask, public ScheduledMission(AsyncTask asyncTask,
TemplateTitleService templateTitleService) { TemplateTitleService templateTitleService) {
this.redissonUtil = redissonUtil;
this.asyncTask = asyncTask; this.asyncTask = asyncTask;
this.templateTitleService = templateTitleService; this.templateTitleService = templateTitleService;
} }
@Scheduled(cron = "0 0/5 * * * ?")
// @Scheduled(cron = "0 0/5 * * * ?") @Async("asyncExecutor")
// @Async("asyncExecutor")
public void templateHourSync() { public void templateHourSync() {
try { try {
Calendar calendar = Calendar.getInstance(); Calendar calendar = Calendar.getInstance();
...@@ -51,15 +46,15 @@ public class ScheduledMission { ...@@ -51,15 +46,15 @@ public class ScheduledMission {
calendarEndTime.add(Calendar.MINUTE, -5); calendarEndTime.add(Calendar.MINUTE, -5);
long endTime = calendarEndTime.getTime().getTime(); long endTime = calendarEndTime.getTime().getTime();
for (String project : asyncTask.findAllGroup()) { for (String project : asyncTask.findAllGroup()) {
putTask(project, startTime, endTime); putTask(project, startTime, endTime, true);
} }
} catch (Exception e) { } catch (Exception e) {
log.error("十分钟定时同步模板失败:", e); log.error("十分钟定时同步模板失败:", e);
} }
} }
// @Scheduled(cron = "0 10 4 * * ?") @Scheduled(cron = "0 10 4 * * ?")
// @Async("autMarkExecutor") @Async("autMarkExecutor")
public void templateDaySync() { public void templateDaySync() {
try { try {
Calendar calendar = Calendar.getInstance(); Calendar calendar = Calendar.getInstance();
...@@ -69,7 +64,8 @@ public class ScheduledMission { ...@@ -69,7 +64,8 @@ public class ScheduledMission {
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();
for (String project : asyncTask.findAllGroup()) { for (String project : asyncTask.findAllGroup()) {
//模板聚合任务 //模板聚合任务
putTask(project, startTime, endTime); putTask(project, startTime, endTime, false);
putTaskBy(project);
} }
// 模板记录清除任务 // 模板记录清除任务
...@@ -79,28 +75,26 @@ public class ScheduledMission { ...@@ -79,28 +75,26 @@ public class ScheduledMission {
} }
} }
private void putTask(String group, long startTime, long endTime) { private void putTask(String group, long startTime, long endTime, boolean splitFilter) {
AutoTask autoTask = new AutoTask(TaskType.TEMPLATE.getType()); AutoTask autoTask = new AutoTask(TaskType.TEMPLATE.getType(), group, splitFilter);
autoTask.getParamSource().put(GenericAttribute.GROUP_PARAM, group);
autoTask.getParamSource().put(GenericAttribute.START_PARAM, startTime); autoTask.getParamSource().put(GenericAttribute.START_PARAM, startTime);
autoTask.getParamSource().put(GenericAttribute.END_PARAM, endTime); autoTask.getParamSource().put(GenericAttribute.END_PARAM, endTime);
redissonUtil.putQueue(GenericAttribute.KEY, JSONObject.toJSONString(autoTask)); TaskManager.getInstance().putTask(autoTask);
} }
private void putTaskByRecord() { private void putTaskByRecord() {
AutoTask autoTask = new AutoTask(TaskType.TEMPLATE_RECORD.getType()); AutoTask autoTask = new AutoTask(TaskType.TEMPLATE_RECORD.getType(), null);
autoTask.getParamSource().put(GenericAttribute.END_PARAM, System.currentTimeMillis() - MONTH); autoTask.getParamSource().put(GenericAttribute.END_PARAM, System.currentTimeMillis() - MONTH);
redissonUtil.putQueue(GenericAttribute.KEY, JSONObject.toJSONString(autoTask)); TaskManager.getInstance().putTask(autoTask);
} }
private void putTaskBy(String group) { private void putTaskBy(String group) {
Map<String, TemplateTitleVo> project = templateTitleService.getTemplateTitleByProject(group); Map<String, TemplateTitleVo> project = templateTitleService.getTemplateTitleByProject(group);
for (Map.Entry<String, TemplateTitleVo> entry : project.entrySet()) { for (Map.Entry<String, TemplateTitleVo> entry : project.entrySet()) {
if (entry.getValue().getStatus() == TemplateStatus.重置失败) { if (entry.getValue().getStatus() == TemplateStatus.重置失败) {
AutoTask autoTask = new AutoTask(TaskType.TEMPLATE_CLEAR_RETRY.getType()); AutoTask autoTask = new AutoTask(TaskType.TEMPLATE_CLEAR_RETRY.getType(), group);
autoTask.getParamSource().put(GenericAttribute.GROUP_PARAM, group);
autoTask.getParamSource().put(GenericAttribute.TEMPLATE_TITLE, entry.getKey()); autoTask.getParamSource().put(GenericAttribute.TEMPLATE_TITLE, entry.getKey());
redissonUtil.putQueue(GenericAttribute.KEY, JSONObject.toJSONString(autoTask)); TaskManager.getInstance().putTask(autoTask);
} }
} }
} }
......
package com.zhiwei.middleware.automatic.server.mission;
import com.zhiwei.middleware.automatic.server.core.TaskManager;
import com.zhiwei.middleware.automatic.server.pojo.enums.ManagerType;
import com.zhiwei.middleware.automatic.server.util.RedissonUtil;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
/**
* 任务初始化
*/
@Component
public class TaskInit implements ApplicationRunner {
private final RedissonUtil redissonUtil;
public TaskInit(RedissonUtil redissonUtil) {
this.redissonUtil = redissonUtil;
}
@Override
public void run(ApplicationArguments args) {
TaskManager.initManager(ManagerType.SERVER, redissonUtil, null, 0, null);
}
}
package com.zhiwei.middleware.automatic.server.redis;
import org.redisson.api.*;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
@Component
public class RedissonUtil {
private static final String MARK_KEY = "auto:mark:";
private final RedissonClient redissonClient;
private RedissonUtil(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}
/**
* redis队列添加数据
* @param key key
* @param value value
*/
public void putQueue(String key, String value) {
RQueue<String> queue = redissonClient.getQueue(redisKey(key));
queue.add(value);
}
public void setList(String key, List<String> value) {
RList<Object> list = redissonClient.getList(redisKey(key));
list.addAll(value);
}
public Map<String, String> getMapValue(String key) {
return redissonClient.getMap(redisKey(key));
}
public String getMapKeyValue(String key, String group) {
RMap<String, String> map = redissonClient.getMap(redisKey(key));
return map.get(group);
}
public void setMapValue(String key, String group, String value) {
RMap<String, String> map = redissonClient.getMap(redisKey(key));
map.put(group, value);
}
private String redisKey(String key) {
return MARK_KEY + key;
}
public Long getCount(String key) {
return redissonClient.getAtomicLong(redisKey(key)).get();
}
}
...@@ -6,8 +6,8 @@ import com.zhiwei.middleware.automatic.server.service.TemplateTitleService; ...@@ -6,8 +6,8 @@ import com.zhiwei.middleware.automatic.server.service.TemplateTitleService;
import com.zhiwei.middleware.automatic.server.pojo.TemplateRecord; import com.zhiwei.middleware.automatic.server.pojo.TemplateRecord;
import com.zhiwei.middleware.automatic.server.pojo.TemplateTitleVo; import com.zhiwei.middleware.automatic.server.pojo.TemplateTitleVo;
import com.zhiwei.middleware.automatic.server.pojo.enums.TemplateStatus; import com.zhiwei.middleware.automatic.server.pojo.enums.TemplateStatus;
import com.zhiwei.middleware.automatic.server.redis.RedissonUtil;
import com.zhiwei.middleware.automatic.server.util.CosineSimilarity; import com.zhiwei.middleware.automatic.server.util.CosineSimilarity;
import com.zhiwei.middleware.automatic.server.util.RedissonUtil;
import com.zhiwei.middleware.automatic.server.util.Tools; import com.zhiwei.middleware.automatic.server.util.Tools;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
......
...@@ -24,7 +24,7 @@ ...@@ -24,7 +24,7 @@
<qbjc-bean.version>1.1.4.1-SNAPSHOT</qbjc-bean.version> <qbjc-bean.version>1.1.4.1-SNAPSHOT</qbjc-bean.version>
<kafka.version>2.4.1.RELEASE</kafka.version> <kafka.version>2.4.1.RELEASE</kafka.version>
<base.version>2.0.0-SNAPSHOT</base.version> <base.version>2.0.0-SNAPSHOT</base.version>
<automatic.version>1.0-SNAPSHOT</automatic.version> <automatic.version>1.0.0.2-SNAPSHOT</automatic.version>
<marker.version>1.2.3-SNAPSHOT</marker.version> <marker.version>1.2.3-SNAPSHOT</marker.version>
<filter.version>1.1.6-SNAPSHOT</filter.version> <filter.version>1.1.6-SNAPSHOT</filter.version>
<nlp-aggree.version>0.0.5-SNAPSHOT</nlp-aggree.version> <nlp-aggree.version>0.0.5-SNAPSHOT</nlp-aggree.version>
...@@ -35,6 +35,28 @@ ...@@ -35,6 +35,28 @@
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>RELEASE</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>${spring-boot.version}</version>
<exclusions><!-- 去掉默认配置 -->
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId> <groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId> <artifactId>dubbo</artifactId>
<version>${dubbo-server.version}</version> <version>${dubbo-server.version}</version>
...@@ -259,6 +281,12 @@ ...@@ -259,6 +281,12 @@
<version>${json.version}</version> <version>${json.version}</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>
......
package com.zhiwei.middleware.automatic.son.config;
import com.zhiwei.middleware.automatic.server.util.RedissonUtil;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TaskManagerConfig {
@Autowired
private RedissonClient redissonClient;
@Bean
public RedissonUtil redissonUtil() {
return new RedissonUtil(redissonClient);
}
}
package com.zhiwei.middleware.automatic.son.config; package com.zhiwei.middleware.automatic.son.config;
import com.zhiwei.middleware.automatic.son.mission.ThreadPoolExecutorTimeout;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Component @Component
public class TaskPoolConfig { public class TaskPoolConfig {
...@@ -28,20 +30,8 @@ public class TaskPoolConfig { ...@@ -28,20 +30,8 @@ public class TaskPoolConfig {
} }
@Bean("templateExecutor") @Bean("templateExecutor")
public ThreadPoolTaskExecutor templateExecutor() { public ThreadPoolExecutorTimeout templateExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); return new ThreadPoolExecutorTimeout(1000L * 60 * 30, 10, 20,10000L,
// 配置核心线程数 TimeUnit.MILLISECONDS, 10, "模板任务线程池");
executor.setCorePoolSize(8);
// 配置最大线程数
executor.setMaxPoolSize(15);
// 配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("template-executor-");
executor.setQueueCapacity(20);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 执行初始化
executor.initialize();
return executor;
} }
} }
...@@ -21,6 +21,8 @@ public class TemplateTempRecord { ...@@ -21,6 +21,8 @@ public class TemplateTempRecord {
private Long createAt; private Long createAt;
private String type;
public TemplateTempRecord() {} public TemplateTempRecord() {}
public TemplateTempRecord(String id, String templateId, String templateTitle, public TemplateTempRecord(String id, String templateId, String templateTitle,
...@@ -73,4 +75,16 @@ public class TemplateTempRecord { ...@@ -73,4 +75,16 @@ public class TemplateTempRecord {
public Long getCreateAt() { public Long getCreateAt() {
return createAt; return createAt;
} }
public String getEsId() {
return esId;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
} }
package com.zhiwei.middleware.automatic.son.mission;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class SupervisedThread implements Delayed {
//线程
private final Runnable runnable;
private final Thread thread;
//开始时间
private final Long startTime;
private final Long timeout;
public SupervisedThread(Runnable runnable, Thread thread, long startTime, Long timeout) {
this.runnable = runnable;
this.thread = thread;
this.startTime = startTime;
this.timeout = timeout;
}
public Thread getThread() {
return thread;
}
public long getStartTime() {
return startTime;
}
public Runnable getRunnable() {
return runnable;
}
@Override
public long getDelay(TimeUnit unit) {
// TimeUnit.NANOSECONDS 延迟队列中用的就是MILLISECONDS维度
return TimeUnit.MILLISECONDS.toNanos((timeout - (System.currentTimeMillis() - startTime)));
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.getDelay(TimeUnit.NANOSECONDS), o.getDelay(TimeUnit.NANOSECONDS));
}
}
package com.zhiwei.middleware.automatic.son.mission;
import com.zhiwei.middleware.automatic.server.common.GenericAttribute;
import com.zhiwei.middleware.automatic.server.pojo.AutoTask;
import com.zhiwei.middleware.automatic.son.util.Tools;
public class TemplateRunnable implements Runnable {
private final Runnable runnable;
private final Long startTime;
private final Long endTime;
private final String group;
public TemplateRunnable(Runnable runnable, AutoTask autoTask) {
this.runnable = runnable;
this.startTime = autoTask.getParamSource().getLong(GenericAttribute.START_PARAM);
this.endTime = autoTask.getParamSource().getLong(GenericAttribute.END_PARAM);
this.group = autoTask.getGroup();
}
public String getContent() {
return "项目:" +
group + ", 开始时间:" +
Tools.TIME_FORMAT.format(startTime) +
", 结束时间:" + endTime;
}
public Runnable getRunnable() {
return runnable;
}
@Override
public void run() {
runnable.run();
}
}
package com.zhiwei.middleware.automatic.son.mission;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.*;
public class ThreadPoolExecutorTimeout extends ThreadPoolExecutor {
private static final Logger log = LogManager.getLogger(ThreadPoolExecutorTimeout.class);
/**
* 延迟队列,实现任务超时
*/
private final DelayQueue<SupervisedThread> supervisedThreadsQueue = new DelayQueue<>();
/**
* 工作空间
*/
private final ConcurrentHashMap<Runnable, SupervisedThread> running = new ConcurrentHashMap<>();
private final Long timeout;
/**
* 构造方法
* @param timeout 超时时间
* @param corePoolSize 核心线程
* @param maximumPoolSize 最大线程
* @param keepAliveTime 活动时间
* @param unit 时间类
* @param workQueueSize 队列
*/
public ThreadPoolExecutorTimeout(Long timeout, int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, int workQueueSize, String executorName) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new ArrayBlockingQueue<>(workQueueSize), r -> {
Thread t = Executors.defaultThreadFactory().newThread(r);
t.setName("timeout-" + executorName);
return t;
});
this.timeout = timeout;
}
/**
* 重写线程执行前方法:添加工作线程,并监听
* @param t
* @param r
*/
@Override
protected void beforeExecute(Thread t, Runnable r) {
//开始监听
SupervisedThread supervisedThread = new SupervisedThread(r, t, System.currentTimeMillis(), timeout);
supervisedThreadsQueue.put(supervisedThread);
//提交线程池
running.put(r, supervisedThread);
super.beforeExecute(t, r);
}
/**
* 重写线程执行后方法:从set中删除工作线程
* {@inheritDoc}
*/
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
//线程正常结束:删除队列中的监听
SupervisedThread supervisedThread = running.get(r);
boolean remove = supervisedThreadsQueue.remove(supervisedThread);
if (remove) {
running.remove(r);
}
}
/**
* 监听线程
*/
private class SupervisorRunnable implements Runnable {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
//从延时队列中,获取监督线程
SupervisedThread thread = supervisedThreadsQueue.poll(timeout, TimeUnit.MILLISECONDS);
if (thread != null) {
Runnable runnable = thread.getRunnable();
// 线程中断
thread.getThread().interrupt();
running.remove(runnable);
if (runnable instanceof TemplateRunnable) {
TemplateRunnable templateRunnable = (TemplateRunnable) runnable;
log.error("任务超时,打断线程,任务信息:{},", templateRunnable.getContent());
} else {
log.error("打断超时线程:{}", thread.getThread().getName());
}
}
//当前线程已经被中断,并且正在运行的线程为空
if (running.isEmpty() && supervisedThreadsQueue.isEmpty()) {
log.info("任务处理完毕进入休眠状态:当前正在运行的线程数量: {}", running.size());
Thread.sleep(1000L);
}
} catch (InterruptedException e) {
log.info("主线程收到中断信号");
}
}
}
}
}
...@@ -8,12 +8,13 @@ import com.zhiwei.middleware.automatic.server.common.GenericAttribute; ...@@ -8,12 +8,13 @@ import com.zhiwei.middleware.automatic.server.common.GenericAttribute;
import com.zhiwei.middleware.automatic.server.pojo.TemplateRecord; import com.zhiwei.middleware.automatic.server.pojo.TemplateRecord;
import com.zhiwei.middleware.automatic.server.pojo.TemplateTitleVo; import com.zhiwei.middleware.automatic.server.pojo.TemplateTitleVo;
import com.zhiwei.middleware.automatic.server.pojo.enums.TemplateStatus; import com.zhiwei.middleware.automatic.server.pojo.enums.TemplateStatus;
import com.zhiwei.middleware.automatic.server.util.RedissonUtil;
import com.zhiwei.middleware.automatic.son.dao.EsDao; import com.zhiwei.middleware.automatic.son.dao.EsDao;
import com.zhiwei.middleware.automatic.son.dao.TemplateRecordDao; import com.zhiwei.middleware.automatic.son.dao.TemplateRecordDao;
import com.zhiwei.middleware.automatic.son.dubbo.DubboHandler; import com.zhiwei.middleware.automatic.son.dubbo.DubboHandler;
import com.zhiwei.middleware.automatic.son.mission.ThreadPoolExecutorTimeout;
import com.zhiwei.middleware.automatic.son.service.TemplateTitleService; import com.zhiwei.middleware.automatic.son.service.TemplateTitleService;
import com.zhiwei.middleware.automatic.son.util.MarkInfoUtil; import com.zhiwei.middleware.automatic.son.util.MarkInfoUtil;
import com.zhiwei.middleware.automatic.son.util.RedissonUtil;
import com.zhiwei.middleware.automatic.son.util.Tools; import com.zhiwei.middleware.automatic.son.util.Tools;
import org.apache.commons.lang3.time.FastDateFormat; import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
...@@ -45,15 +46,15 @@ public class TemplateTitleServiceImpl implements TemplateTitleService { ...@@ -45,15 +46,15 @@ public class TemplateTitleServiceImpl implements TemplateTitleService {
private final DubboHandler dubboHandler; private final DubboHandler dubboHandler;
private final ThreadPoolTaskExecutor executor; private final ThreadPoolExecutorTimeout executor;
private static final FastDateFormat format = FastDateFormat.getInstance("yyyy-MM-dd"); private static final FastDateFormat format = FastDateFormat.getInstance("yyyy-MM-dd");
private static final String AGGREGATION_FILE = "aggregation-"; private static final String AGGREGATION_FILE = "aggregation-";
public TemplateTitleServiceImpl(RedissonUtil redissonUtil, EsDao esDao, public TemplateTitleServiceImpl(RedissonUtil redissonUtil, EsDao esDao,
IndexUtil.ESIndexes esIndexes, TemplateRecordDao templateRecordDao, IndexUtil.ESIndexes esIndexes, TemplateRecordDao templateRecordDao,
DubboHandler dubboHandler, DubboHandler dubboHandler,
@Qualifier("templateExecutor") ThreadPoolTaskExecutor executor) { @Qualifier("templateExecutor") ThreadPoolExecutorTimeout executor) {
this.redissonUtil = redissonUtil; this.redissonUtil = redissonUtil;
this.esDao = esDao; this.esDao = esDao;
this.esIndexes = esIndexes; this.esIndexes = esIndexes;
...@@ -65,8 +66,8 @@ public class TemplateTitleServiceImpl implements TemplateTitleService { ...@@ -65,8 +66,8 @@ public class TemplateTitleServiceImpl implements TemplateTitleService {
@Override @Override
public Map<String, TemplateTitleVo> getTemplateTitleByProject(String project) { public Map<String, TemplateTitleVo> getTemplateTitleByProject(String project) {
// Map<String, String> mapValue = redissonUtil.getMapValue(Tools.assembleKey(GenericAttribute.REDIS_MAP_KEY, project)); Map<String, String> mapValue = redissonUtil.getMapValue(Tools.assembleKey(GenericAttribute.REDIS_MAP_KEY, project));
Map<String, String> mapValue = redissonUtil.getMapValue(Tools.assembleKey(getFileDayName(project))); // Map<String, String> mapValue = redissonUtil.getMapValue(Tools.assembleKey(getFileDayName(project)));
if (Tools.isEmpty(mapValue)) { if (Tools.isEmpty(mapValue)) {
return new HashMap<>(); return new HashMap<>();
} }
......
package com.zhiwei.middleware.automatic.son.task;
import com.zhiwei.middleware.automatic.server.core.TaskManager;
import com.zhiwei.middleware.automatic.server.pojo.enums.ManagerType;
import com.zhiwei.middleware.automatic.server.util.RedissonUtil;
import com.zhiwei.middleware.automatic.son.task.holder.TaskServiceHandler;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.redisson.api.RedissonClient;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* 任务初始化
*/
@Component
public class TaskInit implements ApplicationRunner {
private final RedissonUtil redissonUtil;
public TaskInit(RedissonUtil redissonUtil) {
this.redissonUtil = redissonUtil;
}
@Override
public void run(ApplicationArguments args) {
TaskManager.initManager(ManagerType.SON, redissonUtil, TaskServiceHandler.getInstance()::taskExecute, 5, TimeUnit.SECONDS);
}
}
package com.zhiwei.middleware.automatic.son.task;
import com.alibaba.fastjson.JSONObject;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.zhiwei.middleware.automatic.server.common.GenericAttribute;
import com.zhiwei.middleware.automatic.server.pojo.AutoTask;
import com.zhiwei.middleware.automatic.server.pojo.enums.TaskType;
import com.zhiwei.middleware.automatic.son.task.holder.TaskServiceHandler;
import com.zhiwei.middleware.automatic.son.task.service.TaskService;
import com.zhiwei.middleware.automatic.son.util.RedissonUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.Strings;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* 任务管理器
*/
@Component
public class TaskManager implements ApplicationRunner {
private static final Logger log = LogManager.getLogger(TaskManager.class);
/** 定时处理线程 **/
private static final ScheduledExecutorService TASK_EXECUTOR = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("task-manager").build());
private final RedissonUtil redissonUtil;
private static final int LIMIT = 5;
public TaskManager(RedissonUtil redissonUtil) {
this.redissonUtil = redissonUtil;
}
@Override
public void run(ApplicationArguments args) {
TASK_EXECUTOR.scheduleWithFixedDelay(this::pullTask, 10L, 10L, TimeUnit.SECONDS);
log.info("定时线程构建完毕");
}
public void pullTask() {
try {
for (String s : redissonUtil.pullQueue(GenericAttribute.KEY, LIMIT)) {
AutoTask autoTask = JSONObject.parseObject(s).toJavaObject(AutoTask.class);
String cacheId = TaskServiceHandler.getInstance().taskExecute(autoTask);
if (!Strings.isEmpty(cacheId)) {
redissonUtil.deleteList(autoTask.getParamSource().getString(cacheId));
}
}
} catch (Exception e) {
log.error("任务管理器,任务执行失败:", e);
}
}
}
...@@ -4,12 +4,14 @@ import com.alibaba.fastjson.JSONObject; ...@@ -4,12 +4,14 @@ import com.alibaba.fastjson.JSONObject;
import com.zhiwei.base.category.ClassB; import com.zhiwei.base.category.ClassB;
import com.zhiwei.base.entity.subclass.mark.*; import com.zhiwei.base.entity.subclass.mark.*;
import com.zhiwei.middleware.automatic.server.common.GenericAttribute; import com.zhiwei.middleware.automatic.server.common.GenericAttribute;
import com.zhiwei.middleware.automatic.server.core.TaskManager;
import com.zhiwei.middleware.automatic.server.pojo.AutoTask; import com.zhiwei.middleware.automatic.server.pojo.AutoTask;
import com.zhiwei.middleware.automatic.server.pojo.MarkInfoMulti; import com.zhiwei.middleware.automatic.server.pojo.MarkInfoMulti;
import com.zhiwei.middleware.automatic.server.pojo.TemplateRecord; import com.zhiwei.middleware.automatic.server.pojo.TemplateRecord;
import com.zhiwei.middleware.automatic.server.pojo.TemplateTitleVo; import com.zhiwei.middleware.automatic.server.pojo.TemplateTitleVo;
import com.zhiwei.middleware.automatic.server.pojo.enums.TaskType; import com.zhiwei.middleware.automatic.server.pojo.enums.TaskType;
import com.zhiwei.middleware.automatic.server.pojo.enums.TemplateStatus; import com.zhiwei.middleware.automatic.server.pojo.enums.TemplateStatus;
import com.zhiwei.middleware.automatic.server.util.RedissonUtil;
import com.zhiwei.middleware.automatic.son.dao.TemplateRecordDao; import com.zhiwei.middleware.automatic.son.dao.TemplateRecordDao;
import com.zhiwei.middleware.automatic.son.dao.TemplateTempRecord; import com.zhiwei.middleware.automatic.son.dao.TemplateTempRecord;
import com.zhiwei.middleware.automatic.son.dubbo.DubboHandler; import com.zhiwei.middleware.automatic.son.dubbo.DubboHandler;
...@@ -17,7 +19,6 @@ import com.zhiwei.middleware.automatic.son.service.TemplateTitleService; ...@@ -17,7 +19,6 @@ import com.zhiwei.middleware.automatic.son.service.TemplateTitleService;
import com.zhiwei.middleware.automatic.son.task.BaseTaskTypePair; import com.zhiwei.middleware.automatic.son.task.BaseTaskTypePair;
import com.zhiwei.middleware.automatic.son.util.CosineSimilarity; import com.zhiwei.middleware.automatic.son.util.CosineSimilarity;
import com.zhiwei.middleware.automatic.son.util.MarkInfoUtil; import com.zhiwei.middleware.automatic.son.util.MarkInfoUtil;
import com.zhiwei.middleware.automatic.son.util.RedissonUtil;
import com.zhiwei.middleware.automatic.son.util.Tools; import com.zhiwei.middleware.automatic.son.util.Tools;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
...@@ -71,7 +72,10 @@ public class TaskServiceCommon extends BaseTaskTypePair<TaskServiceCommon.TaskCo ...@@ -71,7 +72,10 @@ public class TaskServiceCommon extends BaseTaskTypePair<TaskServiceCommon.TaskCo
if (dataSourceInfo.isEmpty()) { if (dataSourceInfo.isEmpty()) {
log.error("本地任务可用数据为空,taskType:{}", autoTask.getType()); log.error("本地任务可用数据为空,taskType:{}", autoTask.getType());
} }
autoMarkExecutor.execute(() -> projectAutoMark(dataSourceInfo)); autoMarkExecutor.execute(() -> {
log.info("自动标注任务");
projectAutoMark(dataSourceInfo);
});
} }
@Override @Override
...@@ -173,6 +177,7 @@ public class TaskServiceCommon extends BaseTaskTypePair<TaskServiceCommon.TaskCo ...@@ -173,6 +177,7 @@ public class TaskServiceCommon extends BaseTaskTypePair<TaskServiceCommon.TaskCo
CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
// 获取返回值 // 获取返回值
allOf.thenApply(e -> futures.stream().map(CompletableFuture::join)).get(); allOf.thenApply(e -> futures.stream().map(CompletableFuture::join)).get();
} }
/** /**
...@@ -184,7 +189,7 @@ public class TaskServiceCommon extends BaseTaskTypePair<TaskServiceCommon.TaskCo ...@@ -184,7 +189,7 @@ public class TaskServiceCommon extends BaseTaskTypePair<TaskServiceCommon.TaskCo
private void oneTitleMark(String group, List<MarkInfo> markInfos, Map<String, TemplateTitleVo> titleVoMap) { private void oneTitleMark(String group, List<MarkInfo> markInfos, Map<String, TemplateTitleVo> titleVoMap) {
// 移除标题长度小于6的部分 // 移除标题长度小于6的部分
List<MarkInfo> newList = markInfos.stream().filter(markInfo -> { List<MarkInfo> newList = markInfos.stream().filter(markInfo -> {
String title = markInfo.getSourceObj().getString(GenericAttribute.ES_TITLE); String title = Tools.filterSymbol(markInfo.getSourceObj().getString(GenericAttribute.ES_TITLE));
return null != title && title.length() > 6; return null != title && title.length() > 6;
}).filter(e -> autoMark(group, e, titleVoMap)).collect(Collectors.toList()); }).filter(e -> autoMark(group, e, titleVoMap)).collect(Collectors.toList());
// dubboHandler.markUpsert(newList); // dubboHandler.markUpsert(newList);
......
...@@ -11,6 +11,8 @@ import com.zhiwei.middleware.automatic.server.pojo.enums.TaskType; ...@@ -11,6 +11,8 @@ import com.zhiwei.middleware.automatic.server.pojo.enums.TaskType;
import com.zhiwei.middleware.automatic.server.pojo.enums.TemplateStatus; import com.zhiwei.middleware.automatic.server.pojo.enums.TemplateStatus;
import com.zhiwei.middleware.automatic.son.dao.EsDao; import com.zhiwei.middleware.automatic.son.dao.EsDao;
import com.zhiwei.middleware.automatic.son.dao.TemplateRecordDao; import com.zhiwei.middleware.automatic.son.dao.TemplateRecordDao;
import com.zhiwei.middleware.automatic.son.mission.TemplateRunnable;
import com.zhiwei.middleware.automatic.son.mission.ThreadPoolExecutorTimeout;
import com.zhiwei.middleware.automatic.son.service.TemplateTitleService; import com.zhiwei.middleware.automatic.son.service.TemplateTitleService;
import com.zhiwei.middleware.automatic.son.task.BaseTaskTypePair; import com.zhiwei.middleware.automatic.son.task.BaseTaskTypePair;
import com.zhiwei.middleware.automatic.son.util.CosineSimilarity; import com.zhiwei.middleware.automatic.son.util.CosineSimilarity;
...@@ -36,7 +38,7 @@ import java.util.*; ...@@ -36,7 +38,7 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors; import java.util.stream.Collectors;
//@Service @Service
public class TaskServiceTemplate extends BaseTaskTypePair<TaskServiceTemplate.TaskTemplateFunctional> implements TaskService { public class TaskServiceTemplate extends BaseTaskTypePair<TaskServiceTemplate.TaskTemplateFunctional> implements TaskService {
private final Logger log = LogManager.getLogger(TaskServiceTemplate.class); private final Logger log = LogManager.getLogger(TaskServiceTemplate.class);
...@@ -49,7 +51,7 @@ public class TaskServiceTemplate extends BaseTaskTypePair<TaskServiceTemplate.Ta ...@@ -49,7 +51,7 @@ public class TaskServiceTemplate extends BaseTaskTypePair<TaskServiceTemplate.Ta
private final TemplateRecordDao templateRecordDao; private final TemplateRecordDao templateRecordDao;
private final ThreadPoolTaskExecutor executor; private final ThreadPoolExecutorTimeout executor;
/* 一天的秒数(为保留前一天文件) */ /* 一天的秒数(为保留前一天文件) */
private static final int ONE_DAY = 60 * 60 * 24; private static final int ONE_DAY = 60 * 60 * 24;
...@@ -57,13 +59,13 @@ public class TaskServiceTemplate extends BaseTaskTypePair<TaskServiceTemplate.Ta ...@@ -57,13 +59,13 @@ public class TaskServiceTemplate extends BaseTaskTypePair<TaskServiceTemplate.Ta
public TaskServiceTemplate(TemplateTitleService templateTitleService, public TaskServiceTemplate(TemplateTitleService templateTitleService,
EsDao esDao, IndexUtil.ESIndexes esIndexes, EsDao esDao, IndexUtil.ESIndexes esIndexes,
TemplateRecordDao templateRecordDao, TemplateRecordDao templateRecordDao,
@Qualifier("templateExecutor") ThreadPoolTaskExecutor executor) { @Qualifier("templateExecutor") ThreadPoolExecutorTimeout executor) {
this.templateTitleService = templateTitleService; this.templateTitleService = templateTitleService;
this.esDao = esDao; this.esDao = esDao;
this.esIndexes = esIndexes; this.esIndexes = esIndexes;
this.templateRecordDao = templateRecordDao; this.templateRecordDao = templateRecordDao;
this.executor = executor; this.executor = executor;
taskCache(TaskType.TEMPLATE, this::runTask); taskCache(TaskType.TEMPLATE, this::templateRun);
taskCache(TaskType.TEMPLATE_MODIFY, this::templateModify); taskCache(TaskType.TEMPLATE_MODIFY, this::templateModify);
taskCache(TaskType.TEMPLATE_RESET, this::templateReset); taskCache(TaskType.TEMPLATE_RESET, this::templateReset);
taskCache(TaskType.TEMPLATE_CLEAR_RETRY, this::templateReset); taskCache(TaskType.TEMPLATE_CLEAR_RETRY, this::templateReset);
...@@ -77,10 +79,14 @@ public class TaskServiceTemplate extends BaseTaskTypePair<TaskServiceTemplate.Ta ...@@ -77,10 +79,14 @@ public class TaskServiceTemplate extends BaseTaskTypePair<TaskServiceTemplate.Ta
@Override @Override
public void runTask(AutoTask autoTask) { public void runTask(AutoTask autoTask) {
executor.execute(() -> { executor.execute(new TemplateRunnable(() -> {
long now = System.currentTimeMillis();
TaskTemplateFunctional pair = getPairValue(autoTask.getType()); TaskTemplateFunctional pair = getPairValue(autoTask.getType());
pair.template(autoTask.getParamSource()); pair.template(autoTask);
}); log.info("模板任务结束, 项目:{},开始时间:{},结束时间:{},耗时:{}", autoTask.getGroup(),
Tools.TIME_FORMAT.format(autoTask.getParamSource().getLong(GenericAttribute.START_PARAM)),
Tools.TIME_FORMAT.format(autoTask.getParamSource().getLong(GenericAttribute.START_PARAM)), System.currentTimeMillis() - now);
}, autoTask));
} }
@Override @Override
...@@ -89,34 +95,36 @@ public class TaskServiceTemplate extends BaseTaskTypePair<TaskServiceTemplate.Ta ...@@ -89,34 +95,36 @@ public class TaskServiceTemplate extends BaseTaskTypePair<TaskServiceTemplate.Ta
return false; return false;
} }
private void templateRecordClean(JSONObject json) { private void templateRecordClean(AutoTask autoTask) {
templateRecordDao.removeTemplateRecord(new Query(Criteria.where("createAt").lt(json.getLong(GenericAttribute.END_PARAM)))); templateRecordDao.removeTemplateRecord(new Query(Criteria.where("createAt").lt(autoTask.getParamSource().getLong(GenericAttribute.END_PARAM))));
} }
/** /**
* 模板修改任务 * 模板修改任务
* @param json 参数 * @param autoTask 参数
*/ */
private void templateModify(JSONObject json) { private void templateModify(AutoTask autoTask) {
templateTitleService.modifyTemplateTitle(json.getString(GenericAttribute.GROUP_PARAM), json.getString(GenericAttribute.TEMPLATE_TITLE), JSONObject json = autoTask.getParamSource();
templateTitleService.modifyTemplateTitle(autoTask.getGroup(), json.getString(GenericAttribute.TEMPLATE_TITLE),
json.getString(GenericAttribute.FIX_TAG)); json.getString(GenericAttribute.FIX_TAG));
} }
/** /**
* 模板重置任务 * 模板重置任务
* @param json 参数 * @param autoTask 参数
*/ */
private void templateReset(JSONObject json) { private void templateReset(AutoTask autoTask) {
templateTitleService.resetTemplate(json.getString(GenericAttribute.GROUP_PARAM), json.getString(GenericAttribute.TEMPLATE_TITLE)); templateTitleService.resetTemplate(autoTask.getGroup(), autoTask.getParamSource().getString(GenericAttribute.TEMPLATE_TITLE));
} }
/** /**
* 模板构建任务 * 模板构建任务
* @param json 参数 * @param autoTask 参数
*/ */
private void runTask(JSONObject json) { private void templateRun(AutoTask autoTask) {
String group = json.getString(GenericAttribute.GROUP_PARAM); JSONObject json = autoTask.getParamSource();
String group = autoTask.getGroup();
try { try {
Long startTime = json.getLong(GenericAttribute.START_PARAM); Long startTime = json.getLong(GenericAttribute.START_PARAM);
Long endTime = json.getLong(GenericAttribute.END_PARAM); Long endTime = json.getLong(GenericAttribute.END_PARAM);
...@@ -259,6 +267,6 @@ public class TaskServiceTemplate extends BaseTaskTypePair<TaskServiceTemplate.Ta ...@@ -259,6 +267,6 @@ public class TaskServiceTemplate extends BaseTaskTypePair<TaskServiceTemplate.Ta
@FunctionalInterface @FunctionalInterface
public interface TaskTemplateFunctional { public interface TaskTemplateFunctional {
void template(JSONObject json); void template(AutoTask autoTask);
} }
} }
package com.zhiwei.middleware.automatic.son.util; package com.zhiwei.middleware.automatic.son.util;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.zhiwei.middleware.automatic.server.pojo.AutoTask;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils; import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.FastDateFormat;
import java.util.*; import java.util.*;
import java.util.regex.Pattern; import java.util.regex.Pattern;
...@@ -12,7 +14,7 @@ public class Tools { ...@@ -12,7 +14,7 @@ public class Tools {
private static final Pattern SYMBOL_PATTERN = Pattern private static final Pattern SYMBOL_PATTERN = Pattern
.compile("[\\p{P}+~$`^=丨|<>~`$^+=|<>¥×\\s\u200B\u200C\u200D\u00A0\u0020\u3000]"); .compile("[\\p{P}+~$`^=丨|<>~`$^+=|<>¥×\\s\u200B\u200C\u200D\u00A0\u0020\u3000]");
public static final FastDateFormat TIME_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");
/** /**
* 是否为空,数据为空 * 是否为空,数据为空
* *
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment