Commit 43974c10 by liuyu

Merge branch 'feature' into 'release'

2023年05/24 聚合模板bug修复,运行中任务缓存bug修复

See merge request !52
parents d9370ff8 b9251121
......@@ -93,16 +93,19 @@ public class TaskManager {
if (!Strings.isEmpty(cacheId)) {
redissonUtil.deleteList(autoTask.getParamSource().getString(cacheId));
}
// 删除正在运行得任务
if (Objects.nonNull(autoTask.isSplitFilter())) {
redissonUtil.deleteBucket(concat(GenericAttribute.RUNNING, autoTask.getGroup(), String.valueOf(autoTask.isSplitFilter())));
}
}
} catch (Exception e) {
log.error("{}端任务管理器,任务执行失败:", managerType.name(), e);
}
}
public void removeBucketCache(AutoTask autoTask) {
// 删除正在运行得任务
if (Objects.nonNull(autoTask.isSplitFilter())) {
redissonUtil.deleteBucket(concat(GenericAttribute.RUNNING, autoTask.getGroup(), String.valueOf(autoTask.isSplitFilter())));
}
}
public static String concat(Object... objects) {
StringBuilder sb = new StringBuilder();
for (Object obj : objects) {
......
......@@ -49,7 +49,7 @@ public class ScheduledMission {
putTask(project, startTime, endTime, false);
}
} catch (Exception e) {
log.error("分钟定时同步模板失败:", e);
log.error("5分钟定时同步模板失败:", e);
}
}
......
......@@ -5,6 +5,7 @@ import com.zhiwei.base.entity.subclass.mark.CompleteTextMark;
import com.zhiwei.es.index.Index;
import com.zhiwei.es.util.IndexUtil;
import com.zhiwei.middleware.automatic.server.common.GenericAttribute;
import com.zhiwei.middleware.automatic.server.core.TaskManager;
import com.zhiwei.middleware.automatic.server.pojo.AutoTask;
import com.zhiwei.middleware.automatic.server.pojo.TemplateTitleVo;
import com.zhiwei.middleware.automatic.server.pojo.enums.TaskType;
......@@ -30,7 +31,6 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import java.io.IOException;
......@@ -141,8 +141,10 @@ public class TaskServiceTemplate extends BaseTaskTypePair<TaskServiceTemplate.Ta
if (sourceList.isEmpty()) {
return;
}
log.info("发现{}组数据{}条,聚合中...", group, sourceList.size());
log.info("发现{}组,开始:{},结束:{},数据{}条,聚合中...", group, Tools.TIME_FORMAT.format(startTime), Tools.TIME_FORMAT.format(endTime), sourceList.size());
projectDataTemplate(group, sourceList, Objects.nonNull(autoTask.isSplitFilter()) ? autoTask.isSplitFilter() : false);
log.info("发现{}组,开始:{},结束:{},数据{}条,聚合完成", group, Tools.TIME_FORMAT.format(startTime), Tools.TIME_FORMAT.format(endTime), sourceList.size());
TaskManager.getInstance().removeBucketCache(autoTask);
} catch (Exception e) {
log.error("自动聚合模板更新失败,项目:{}", group, e);
}
......@@ -177,7 +179,7 @@ public class TaskServiceTemplate extends BaseTaskTypePair<TaskServiceTemplate.Ta
boolQueryBuilder.must(timeBuilder).must(mtimeBuilder).must(mgroupBuilder).mustNot(autoRobotQueryBuilder())
.mustNot(QueryBuilders.termQuery("c2", 25165824)).mustNot(QueryBuilders.termQuery("c2", 16777216));
sourceBuilder.query(boolQueryBuilder).size(10000)
sourceBuilder.query(boolQueryBuilder)
.fetchSource(new String[] { "ind_full_text", "mtime", "mtag", "mperson", "url","id"}, null);
return esDao.afterSearch(esIndexes.getIndexes(Index.mark2.name()).toArray(new String[]{}), sourceBuilder, 1000).stream().map(SearchHit::getSourceAsMap).collect(Collectors.toList());
}
......@@ -189,9 +191,9 @@ public class TaskServiceTemplate extends BaseTaskTypePair<TaskServiceTemplate.Ta
*/
private void projectDataTemplate(String group, List<Map<String, Object>> sourceList, boolean isBig) {
//聚合模板
Map<String, TemplateTitleVo> aggregation = aggregation(transferMark(sourceList));
Map<String, TemplateTitleVo> newTemplate = aggregation(transferMark(sourceList));
//旧的聚合模板
Map<String, TemplateTitleVo> templateTitleByProject = templateTitleService.getTemplateTitleByProjectLive(group).entrySet().stream()
Map<String, TemplateTitleVo> oldTemplate = templateTitleService.getTemplateTitleByProjectLive(group).entrySet().stream()
.filter(e -> {
String title = e.getKey();
TemplateTitleVo templateTitleVo = e.getValue();
......@@ -199,7 +201,7 @@ public class TaskServiceTemplate extends BaseTaskTypePair<TaskServiceTemplate.Ta
templateTitleVo.buildId(group);
}
long updateTime = templateTitleVo.getUpdateTime().getTime();
// 移除7天有效期外的数据
// 移除1天有效期外的数据
if (System.currentTimeMillis() - updateTime > ONE_DAY * 1000) {
log.info("{}-移除过期模板标题:{},最后更新时间:{}", group, title, updateTime);
templateTitleService.deleteTemplate(group, title);
......@@ -212,7 +214,7 @@ public class TaskServiceTemplate extends BaseTaskTypePair<TaskServiceTemplate.Ta
return true;
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// 新旧模板合并 且更新模板
mergeTemplate(group, aggregation, templateTitleByProject, isBig);
mergeTemplate(group, oldTemplate, newTemplate, isBig);
}
/**
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment