Commit dabaa69a by liuyu

2023年04月26 任务枚举优化

parent b52d3566
...@@ -44,4 +44,6 @@ public class GenericAttribute { ...@@ -44,4 +44,6 @@ public class GenericAttribute {
public static final String FIX_TAG = "fixTag"; public static final String FIX_TAG = "fixTag";
public static final String KEY = "task"; public static final String KEY = "task";
public static final String URL = "url";
} }
...@@ -3,9 +3,9 @@ package com.zhiwei.middleware.automatic.server.pojo.enums; ...@@ -3,9 +3,9 @@ package com.zhiwei.middleware.automatic.server.pojo.enums;
public enum TaskType { public enum TaskType {
COMMON_ONE("common_one","common", "commonCache"), COMMON_ONE("common_one","common", "commonCache"),
COMMON_TWO("common_two","common", "commonCache"), COMMON_TWO("common_two","common", "commonCache"),
TEMPLATE("template", "template", ""), TEMPLATE("template", "template", null),
TEMPLATE_MODIFY("template_modify","template", ""), TEMPLATE_MODIFY("template_modify","template", null),
TEMPLATE_RESET("template_reset","template", ""); TEMPLATE_RESET("template_reset","template", null);
final String type; final String type;
final String name; final String name;
......
...@@ -49,23 +49,13 @@ public class TaskManager implements ApplicationRunner { ...@@ -49,23 +49,13 @@ public class TaskManager implements ApplicationRunner {
public void pullTask() { public void pullTask() {
try { try {
List<AutoTask> tasks = redissonUtil.pullQueue(GenericAttribute.KEY, LIMIT) for (String s : redissonUtil.pullQueue(GenericAttribute.KEY, LIMIT)) {
.stream() AutoTask autoTask = JSONObject.parseObject(s).toJavaObject(AutoTask.class);
.map(e -> JSONObject.parseObject(e).toJavaObject(AutoTask.class)) String cacheId = TaskServiceHandler.getInstance().taskExecute(autoTask);
.collect(Collectors.toList()); if (!Strings.isEmpty(cacheId)) {
tasks.forEach(e -> { redissonUtil.deleteList(autoTask.getParamSource().getString(cacheId));
TaskType taskType = Objects.requireNonNull(TaskType.create(e.getType()));
TaskService taskService = TaskServiceHandler.getInstance()
.getTaskService(taskType.getName());
if (taskService.thresholdWarn()) {
log.error("任务类型:{},当前运行任务已到达最大核心数", taskService.getTaskType());
} }
log.info("任务类型:{},开始执行,信息:{}", taskService.getTaskType(), JSONObject.toJSONString(e)); }
taskService.runTask(e);
if (!Strings.isEmpty(taskType.getCacheId())) {
redissonUtil.deleteList(e.getParamSource().getString(taskType.getCacheId()));
}
});
} catch (Exception e) { } catch (Exception e) {
log.error("任务管理器,任务执行失败:", e); log.error("任务管理器,任务执行失败:", e);
} }
......
package com.zhiwei.middleware.automatic.son.task.holder; package com.zhiwei.middleware.automatic.son.task.holder;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.middleware.automatic.server.pojo.AutoTask;
import com.zhiwei.middleware.automatic.server.pojo.enums.TaskType;
import com.zhiwei.middleware.automatic.son.task.service.TaskService; import com.zhiwei.middleware.automatic.son.task.service.TaskService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import java.util.HashMap; import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
public class TaskServiceHandler { public class TaskServiceHandler {
private static final Map<String, TaskService> SERVICE_MAP = new HashMap<>(); private static final Logger log = LogManager.getLogger(TaskServiceHandler.class);
private static final List<TaskService> SERVICE_LIST = new ArrayList<>();
private TaskServiceHandler() { private TaskServiceHandler() {
ApplicationContext applicationContext = ApplicationContextHolder.getInstance(); ApplicationContext applicationContext = ApplicationContextHolder.getInstance();
Map<String, TaskService> beansOfType = applicationContext.getBeansOfType(TaskService.class); Map<String, TaskService> beansOfType = applicationContext.getBeansOfType(TaskService.class);
beansOfType.forEach((k, v) -> SERVICE_MAP.put(v.getTaskType(), v)); beansOfType.forEach((k, v) -> SERVICE_LIST.add(v));
} }
public static TaskServiceHandler getInstance() { public static TaskServiceHandler getInstance() {
return TaskServiceHandlerHolder.TASK_SERVICE_HANDLER; return TaskServiceHandlerHolder.TASK_SERVICE_HANDLER;
} }
public TaskService getTaskService(String type) { public String taskExecute(AutoTask autoTask) {
return SERVICE_MAP.get(type); for (TaskService taskService : SERVICE_LIST) {
TaskType taskType = taskService.supports(autoTask.getType());
if (Objects.nonNull(taskType)) {
if (taskService.thresholdWarn()) {
log.error("任务类型:{},当前运行任务已到达最大核心数", autoTask.getType());
}
log.info("任务类型:{},开始执行,信息:{}", autoTask.getType(), JSONObject.toJSONString(autoTask));
taskService.runTask(autoTask);
return taskType.getCacheId();
}
}
return null;
} }
private static class TaskServiceHandlerHolder { private static class TaskServiceHandlerHolder {
......
package com.zhiwei.middleware.automatic.son.task.service; package com.zhiwei.middleware.automatic.son.task.service;
import com.zhiwei.middleware.automatic.server.pojo.AutoTask; import com.zhiwei.middleware.automatic.server.pojo.AutoTask;
import com.zhiwei.middleware.automatic.server.pojo.enums.TaskType;
public interface TaskService { public interface TaskService {
/** /**
* 回去任务名字 * 任务类型匹配
* @return 名字 * @return 名字
*/ */
String getTaskType(); TaskType supports(String type);
/** /**
* 任务运行 * 任务运行
......
...@@ -16,6 +16,7 @@ import com.zhiwei.middleware.automatic.son.util.CosineSimilarity; ...@@ -16,6 +16,7 @@ import com.zhiwei.middleware.automatic.son.util.CosineSimilarity;
import com.zhiwei.middleware.automatic.son.util.MarkInfoUtil; import com.zhiwei.middleware.automatic.son.util.MarkInfoUtil;
import com.zhiwei.middleware.automatic.son.util.RedissonUtil; import com.zhiwei.middleware.automatic.son.util.RedissonUtil;
import com.zhiwei.middleware.automatic.son.util.Tools; import com.zhiwei.middleware.automatic.son.util.Tools;
import javafx.util.Pair;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
...@@ -31,6 +32,8 @@ public class TaskServiceCommon implements TaskService { ...@@ -31,6 +32,8 @@ public class TaskServiceCommon implements TaskService {
private static final Logger log = LogManager.getLogger(TaskServiceCommon.class); private static final Logger log = LogManager.getLogger(TaskServiceCommon.class);
private static final Map<String, Pair<TaskType, TaskCommonFunctional>> TYPE_MAP = new HashMap<>();
private final RedissonUtil redissonUtil; private final RedissonUtil redissonUtil;
private final TemplateTitleService templateTitleService; private final TemplateTitleService templateTitleService;
...@@ -46,18 +49,22 @@ public class TaskServiceCommon implements TaskService { ...@@ -46,18 +49,22 @@ public class TaskServiceCommon implements TaskService {
@Qualifier("autMarkExecutor") ThreadPoolTaskExecutor autoMarkExecutor) { @Qualifier("autMarkExecutor") ThreadPoolTaskExecutor autoMarkExecutor) {
this.redissonUtil = redissonUtil; this.redissonUtil = redissonUtil;
this.templateTitleService = templateTitleService; this.templateTitleService = templateTitleService;
this.dubboHandler = dubboHandler; this.dubboHandler = dubboHandler;
this.autoMarkExecutor = autoMarkExecutor; this.autoMarkExecutor = autoMarkExecutor;
TYPE_MAP.put(TaskType.COMMON_TWO.getType(), new Pair<>(TaskType.COMMON_TWO, this::getMultiAutoInfo));
TYPE_MAP.put(TaskType.COMMON_ONE.getType(), new Pair<>(TaskType.COMMON_ONE, this::getOneAutoInfo));
} }
@Override @Override
public String getTaskType() { public TaskType supports(String type) {
return TaskType.COMMON_ONE.getName(); Pair<TaskType, TaskCommonFunctional> pair = TYPE_MAP.get(type);
return Objects.isNull(pair) ? null : pair.getKey();
} }
@Override @Override
public void runTask(AutoTask autoTask) { public void runTask(AutoTask autoTask) {
Map<String, List<MarkInfo>> dataSourceInfo = getDataSourceInfo(autoTask); Pair<TaskType, TaskCommonFunctional> pair = Objects.requireNonNull(TYPE_MAP.get(autoTask.getType()));
Map<String, List<MarkInfo>> dataSourceInfo = pair.getValue().getDataSource(autoTask.getParamSource().getString(pair.getKey().getCacheId()));
if (dataSourceInfo.isEmpty()) { if (dataSourceInfo.isEmpty()) {
log.error("本地任务可用数据为空,taskType:{}", autoTask.getType()); log.error("本地任务可用数据为空,taskType:{}", autoTask.getType());
} }
...@@ -70,23 +77,6 @@ public class TaskServiceCommon implements TaskService { ...@@ -70,23 +77,6 @@ public class TaskServiceCommon implements TaskService {
} }
/** /**
* 获取源数据
* @param autoTask 任务
* @return 源数据按项目分组
*/
private Map<String, List<MarkInfo>> getDataSourceInfo(AutoTask autoTask) {
TaskType taskType = TaskType.create(autoTask.getType());
switch (Objects.requireNonNull(taskType)) {
case COMMON_ONE:
return getOneAutoInfo(autoTask.getParamSource().getString(taskType.getCacheId()));
case COMMON_TWO:
return getMultiAutoInfo(autoTask.getParamSource().getString(taskType.getCacheId()));
default:
return new HashMap<>();
}
}
/**
* 获取单个项目标注源数据 * 获取单个项目标注源数据
* @param key redis缓存key * @param key redis缓存key
* @return 源数据按项目分组 * @return 源数据按项目分组
...@@ -265,4 +255,10 @@ public class TaskServiceCommon implements TaskService { ...@@ -265,4 +255,10 @@ public class TaskServiceCommon implements TaskService {
return null; return null;
} }
@FunctionalInterface
public interface TaskCommonFunctional {
Map<String, List<MarkInfo>> getDataSource(String group);
}
} }
...@@ -15,6 +15,7 @@ import com.zhiwei.middleware.automatic.son.util.CosineSimilarity; ...@@ -15,6 +15,7 @@ import com.zhiwei.middleware.automatic.son.util.CosineSimilarity;
import com.zhiwei.middleware.automatic.son.util.Tools; import com.zhiwei.middleware.automatic.son.util.Tools;
import com.zhiwei.nlp.AggreeBootStarter; import com.zhiwei.nlp.AggreeBootStarter;
import com.zhiwei.nlp.vo.KResult; import com.zhiwei.nlp.vo.KResult;
import javafx.util.Pair;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.BoolQueryBuilder;
...@@ -27,10 +28,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; ...@@ -27,10 +28,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.io.IOException; import java.io.IOException;
import java.util.Calendar; import java.util.*;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors; import java.util.stream.Collectors;
...@@ -39,6 +37,8 @@ public class TaskServiceTemplate implements TaskService { ...@@ -39,6 +37,8 @@ public class TaskServiceTemplate implements TaskService {
private final Logger log = LogManager.getLogger(TaskServiceTemplate.class); private final Logger log = LogManager.getLogger(TaskServiceTemplate.class);
private static final Map<String, Pair<TaskType, TaskTemplateFunctional>> TYPE_MAP = new HashMap<>();
private final TemplateTitleService templateTitleService; private final TemplateTitleService templateTitleService;
private final EsDao esDao; private final EsDao esDao;
...@@ -57,16 +57,23 @@ public class TaskServiceTemplate implements TaskService { ...@@ -57,16 +57,23 @@ public class TaskServiceTemplate implements TaskService {
this.esDao = esDao; this.esDao = esDao;
this.esIndexes = esIndexes; this.esIndexes = esIndexes;
this.executor = executor; this.executor = executor;
TYPE_MAP.put(TaskType.TEMPLATE.getType(), new Pair<>(TaskType.TEMPLATE, this::runTask));
TYPE_MAP.put(TaskType.TEMPLATE_MODIFY.getType(), new Pair<>(TaskType.TEMPLATE_MODIFY, this::templateModify));
TYPE_MAP.put(TaskType.TEMPLATE_RESET.getType(), new Pair<>(TaskType.TEMPLATE_RESET, this::templateReset));
} }
@Override @Override
public String getTaskType() { public TaskType supports(String type) {
return TaskType.TEMPLATE.getName(); Pair<TaskType, TaskTemplateFunctional> par = TYPE_MAP.get(type);
return Objects.isNull(par) ? null : par.getKey();
} }
@Override @Override
public void runTask(AutoTask autoTask) { public void runTask(AutoTask autoTask) {
executor.execute(() -> switchTask(autoTask)); executor.execute(() -> {
Pair<TaskType, TaskTemplateFunctional> pair = Objects.requireNonNull(TYPE_MAP.get(autoTask.getType()));
pair.getValue().template(autoTask.getParamSource());
});
} }
@Override @Override
...@@ -75,27 +82,35 @@ public class TaskServiceTemplate implements TaskService { ...@@ -75,27 +82,35 @@ public class TaskServiceTemplate implements TaskService {
return false; return false;
} }
private void switchTask (AutoTask autoTask) { /**
JSONObject paramSource = autoTask.getParamSource(); * 模板修改任务
String group = paramSource.getString(GenericAttribute.GROUP_PARAM); * @param json 参数
switch (Objects.requireNonNull(TaskType.create(autoTask.getType()))) { */
case TEMPLATE_RESET: private void templateModify(JSONObject json) {
templateTitleService.resetTemplate(group, paramSource.getString(GenericAttribute.TEMPLATE_TITLE)); templateTitleService.modifyTemplateTitle(json.getString(GenericAttribute.GROUP_PARAM), json.getString(GenericAttribute.TEMPLATE_TITLE),
break; json.getString(GenericAttribute.FIX_TAG));
case TEMPLATE_MODIFY:
templateTitleService.modifyTemplateTitle(group, paramSource.getString(GenericAttribute.TEMPLATE_TITLE),
paramSource.getString(GenericAttribute.FIX_TAG));
break;
case TEMPLATE:
runTask(group, paramSource.getLong(GenericAttribute.START_PARAM), paramSource.getLong(GenericAttribute.END_PARAM));
break;
}
} }
private void runTask(String group, Long startTime, Long endTime) { /**
* 模板重置任务
* @param json 参数
*/
private void templateReset(JSONObject json) {
templateTitleService.resetTemplate(json.getString(GenericAttribute.GROUP_PARAM), json.getString(GenericAttribute.TEMPLATE_TITLE));
}
/**
* 模板构建任务
* @param json 参数
*/
private void runTask(JSONObject json) {
String group = json.getString(GenericAttribute.GROUP_PARAM);
try { try {
Long startTime = json.getLong(GenericAttribute.START_PARAM);
Long endTime = json.getLong(GenericAttribute.END_PARAM);
//源数据 //源数据
List<Map<String, Object>> sourceList = findRecentTimeData(group,startTime, List<Map<String, Object>> sourceList = findRecentTimeData(group, startTime,
endTime); endTime);
if (sourceList.isEmpty()) { if (sourceList.isEmpty()) {
return; return;
...@@ -230,4 +245,9 @@ public class TaskServiceTemplate implements TaskService { ...@@ -230,4 +245,9 @@ public class TaskServiceTemplate implements TaskService {
private QueryBuilder autoRobotQueryBuilder() { private QueryBuilder autoRobotQueryBuilder() {
return QueryBuilders.termQuery("mperson", GenericAttribute.AUTO_PERSON); return QueryBuilders.termQuery("mperson", GenericAttribute.AUTO_PERSON);
} }
@FunctionalInterface
public interface TaskTemplateFunctional {
void template(JSONObject json);
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment