Commit 0a607ceb by shentao

Merge branch 'feature' into 'release'

2024/11/19 cid记录任务配置、项目关闭时清理正在运行中的任务、标注模版更新时间bug修复

See merge request !130
parents 8d2f4adb 3d15433f
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
<properties> <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version> <java.version>1.8</java.version>
<automatic.version>1.0.0.5-SNAPSHOT</automatic.version>
</properties> </properties>
<dependencies> <dependencies>
......
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
</parent> </parent>
<artifactId>middleware-automatic-center-client</artifactId> <artifactId>middleware-automatic-center-client</artifactId>
<version>1.0.0.5-SNAPSHOT</version> <version>1.0.0.6-SNAPSHOT</version>
<properties> <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
......
...@@ -19,6 +19,8 @@ public class TaskManager { ...@@ -19,6 +19,8 @@ public class TaskManager {
private static final Logger log = LogManager.getLogger(TaskManager.class); private static final Logger log = LogManager.getLogger(TaskManager.class);
private static boolean shutdown = false;
/** 定时处理线程 **/ /** 定时处理线程 **/
private ScheduledExecutorService TASK_EXECUTOR; private ScheduledExecutorService TASK_EXECUTOR;
...@@ -54,6 +56,10 @@ public class TaskManager { ...@@ -54,6 +56,10 @@ public class TaskManager {
TaskManagerHolder.builderScheduled(pullSplit, timeUnit); TaskManagerHolder.builderScheduled(pullSplit, timeUnit);
} }
public void shutdownTask() {
shutdown = true;
}
/** /**
* 添加任务 * 添加任务
* @param autoTask 任务 * @param autoTask 任务
...@@ -82,6 +88,10 @@ public class TaskManager { ...@@ -82,6 +88,10 @@ public class TaskManager {
*/ */
private void pullTask() { private void pullTask() {
try { try {
if (shutdown) {
TASK_EXECUTOR.shutdown();
return;
}
for (String s : redissonUtil.pullQueue(GenericAttribute.KEY, 10)) { for (String s : redissonUtil.pullQueue(GenericAttribute.KEY, 10)) {
if (Strings.isEmpty(s)) { if (Strings.isEmpty(s)) {
continue; continue;
......
package com.zhiwei.middleware.automatic.server.pojo;
public class CidRecord {
private String id;
private Long c4;
private int foreign;
private String url;
private Long time;
private Long ctime;
private Long stime;
private String esId;
private Long cid;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Long getC4() {
return c4;
}
public void setC4(Long c4) {
this.c4 = c4;
}
public int getForeign() {
return foreign;
}
public void setForeign(int foreign) {
this.foreign = foreign;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public Long getTime() {
return time;
}
public void setTime(Long time) {
this.time = time;
}
public Long getCtime() {
return ctime;
}
public void setCtime(Long ctime) {
this.ctime = ctime;
}
public Long getStime() {
return stime;
}
public void setStime(Long stime) {
this.stime = stime;
}
public String getEsId() {
return esId;
}
public void setEsId(String esId) {
this.esId = esId;
}
public Long getCid() {
return cid;
}
public void setCid(Long cid) {
this.cid = cid;
}
}
...@@ -136,12 +136,12 @@ public class TemplateTitleVo implements Serializable { ...@@ -136,12 +136,12 @@ public class TemplateTitleVo implements Serializable {
} }
public void refreshMark() { public void refreshMark() {
this.updateTime = new Date();
this.daySum++; this.daySum++;
} }
public void accSum(long daySum) { public void accSum(long daySum) {
this.daySum += daySum; this.daySum += daySum;
this.markSum += daySum; this.markSum += daySum;
this.updateTime = new Date();
} }
} }
...@@ -11,6 +11,7 @@ public enum TaskType { ...@@ -11,6 +11,7 @@ public enum TaskType {
TEMPLATE_LOST_RESET("template_lost_reset", null, "过期重置模板清除任务"), TEMPLATE_LOST_RESET("template_lost_reset", null, "过期重置模板清除任务"),
TEMPLATE_DAY_CLEAR("template_day_clear", null, "模板每日清除任务"), TEMPLATE_DAY_CLEAR("template_day_clear", null, "模板每日清除任务"),
TEMPLATE_LOST_REMOVE("template_lost_remove", null, "过期模板清除任务"), TEMPLATE_LOST_REMOVE("template_lost_remove", null, "过期模板清除任务"),
CID_RECORD("cid_record", null, "cid记录任务"),
AI_MARK("aiMark", "aiMark", "ai标注接口"); AI_MARK("aiMark", "aiMark", "ai标注接口");
final String type; final String type;
......
...@@ -21,7 +21,6 @@ ...@@ -21,7 +21,6 @@
<qbjc-bean.version>1.1.4.1-SNAPSHOT</qbjc-bean.version> <qbjc-bean.version>1.1.4.1-SNAPSHOT</qbjc-bean.version>
<nlp-aggree.version>0.0.5-SNAPSHOT</nlp-aggree.version> <nlp-aggree.version>0.0.5-SNAPSHOT</nlp-aggree.version>
<dubbo-server.version>2.7.4.1</dubbo-server.version> <dubbo-server.version>2.7.4.1</dubbo-server.version>
<automatic.version>1.0.0.5-SNAPSHOT</automatic.version>
<base.version>2.0.0-SNAPSHOT</base.version> <base.version>2.0.0-SNAPSHOT</base.version>
</properties> </properties>
......
...@@ -4,7 +4,9 @@ import org.apache.logging.log4j.LogManager; ...@@ -4,7 +4,9 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@EnableScheduling
@SpringBootApplication @SpringBootApplication
public class Server { public class Server {
private static final Logger log = LogManager.getLogger(Server.class); private static final Logger log = LogManager.getLogger(Server.class);
......
package com.zhiwei.middleware.automatic.server.mission;
import com.zhiwei.middleware.automatic.server.core.TaskManager;
import com.zhiwei.middleware.automatic.server.pojo.AutoTask;
import com.zhiwei.middleware.automatic.server.pojo.enums.TaskType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.context.annotation.Profile;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@Component
@Profile("cidRecord")
public class CidRecordMission {
private static final Logger log = LogManager.getLogger(CidRecordMission.class);
private static final String QUEUE_KEY = "cidRepeatQueue";
@Scheduled(cron = "0/1 * * * * ?")
@Async("asyncExecutor")
public void pullData() {
try {
TaskType aiMark = TaskType.CID_RECORD;
AutoTask autoTask = new AutoTask(aiMark.getType(), QUEUE_KEY);
TaskManager.getInstance().putTask(autoTask);
} catch (Exception e) {
log.error("定时拉取cid记录任务失败:", e);
}
}
}
spring.profiles.active=prod spring.profiles.active=prod,cidRecord
#spring.profiles.active=dev #spring.profiles.active=dev
#spring.profiles.active=local #spring.profiles.active=local
\ No newline at end of file
...@@ -24,7 +24,6 @@ ...@@ -24,7 +24,6 @@
<qbjc-bean.version>1.1.4.1-SNAPSHOT</qbjc-bean.version> <qbjc-bean.version>1.1.4.1-SNAPSHOT</qbjc-bean.version>
<kafka.version>2.4.1.RELEASE</kafka.version> <kafka.version>2.4.1.RELEASE</kafka.version>
<base.version>2.0.0-SNAPSHOT</base.version> <base.version>2.0.0-SNAPSHOT</base.version>
<automatic.version>1.0.0.5-SNAPSHOT</automatic.version>
<marker.version>1.2.3-SNAPSHOT</marker.version> <marker.version>1.2.3-SNAPSHOT</marker.version>
<filter.version>1.1.6-SNAPSHOT</filter.version> <filter.version>1.1.6-SNAPSHOT</filter.version>
<nlp-aggree.version>0.0.5-SNAPSHOT</nlp-aggree.version> <nlp-aggree.version>0.0.5-SNAPSHOT</nlp-aggree.version>
......
...@@ -5,6 +5,7 @@ import org.springframework.context.annotation.Bean; ...@@ -5,6 +5,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -13,20 +14,7 @@ public class TaskPoolConfig { ...@@ -13,20 +14,7 @@ public class TaskPoolConfig {
@Bean("autMarkExecutor") @Bean("autMarkExecutor")
public ThreadPoolTaskExecutor autMarkExecutor() { public ThreadPoolTaskExecutor autMarkExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); return createExecutor(32, 32, "autoMark-executor-", 2000, new ThreadPoolExecutor.AbortPolicy());
// 配置核心线程数
executor.setCorePoolSize(32);
// 配置最大线程数
executor.setMaxPoolSize(32);
// 配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("autoMark-executor-");
executor.setQueueCapacity(2000);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
// 执行初始化
executor.initialize();
return executor;
} }
@Bean("templateExecutor") @Bean("templateExecutor")
...@@ -37,17 +25,26 @@ public class TaskPoolConfig { ...@@ -37,17 +25,26 @@ public class TaskPoolConfig {
@Bean("aiMarkExecutor") @Bean("aiMarkExecutor")
public ThreadPoolTaskExecutor aiMarkExecutor() { public ThreadPoolTaskExecutor aiMarkExecutor() {
return createExecutor(2, 2, "aiMark-executor-", 8, new ThreadPoolExecutor.AbortPolicy());
}
@Bean("cidRecordExecutor")
public ThreadPoolTaskExecutor cidRecordExecutor() {
return createExecutor(2, 2, "cidRecord-executor-", 8, new ThreadPoolExecutor.AbortPolicy());
}
private ThreadPoolTaskExecutor createExecutor(int core, int max, String name, int queueSize, RejectedExecutionHandler handler) {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 配置核心线程数 // 配置核心线程数
executor.setCorePoolSize(2); executor.setCorePoolSize(core);
// 配置最大线程数 // 配置最大线程数
executor.setMaxPoolSize(2); executor.setMaxPoolSize(max);
// 配置线程池中的线程的名称前缀 // 配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("aiMark-executor-"); executor.setThreadNamePrefix(name);
executor.setQueueCapacity(8); executor.setQueueCapacity(queueSize);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务 // rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行 // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); executor.setRejectedExecutionHandler(handler);
// 执行初始化 // 执行初始化
executor.initialize(); executor.initialize();
return executor; return executor;
......
package com.zhiwei.middleware.automatic.son.dao;
import com.zhiwei.middleware.automatic.server.pojo.CidRecord;
public interface CidRecordDao {
void insert(CidRecord cidRecord);
}
package com.zhiwei.middleware.automatic.son.dao.impl;
import com.zhiwei.middleware.automatic.server.pojo.CidRecord;
import com.zhiwei.middleware.automatic.son.dao.CidRecordDao;
import com.zhiwei.middleware.automatic.son.util.Tools;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.stereotype.Component;
@Component
public class CidRecordDaoImpl implements CidRecordDao {
private final MongoTemplate mongoTemplate;
private final String COLLECTION_NAME = "cleaner_cid_repeat_";
public CidRecordDaoImpl(@Qualifier("markerMongoTemplate") MongoTemplate mongoTemplate) {
this.mongoTemplate = mongoTemplate;
}
@Override
public void insert(CidRecord cidRecord) {
mongoTemplate.insert(cidRecord, COLLECTION_NAME + Tools.TIME_FORMAT_COMMON.format(cidRecord.getStime()));
}
}
...@@ -6,7 +6,7 @@ import com.zhiwei.middleware.automatic.server.util.RedissonUtil; ...@@ -6,7 +6,7 @@ import com.zhiwei.middleware.automatic.server.util.RedissonUtil;
import com.zhiwei.middleware.automatic.son.task.holder.TaskServiceHandler; import com.zhiwei.middleware.automatic.son.task.holder.TaskServiceHandler;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.redisson.api.RedissonClient; import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner; import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -16,7 +16,9 @@ import java.util.concurrent.TimeUnit; ...@@ -16,7 +16,9 @@ import java.util.concurrent.TimeUnit;
* 任务初始化 * 任务初始化
*/ */
@Component @Component
public class TaskInit implements ApplicationRunner { public class TaskInit implements ApplicationRunner, DisposableBean {
private static final Logger log = LogManager.getLogger(TaskInit.class);
private final RedissonUtil redissonUtil; private final RedissonUtil redissonUtil;
...@@ -27,6 +29,16 @@ public class TaskInit implements ApplicationRunner { ...@@ -27,6 +29,16 @@ public class TaskInit implements ApplicationRunner {
@Override @Override
public void run(ApplicationArguments args) { public void run(ApplicationArguments args) {
TaskManager.initManager(ManagerType.SON, redissonUtil, TaskServiceHandler.getInstance()::taskExecute, 1, TimeUnit.SECONDS); TaskManager.initManager(ManagerType.SON, redissonUtil, TaskServiceHandler.getInstance()::taskExecute,1, TimeUnit.SECONDS);
log.info("AutomaticSon启动成功,初始化完成");
}
@Override
public void destroy() throws Exception {
TaskManager.getInstance().shutdownTask();
log.info("AutomaticSon 系统开始关闭,休眠30秒,清理正在运行中的任务");
Thread.sleep(30000L);
TaskServiceHandler.getInstance().shutdownLog();
log.info("AutomaticSon 系统已经关闭!!!");
} }
} }
...@@ -34,7 +34,6 @@ public class TaskServiceHandler { ...@@ -34,7 +34,6 @@ public class TaskServiceHandler {
if (Objects.nonNull(taskType)) { if (Objects.nonNull(taskType)) {
try { try {
taskService.runTask(autoTask); taskService.runTask(autoTask);
taskService.executorStatus();
} catch (Exception e) { } catch (Exception e) {
log.error("任务类型:{},执行异常:", taskType.getType(), e); log.error("任务类型:{},执行异常:", taskType.getType(), e);
} }
...@@ -44,6 +43,12 @@ public class TaskServiceHandler { ...@@ -44,6 +43,12 @@ public class TaskServiceHandler {
return null; return null;
} }
public void shutdownLog() {
for (TaskService taskService : SERVICE_LIST) {
taskService.executorStatus();
}
}
private static class TaskServiceHandlerHolder { private static class TaskServiceHandlerHolder {
private static final TaskServiceHandler TASK_SERVICE_HANDLER = new TaskServiceHandler(); private static final TaskServiceHandler TASK_SERVICE_HANDLER = new TaskServiceHandler();
} }
......
package com.zhiwei.middleware.automatic.son.task.service;
import com.zhiwei.middleware.automatic.server.pojo.enums.TaskType;
import org.apache.logging.log4j.Logger;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
public abstract class AbstractTaskService implements TaskService {
protected final TaskType TYPE;
protected final ThreadPoolTaskExecutor executor;
protected final Logger log;
public AbstractTaskService(TaskType type, ThreadPoolTaskExecutor executor, Logger log) {
this.TYPE = type;
this.executor = executor;
this.log = log;
}
@Override
public TaskType supports(String type) {
return TYPE.getType().equals(type) ? TYPE : null;
}
@Override
public void executorStatus() {
log.info("任务类型:{},正在运行任务数量:{}", TYPE.getType(), executor.getActiveCount());
}
}
...@@ -23,46 +23,31 @@ import java.util.stream.Collectors; ...@@ -23,46 +23,31 @@ import java.util.stream.Collectors;
* date: 2024/6/12 15:20 * date: 2024/6/12 15:20
**/ **/
@Service("TaskServiceAiMark") @Service("TaskServiceAiMark")
public class TaskServiceAiMark implements TaskService { public class TaskServiceAiMark extends AbstractTaskService {
private static final Logger log = LogManager.getLogger(TaskServiceAiMark.class);
private final ThreadPoolTaskExecutor markAiExecutor;
private final RedissonUtil redissonUtil; private final RedissonUtil redissonUtil;
private final DubboHandler dubboHandler; private final DubboHandler dubboHandler;
private final TaskType AI_MARK = TaskType.AI_MARK;
public TaskServiceAiMark(@Qualifier("aiMarkExecutor") ThreadPoolTaskExecutor markAiExecutor, public TaskServiceAiMark(@Qualifier("aiMarkExecutor") ThreadPoolTaskExecutor markAiExecutor,
RedissonUtil redissonUtil, RedissonUtil redissonUtil,
DubboHandler dubboHandler) { DubboHandler dubboHandler) {
this.markAiExecutor = markAiExecutor; super(TaskType.AI_MARK, markAiExecutor, LogManager.getLogger(TaskServiceAiMark.class));
this.redissonUtil = redissonUtil; this.redissonUtil = redissonUtil;
this.dubboHandler = dubboHandler; this.dubboHandler = dubboHandler;
} }
@Override
public TaskType supports(String type) {
return AI_MARK.getType().equals(type) ? AI_MARK : null;
}
@Override @Override
public void runTask(AutoTask autoTask) { public void runTask(AutoTask autoTask) {
List<String> list = redissonUtil.getList(autoTask.getParamSource().getString(AI_MARK.getCacheId())); List<String> list = redissonUtil.getList(autoTask.getParamSource().getString(TYPE.getCacheId()));
if (Objects.isNull(list) || list.isEmpty()) { if (Objects.isNull(list) || list.isEmpty()) {
return; return;
} }
List<AiInterfaceParam> data = list.stream().map(e -> JSONObject.parseObject(e).toJavaObject(AiInterfaceParam.class)).collect(Collectors.toList()); List<AiInterfaceParam> data = list.stream().map(e -> JSONObject.parseObject(e).toJavaObject(AiInterfaceParam.class)).collect(Collectors.toList());
markAiExecutor.execute(() -> aiMark(data, autoTask.getParamSource().getString(GenericAttribute.ES_TIME))); executor.execute(() -> aiMark(data, autoTask.getParamSource().getString(GenericAttribute.ES_TIME)));
} }
@Override
public void executorStatus() {
}
private void aiMark(List<AiInterfaceParam> data, String timeKey) { private void aiMark(List<AiInterfaceParam> data, String timeKey) {
List<MarkInfo> list = new ArrayList<>(); List<MarkInfo> list = new ArrayList<>();
......
package com.zhiwei.middleware.automatic.son.task.service;
import com.alibaba.excel.util.CollectionUtils;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.middleware.automatic.server.pojo.AutoTask;
import com.zhiwei.middleware.automatic.server.pojo.CidRecord;
import com.zhiwei.middleware.automatic.server.pojo.enums.TaskType;
import com.zhiwei.middleware.automatic.server.util.RedissonUtil;
import com.zhiwei.middleware.automatic.son.dao.CidRecordDao;
import org.apache.logging.log4j.LogManager;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import java.util.List;
@Service("TaskServiceCidRecord")
public class TaskServiceCidRecord extends AbstractTaskService {
private final RedissonUtil redissonUtil;
private final CidRecordDao cidRecordDao;
private final int pullSize = 100;
public TaskServiceCidRecord(RedissonUtil redissonUtil, CidRecordDao cidRecordDao,
@Qualifier("cidRecordExecutor") ThreadPoolTaskExecutor executor) {
super(TaskType.CID_RECORD, executor, LogManager.getLogger(TaskServiceCidRecord.class));
this.redissonUtil = redissonUtil;
this.cidRecordDao = cidRecordDao;
}
@Override
public void runTask(AutoTask autoTask) {
executor.execute(() -> {
List<String> data = redissonUtil.pullQueue(autoTask.getGroup(), pullSize);
if (CollectionUtils.isEmpty(data)) {
return;
}
for (String s : data) {
try {
cidRecordDao.insert(JSONObject.parseObject(s, CidRecord.class));
} catch (Exception e) {
log.error("任务类型:{},数据新增失败,错误信息:", TYPE.getType(), e);
}
}
});
}
}
...@@ -18,6 +18,8 @@ public class Tools { ...@@ -18,6 +18,8 @@ public class Tools {
private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+"); private static final Pattern NUMBER_PATTERN = Pattern.compile("\\d+");
public static final FastDateFormat TIME_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss"); public static final FastDateFormat TIME_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");
public static final FastDateFormat TIME_FORMAT_COMMON = FastDateFormat.getInstance("yyyyMMdd");
/** /**
* 是否为空,数据为空 * 是否为空,数据为空
* *
......
...@@ -27,6 +27,7 @@ ...@@ -27,6 +27,7 @@
<slf4j.version>1.8.0-beta4</slf4j.version> <slf4j.version>1.8.0-beta4</slf4j.version>
<dubbo.version>2.7.16</dubbo.version> <dubbo.version>2.7.16</dubbo.version>
<zookeeper.version>3.4.12</zookeeper.version> <zookeeper.version>3.4.12</zookeeper.version>
<automatic.version>1.0.0.6-SNAPSHOT</automatic.version>
</properties> </properties>
<dependencyManagement> <dependencyManagement>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment