Commit bb597e3c by liuyu

Merge branch 'release' into 'master'

Release

See merge request !12
parents e0e61b55 70540db7
......@@ -19,9 +19,8 @@ public class AutoMaticClientConfiguration {
@Bean
@ConditionalOnMissingBean(AutoMaticClient.class)
public AutoMaticClient authClient(AutoMaticClientConfigurationProperties properties) {
return new AutoMaticClient(AutoMaticClientFactory.createInstance(AutoMaticService.class, properties.getApplication(),properties.getRegistry(),
properties.getConsumer()));
return AutoMaticClientFactory.getAutoMaticClient(properties.getApplication(), properties.getRegistry(),
properties.getConsumer(), properties.getConfigCenterConfig());
}
}
package com.zhiwei.middleware.automatic.configuration;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ConfigCenterConfig;
import org.apache.dubbo.config.ConsumerConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
......@@ -11,6 +12,7 @@ public class AutoMaticClientConfigurationProperties {
private ApplicationConfig application;
private RegistryConfig registry;
private ConsumerConfig consumer;
private ConfigCenterConfig configCenterConfig;
public ApplicationConfig getApplication() {
return application;
......@@ -35,4 +37,12 @@ public class AutoMaticClientConfigurationProperties {
/**
 * Stores the Dubbo consumer configuration on this properties object.
 *
 * @param consumer consumer configuration to keep
 */
public void setConsumer(ConsumerConfig consumer) {
this.consumer = consumer;
}
/**
 * Returns the Dubbo config-center configuration held by this properties object.
 *
 * @return the stored configuration; may be null if none was set (no default is visible here)
 */
public ConfigCenterConfig getConfigCenterConfig() {
return configCenterConfig;
}
/**
 * Stores the Dubbo config-center configuration on this properties object.
 *
 * @param configCenterConfig config-center configuration to keep
 */
public void setConfigCenterConfig(ConfigCenterConfig configCenterConfig) {
this.configCenterConfig = configCenterConfig;
}
}
......@@ -71,27 +71,6 @@
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>${dubbo.version}</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
......
......@@ -2,15 +2,13 @@ package com.zhiwei.middleware.automatic.server.core;
import com.zhiwei.middleware.automatic.server.dubbo.service.AutoMaticService;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ConsumerConfig;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.config.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;
import static java.util.Objects.*;
......@@ -25,7 +23,8 @@ public class AutoMaticClientFactory {
}
public static <T> T createInstance(Class<T> clazz, ApplicationConfig application, RegistryConfig registry, ConsumerConfig consumer) {
private static <T> T createInstance(Class<T> clazz, ApplicationConfig application, RegistryConfig registry,
ConsumerConfig consumer, ConfigCenterConfig configCenterConfig) {
lock.lock();
try {
if (isNull(application)) {
......@@ -48,8 +47,6 @@ public class AutoMaticClientFactory {
}
reference = new ReferenceConfig<>();
reference.setApplication(application);
//向注册中心注册
registry.setTimeout(600000);
reference.setRegistry(registry);
if (isNull(consumer.isCheck())) {
// 如果消费者没有配置检查,则默认不检查
......@@ -57,10 +54,11 @@ public class AutoMaticClientFactory {
// reference 没有配置时会使用 consumer 配置
reference.setCheck(false);
}
// 设置消费者配置
consumer.setTimeout(600000);
reference.setConsumer(consumer);
reference.setInterface(clazz);
if (Objects.nonNull(configCenterConfig)) {
reference.setConfigCenter(configCenterConfig);
}
REFERENCES.put(cacheKey, reference);
//获取目标接口
return (T) reference.get();
......@@ -72,23 +70,24 @@ public class AutoMaticClientFactory {
return null;
}
public static <T> T createInstance(Class<T> clazz, String registry, String group, String appName) {
public static AutoMaticClient getAutoMaticClient(String registry, String group, String appName, long timeOut) {
ApplicationConfig application = new ApplicationConfig();
RegistryConfig reg = new RegistryConfig(registry);
application.setName(appName);
ConsumerConfig consumer = new ConsumerConfig();
consumer.setGroup(group);
return createInstance(clazz, application, reg, consumer);
ConfigCenterConfig configCenterConfig = new ConfigCenterConfig();
configCenterConfig.setTimeout(timeOut);
return new AutoMaticClient(createInstance(AutoMaticService.class, application, reg, consumer, configCenterConfig));
}
/**
* 获取自动标注client
* @param autoMaticService 代理接口
* @return 自动标注client
*/
public static AutoMaticClient getAutoMaticClient(AutoMaticService autoMaticService) {
return new AutoMaticClient(autoMaticService);
public static AutoMaticClient getAutoMaticClient(ApplicationConfig application, RegistryConfig registry,
ConsumerConfig consumer, ConfigCenterConfig configCenterConfig) {
return new AutoMaticClient(createInstance(AutoMaticService.class, application, registry, consumer, configCenterConfig));
}
}
......@@ -24,7 +24,7 @@ public class TemplateTitleVo implements Serializable {
return id;
}
public void setId(String group) {
public void buildId(String group) {
MessageDigest sMd5Digest = null;
try {
sMd5Digest = MessageDigest.getInstance("MD5");
......@@ -37,6 +37,10 @@ public class TemplateTitleVo implements Serializable {
this.id = numValue.toString(16);
}
/**
 * Assigns the given identifier as-is, without any transformation.
 *
 * @param id identifier to store
 */
public void setId(String id) {
this.id = id;
}
public String getTemplateTitle() {
return templateTitle;
}
......@@ -104,6 +108,8 @@ public class TemplateTitleVo implements Serializable {
this.status = TemplateStatus.运行中;
}
/** No-argument constructor with an empty body (presumably for deserialization frameworks — confirm). */
public TemplateTitleVo() {}
/** Sets {@code updateTime} to the current date and time. */
public void refreshMark() {
this.updateTime = new Date();
}
......
package com.zhiwei.middleware.automatic.server.pojo.enums;
public enum TaskType {
COMMON_ONE("common_one","common", "commonCache"),
COMMON_TWO("common_two","common", "commonCache"),
TEMPLATE("template", "template", null),
TEMPLATE_MODIFY("template_modify","template", null),
TEMPLATE_RESET("template_reset","template", null);
COMMON_ONE("common_one", "commonCache"),
COMMON_TWO("common_two", "commonCache"),
TEMPLATE("template", null),
TEMPLATE_MODIFY("template_modify", null),
TEMPLATE_RESET("template_reset", null),
TEMPLATE_RECORD("template_record", null),
TEMPLATE_CLEAR_RETRY("template_clear_retry", null);
final String type;
final String name;
final String cacheId;
TaskType(String type, String name, String cacheId) {
TaskType(String type, String cacheId) {
this.type = type;
this.name = name;
this.cacheId = cacheId;
}
public String getName() {
return this.name;
}
public String getType() {
return this.type;
}
......
......@@ -155,6 +155,31 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
<version>${dubbo-server.version}</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
......
......@@ -3,8 +3,11 @@ package com.zhiwei.middleware.automatic.server.mission;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.middleware.automatic.server.common.GenericAttribute;
import com.zhiwei.middleware.automatic.server.pojo.AutoTask;
import com.zhiwei.middleware.automatic.server.pojo.TemplateTitleVo;
import com.zhiwei.middleware.automatic.server.pojo.enums.TaskType;
import com.zhiwei.middleware.automatic.server.pojo.enums.TemplateStatus;
import com.zhiwei.middleware.automatic.server.redis.RedissonUtil;
import com.zhiwei.middleware.automatic.server.service.TemplateTitleService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.scheduling.annotation.Async;
......@@ -13,6 +16,7 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.Calendar;
import java.util.Map;
@Component
@EnableScheduling
......@@ -20,13 +24,19 @@ public class ScheduledMission {
private final Logger log = LogManager.getLogger(ScheduledMission.class);
private static final Long MONTH = 1000L * 60 * 60 * 24 * 30;
private final RedissonUtil redissonUtil;
private final AsyncTask asyncTask;
public ScheduledMission(RedissonUtil redissonUtil, AsyncTask asyncTask) {
private final TemplateTitleService templateTitleService;
public ScheduledMission(RedissonUtil redissonUtil, AsyncTask asyncTask,
TemplateTitleService templateTitleService) {
this.redissonUtil = redissonUtil;
this.asyncTask = asyncTask;
this.templateTitleService = templateTitleService;
}
......@@ -58,8 +68,12 @@ public class ScheduledMission {
long startTime = calendar.getTime().getTime();
long endTime = System.currentTimeMillis();
for (String project : asyncTask.findAllGroup()) {
//模板聚合任务
putTask(project, startTime, endTime);
}
// 模板记录清除任务
putTaskByRecord();
} catch (Exception e) {
log.error("每天定时同步模板失败:", e);
}
......@@ -73,4 +87,22 @@ public class ScheduledMission {
redissonUtil.putQueue(GenericAttribute.KEY, JSONObject.toJSONString(autoTask));
}
/**
 * Enqueues a {@code TEMPLATE_RECORD} task whose end parameter is the current
 * time minus {@code MONTH} (30 days expressed in milliseconds).
 */
private void putTaskByRecord() {
    AutoTask cleanupTask = new AutoTask(TaskType.TEMPLATE_RECORD.getType());
    // Cut-off timestamp handed to the task as its END_PARAM.
    long cutoffMillis = System.currentTimeMillis() - MONTH;
    cleanupTask.getParamSource().put(GenericAttribute.END_PARAM, cutoffMillis);
    String payload = JSONObject.toJSONString(cleanupTask);
    redissonUtil.putQueue(GenericAttribute.KEY, payload);
}
/**
 * Enqueues one {@code TEMPLATE_CLEAR_RETRY} task for every template of the
 * given group whose status is {@code TemplateStatus.重置失败}.
 *
 * @param group project/group whose templates are inspected
 */
private void putTaskBy(String group) {
    Map<String, TemplateTitleVo> templates = templateTitleService.getTemplateTitleByProject(group);
    for (Map.Entry<String, TemplateTitleVo> candidate : templates.entrySet()) {
        // Only templates in the reset-failed state are retried.
        if (candidate.getValue().getStatus() != TemplateStatus.重置失败) {
            continue;
        }
        AutoTask retryTask = new AutoTask(TaskType.TEMPLATE_CLEAR_RETRY.getType());
        retryTask.getParamSource().put(GenericAttribute.GROUP_PARAM, group);
        retryTask.getParamSource().put(GenericAttribute.TEMPLATE_TITLE, candidate.getKey());
        redissonUtil.putQueue(GenericAttribute.KEY, JSONObject.toJSONString(retryTask));
    }
}
}
......@@ -5,13 +5,13 @@ spring.redis.redisson.file=classpath:redisson-local.yaml
dubbo.application.name=automatic-provider
dubbo.application.qos.enable=false
dubbo.registry.address=zookeeper://192.168.0.11:2181?backup=192.168.0.30:2181,192.168.0.35:2181
#dubbo.config-center.timeout=60000
dubbo.config-center.timeout=60000
dubbo.protocol.name=dubbo
dubbo.protocol.port=7779
dubbo.provider.timeout=60000
dubbo.registry.timeout=60000
dubbo.registry.version=*
dubbo.provider.group=zhiwei-automatic
dubbo.provider.group=zhiwei-middleware-automatic-local
dubbo.scan.basePackages=com.zhiwei.middleware.automatic.server.dubbo.service.impl
dubbo.monitor.protocol=registry
dubbo.application.shutwait=30s
......
......@@ -11,7 +11,7 @@ dubbo.protocol.port=7779
dubbo.provider.timeout=60000
dubbo.registry.timeout=60000
dubbo.registry.version=*
dubbo.provider.group=zhiwei-automatic
dubbo.provider.group=zhiwei-middleware-automatic
dubbo.scan.basePackages=com.zhiwei.middleware.automatic.server.dubbo.service.impl
dubbo.monitor.protocol=registry
dubbo.application.shutwait=30s
......
......@@ -33,4 +33,6 @@ public interface TemplateRecordDao {
* @param query 条件
*/
void removeTemplateRecord (Query query);
/**
 * Persists one temporary template record.
 *
 * @param templateTempRecord record to store
 */
void tempRecord(TemplateTempRecord templateTempRecord);
}
package com.zhiwei.middleware.automatic.son.dao;
import org.springframework.data.mongodb.core.mapping.Document;
@Document("automaticmark_template_record_temp")
// Read-only value holder for one temporary template record: the template it
// matched (id and title), the group, the article URL and the applied tag.
// Values are assigned through the constructors only; there are no setters.
public class TemplateTempRecord {

    // Record identifier; remains null unless the six-argument constructor is used.
    private String id;
    // Identifier of the matched template.
    private String templateId;
    // Title of the matched template.
    private String templateTitle;
    // Group the record belongs to.
    private String group;
    // URL of the recorded item.
    private String url;
    // Tag carried by the record.
    private String mtag;

    /** Creates an instance with every field left null. */
    public TemplateTempRecord() {
    }

    /**
     * Creates a record with an explicit identifier.
     *
     * @param id            record identifier
     * @param templateId    identifier of the matched template
     * @param templateTitle title of the matched template
     * @param group         group the record belongs to
     * @param url           URL of the recorded item
     * @param mtag          tag carried by the record
     */
    public TemplateTempRecord(String id, String templateId, String templateTitle,
                              String group, String url, String mtag) {
        this(templateId, templateTitle, group, url, mtag);
        this.id = id;
    }

    /**
     * Creates a record without an identifier ({@code id} stays null).
     *
     * @param templateId    identifier of the matched template
     * @param templateTitle title of the matched template
     * @param group         group the record belongs to
     * @param url           URL of the recorded item
     * @param mtag          tag carried by the record
     */
    public TemplateTempRecord(String templateId, String templateTitle,
                              String group, String url, String mtag) {
        this.templateId = templateId;
        this.templateTitle = templateTitle;
        this.group = group;
        this.url = url;
        this.mtag = mtag;
    }

    /** @return the record identifier, or null if none was supplied */
    public String getId() {
        return this.id;
    }

    /** @return identifier of the matched template */
    public String getTemplateId() {
        return this.templateId;
    }

    /** @return title of the matched template */
    public String getTemplateTitle() {
        return this.templateTitle;
    }

    /** @return group the record belongs to */
    public String getGroup() {
        return this.group;
    }

    /** @return URL of the recorded item */
    public String getUrl() {
        return this.url;
    }

    /** @return tag carried by the record */
    public String getMtag() {
        return this.mtag;
    }
}
......@@ -2,6 +2,7 @@ package com.zhiwei.middleware.automatic.son.dao.impl;
import com.zhiwei.middleware.automatic.server.pojo.TemplateRecord;
import com.zhiwei.middleware.automatic.son.dao.TemplateRecordDao;
import com.zhiwei.middleware.automatic.son.dao.TemplateTempRecord;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Query;
......@@ -37,4 +38,9 @@ public class TemplateRecordDaoImpl implements TemplateRecordDao {
public void removeTemplateRecord(Query query) {
// Removes the TemplateRecord documents matching the query from the
// "automaticmark_template_record_new" collection.
mongoTemplate.remove(query, TemplateRecord.class, "automaticmark_template_record_new");
}
/**
 * Inserts the given temporary record through {@code mongoTemplate.insert}.
 * No collection name is passed, so the target collection comes from the
 * entity's own mapping.
 *
 * @param templateTempRecord record to insert
 */
@Override
public void tempRecord(TemplateTempRecord templateTempRecord) {
mongoTemplate.insert(templateTempRecord);
}
}
......@@ -2,6 +2,7 @@ package com.zhiwei.middleware.automatic.son.service;
import com.zhiwei.middleware.automatic.server.pojo.TemplateRecord;
import com.zhiwei.middleware.automatic.server.pojo.TemplateTitleVo;
import com.zhiwei.middleware.automatic.son.dao.TemplateTempRecord;
import java.util.List;
import java.util.Map;
......@@ -45,5 +46,4 @@ public interface TemplateTitleService {
* @return 是否成功
*/
boolean resetTemplate (String group, String templateTitle);
}
......@@ -15,6 +15,7 @@ import com.zhiwei.middleware.automatic.son.service.TemplateTitleService;
import com.zhiwei.middleware.automatic.son.util.MarkInfoUtil;
import com.zhiwei.middleware.automatic.son.util.RedissonUtil;
import com.zhiwei.middleware.automatic.son.util.Tools;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.index.query.BoolQueryBuilder;
......@@ -46,6 +47,9 @@ public class TemplateTitleServiceImpl implements TemplateTitleService {
private final ThreadPoolTaskExecutor executor;
private static final FastDateFormat format = FastDateFormat.getInstance("yyyy-MM-dd");
private static final String AGGREGATION_FILE = "aggregation-";
public TemplateTitleServiceImpl(RedissonUtil redissonUtil, EsDao esDao,
IndexUtil.ESIndexes esIndexes, TemplateRecordDao templateRecordDao,
DubboHandler dubboHandler,
......@@ -61,7 +65,8 @@ public class TemplateTitleServiceImpl implements TemplateTitleService {
@Override
public Map<String, TemplateTitleVo> getTemplateTitleByProject(String project) {
Map<String, String> mapValue = redissonUtil.getMapValue(Tools.assembleKey(GenericAttribute.REDIS_MAP_KEY, project));
// Map<String, String> mapValue = redissonUtil.getMapValue(Tools.assembleKey(GenericAttribute.REDIS_MAP_KEY, project));
Map<String, String> mapValue = redissonUtil.getMapValue(Tools.assembleKey(getFileDayName(project)));
if (Tools.isEmpty(mapValue)) {
return new HashMap<>();
}
......@@ -185,4 +190,8 @@ public class TemplateTitleServiceImpl implements TemplateTitleService {
query.must(should);
return esDao.search(indexes, null, query, null, 0, 1000, null);
}
/**
 * Builds the per-day key for a group in the form
 * {@code AGGREGATION_FILE + <today formatted by "format"> + ":" + group}.
 *
 * @param group group/project name appended after the colon
 * @return the assembled key
 */
private static String getFileDayName(String group) {
    String today = format.format(new Date());
    StringBuilder key = new StringBuilder();
    key.append(AGGREGATION_FILE).append(today).append(":").append(group);
    return key.toString();
}
}
package com.zhiwei.middleware.automatic.son.task;
import com.zhiwei.middleware.automatic.server.pojo.enums.TaskType;
import org.apache.commons.lang3.tuple.Pair;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/**
 * Small registry that associates a task-type string ({@link TaskType#getType()})
 * with the {@link TaskType} itself and a handler of type {@code T}.
 *
 * <p>The backing map is a plain {@link HashMap} with no synchronization.
 *
 * @param <T> handler type stored next to each task type
 */
public class BaseTaskTypePair<T> {

    /** Registered (task type, handler) pairs keyed by the task-type string. */
    private final Map<String, Pair<TaskType, T>> typeMap = new HashMap<>();

    /**
     * Registers (or replaces) the handler for a task type.
     *
     * @param taskType task type whose {@code getType()} value becomes the key
     * @param t        handler to associate with it
     */
    public void taskCache(TaskType taskType, T t) {
        typeMap.put(taskType.getType(), Pair.of(taskType, t));
    }

    /**
     * Looks up the task type registered under the given type string.
     *
     * @param type task-type string
     * @return the registered {@link TaskType}, or {@code null} when nothing is registered
     */
    public TaskType getPairKey(String type) {
        Pair<TaskType, T> registered = typeMap.get(type);
        return Objects.isNull(registered) ? null : registered.getKey();
    }

    /**
     * Returns the full (task type, handler) pair for the given type string.
     *
     * @param type task-type string
     * @return the registered pair, or {@code null} when nothing is registered
     */
    public Pair<TaskType, T> getPair(String type) {
        return typeMap.get(type);
    }

    /**
     * Returns the handler registered for the given type string.
     *
     * @param type task-type string
     * @return the registered handler
     * @throws NullPointerException if no handler is registered for {@code type};
     *                              the message names the offending type
     */
    public T getPairValue(String type) {
        // Same exception type as the previous unguarded dereference, but with a
        // message that says which task type was missing.
        Pair<TaskType, T> registered = Objects.requireNonNull(typeMap.get(type),
                () -> "No handler registered for task type: " + type);
        return registered.getValue();
    }
}
......@@ -10,8 +10,11 @@ import com.zhiwei.middleware.automatic.server.pojo.TemplateRecord;
import com.zhiwei.middleware.automatic.server.pojo.TemplateTitleVo;
import com.zhiwei.middleware.automatic.server.pojo.enums.TaskType;
import com.zhiwei.middleware.automatic.server.pojo.enums.TemplateStatus;
import com.zhiwei.middleware.automatic.son.dao.TemplateRecordDao;
import com.zhiwei.middleware.automatic.son.dao.TemplateTempRecord;
import com.zhiwei.middleware.automatic.son.dubbo.DubboHandler;
import com.zhiwei.middleware.automatic.son.service.TemplateTitleService;
import com.zhiwei.middleware.automatic.son.task.BaseTaskTypePair;
import com.zhiwei.middleware.automatic.son.util.CosineSimilarity;
import com.zhiwei.middleware.automatic.son.util.MarkInfoUtil;
import com.zhiwei.middleware.automatic.son.util.RedissonUtil;
......@@ -27,42 +30,43 @@ import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
@Service("TaskServiceCommon")
public class TaskServiceCommon implements TaskService {
public class TaskServiceCommon extends BaseTaskTypePair<TaskServiceCommon.TaskCommonFunctional> implements TaskService {
private static final Logger log = LogManager.getLogger(TaskServiceCommon.class);
private static final Map<String, Pair<TaskType, TaskCommonFunctional>> TYPE_MAP = new HashMap<>();
private final RedissonUtil redissonUtil;
private final TemplateTitleService templateTitleService;
private final DubboHandler dubboHandler;
private final TemplateRecordDao templateRecordDao;
private final ThreadPoolTaskExecutor autoMarkExecutor;
private final static String COUNT_KEY = "count";
public TaskServiceCommon(RedissonUtil redissonUtil, TemplateTitleService templateTitleService,
DubboHandler dubboHandler,
TemplateRecordDao templateRecordDao,
@Qualifier("autMarkExecutor") ThreadPoolTaskExecutor autoMarkExecutor) {
this.redissonUtil = redissonUtil;
this.templateTitleService = templateTitleService;
this.dubboHandler = dubboHandler;
this.templateRecordDao = templateRecordDao;
this.autoMarkExecutor = autoMarkExecutor;
TYPE_MAP.put(TaskType.COMMON_TWO.getType(), Pair.of(TaskType.COMMON_TWO, this::getMultiAutoInfo));
TYPE_MAP.put(TaskType.COMMON_ONE.getType(), Pair.of(TaskType.COMMON_ONE, this::getOneAutoInfo));
taskCache(TaskType.COMMON_TWO, this::getMultiAutoInfo);
taskCache(TaskType.COMMON_ONE, this::getOneAutoInfo);
}
@Override
public TaskType supports(String type) {
Pair<TaskType, TaskCommonFunctional> pair = TYPE_MAP.get(type);
return Objects.isNull(pair) ? null : pair.getKey();
return getPairKey(type);
}
@Override
public void runTask(AutoTask autoTask) {
Pair<TaskType, TaskCommonFunctional> pair = Objects.requireNonNull(TYPE_MAP.get(autoTask.getType()));
Pair<TaskType, TaskCommonFunctional> pair = getPair(autoTask.getType());
Map<String, List<MarkInfo>> dataSourceInfo = pair.getValue().getDataSource(autoTask.getParamSource().getString(pair.getKey().getCacheId()));
if (dataSourceInfo.isEmpty()) {
log.error("本地任务可用数据为空,taskType:{}", autoTask.getType());
......@@ -182,33 +186,54 @@ public class TaskServiceCommon implements TaskService {
List<MarkInfo> newList = markInfos.stream().filter(markInfo -> {
String title = markInfo.getSourceObj().getString(GenericAttribute.ES_TITLE);
return null != title && title.length() > 6;
}).collect(Collectors.toList());
for (MarkInfo markInfo : newList) {
JSONObject sourceObj = markInfo.getSourceObj();
String title = Tools.filterSymbol(sourceObj.getString(GenericAttribute.ES_TITLE));
Map<String, Object> similarMap = similarMapInfo(titleVoMap, title, group);
if (!similarMap.isEmpty()) {
// 填充数据
String aggreTitle = String.valueOf(similarMap.get("aggreTitle"));
TemplateTitleVo templateTitleVo = titleVoMap.get(aggreTitle);
String aggreTag = templateTitleVo.getMtag();
sourceObj.put(GenericAttribute.ES_M_TAG, aggreTag);
sourceObj.put(GenericAttribute.ES_M_PERSON, "自动化机器人");
sourceObj.put(GenericAttribute.ES_M_TIME, new Date().getTime());
log.info("项目:{} 模板标题:{} MarkSum:{} Tag:{}被标注标题:{}相似度:{}", group, aggreTitle, templateTitleVo.getMarkSum(), aggreTag,
title, similarMap.get("similar"));
}).filter(e -> autoMark(group, e, titleVoMap)).collect(Collectors.toList());
// dubboHandler.markUpsert(newList);
}
/**
 * Tries to auto-label one item by matching its title against the group's templates.
 *
 * <p>When {@code similarMapInfo} reports a match, the matched template's tag, the
 * robot marker and the current time are written into the item's source object, and
 * the match is then recorded: template refresh, feature-value record, per-template
 * count and the temporary comparison record. Each of these bookkeeping steps runs
 * exactly once per match.
 *
 * <p>Note that the source object is modified before the bookkeeping runs, so it
 * keeps the tag even when bookkeeping fails and this method returns {@code false}.
 *
 * @param group      project/group the item belongs to
 * @param markInfo   item to label
 * @param titleVoMap templates of the group, keyed by template title
 * @return {@code true} only when a match was found and all bookkeeping succeeded
 */
private boolean autoMark(String group, MarkInfo markInfo, Map<String, TemplateTitleVo> titleVoMap) {
    JSONObject sourceObj = markInfo.getSourceObj();
    String title = Tools.filterSymbol(sourceObj.getString(GenericAttribute.ES_TITLE));
    Map<String, Object> similarMap = similarMapInfo(titleVoMap, title, group);
    if (similarMap.isEmpty()) {
        return false;
    }
    // Fill the item with the matched template's tag and the robot marker.
    String aggreTitle = String.valueOf(similarMap.get("aggreTitle"));
    TemplateTitleVo templateTitleVo = titleVoMap.get(aggreTitle);
    String aggreTag = templateTitleVo.getMtag();
    sourceObj.put(GenericAttribute.ES_M_TAG, aggreTag);
    sourceObj.put(GenericAttribute.ES_M_PERSON, "自动化机器人");
    sourceObj.put(GenericAttribute.ES_M_TIME, new Date().getTime());
    log.info("项目:{} 模板标题:{} MarkSum:{} Tag:{}被标注标题:{}相似度:{}", group, aggreTitle, templateTitleVo.getMarkSum(), aggreTag,
            title, similarMap.get("similar"));
    // Refresh the template's mark time and record the match.
    try {
        String[] updates = dubboHandler.getMupdates(markInfo.filterInfo());
        templateTitleVo.refreshMark();
        // Feature-value record.
        templateTitleService.insertTemplateRecord(new TemplateRecord(templateTitleVo.getId(), updates[0]));
        // Accumulate the per-template mark count.
        redissonUtil.putCount(Tools.assembleKey(COUNT_KEY, group, templateTitleVo.getId()), 1);
        // Temporarily added for the test environment, used for comparison.
        templateRecordDao.tempRecord(new TemplateTempRecord(templateTitleVo.getId(), templateTitleVo.getTemplateTitle(), group, getUrl(markInfo), templateTitleVo.getMtag()));
        return true;
    } catch (Exception e) {
        log.error("记录事件采集-标注数据特征值失败", e);
        return false;
    }
}
/**
 * Picks the URL field of an item according to its type.
 *
 * <p>COMPLETE, INCOMPLETE and VIDEO items use {@code "url"}; QA items use
 * {@code "answer_url"} and fall back to {@code "question_url"} when it is null.
 *
 * @param markInfo item whose source object is read
 * @return the selected URL, or {@code null} for any other type
 */
private String getUrl(MarkInfo markInfo) {
    switch (markInfo.getTypeB()) {
        case QA: {
            String answerUrl = markInfo.getSourceObj().getString("answer_url");
            if (answerUrl != null) {
                return answerUrl;
            }
            return markInfo.getSourceObj().getString("question_url");
        }
        case COMPLETE:
        case INCOMPLETE:
        case VIDEO:
            return markInfo.getSourceObj().getString("url");
        default:
            return null;
    }
}
private Map<String, Object> similarMapInfo(Map<String, TemplateTitleVo> titleVoMap, String title, String group) {
......@@ -216,7 +241,7 @@ public class TaskServiceCommon implements TaskService {
Map<String, Object> similarMap = new HashMap<>();
for (TemplateTitleVo templateTitleVo : titleVoMap.values()) {
if (Objects.isNull(templateTitleVo.getId())) {
templateTitleVo.setId(group);
templateTitleVo.buildId(group);
}
// 过滤掉以重置的模板
if (templateTitleVo.getStatus() == TemplateStatus.已重置 || Tools.isEmpty(templateTitleVo.getMtag())) {
......@@ -239,17 +264,17 @@ public class TaskServiceCommon implements TaskService {
int c2 = Integer.parseInt(String.valueOf(hit.get("c2")));
switch (ClassB.TypeB.fromEncode(c2)){
case COMPLETE:
CompleteTextMark context =CompleteTextMark.restoreFromEs(hit);
CompleteTextMark context = CompleteTextMark.restoreFromEs(hit);
return new MarkInfo(context);
case INCOMPLETE:
IncompleteTextMark incompleteTextMark = IncompleteTextMark.restoreFromEs(hit);
return new MarkInfo(incompleteTextMark);
case QA:
QATextMark qaTextMark = QATextMark.restoreFromEs(hit);
new MarkInfo(qaTextMark);
return new MarkInfo(qaTextMark);
case VIDEO:
VideoMark videoMark = VideoMark.restoreFromEs(hit);
new MarkInfo(videoMark);
return new MarkInfo(videoMark);
}
return null;
}
......
......@@ -10,7 +10,9 @@ import com.zhiwei.middleware.automatic.server.pojo.TemplateTitleVo;
import com.zhiwei.middleware.automatic.server.pojo.enums.TaskType;
import com.zhiwei.middleware.automatic.server.pojo.enums.TemplateStatus;
import com.zhiwei.middleware.automatic.son.dao.EsDao;
import com.zhiwei.middleware.automatic.son.dao.TemplateRecordDao;
import com.zhiwei.middleware.automatic.son.service.TemplateTitleService;
import com.zhiwei.middleware.automatic.son.task.BaseTaskTypePair;
import com.zhiwei.middleware.automatic.son.util.CosineSimilarity;
import com.zhiwei.middleware.automatic.son.util.Tools;
import com.zhiwei.nlp.AggreeBootStarter;
......@@ -24,6 +26,8 @@ import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
......@@ -32,19 +36,19 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@Service
public class TaskServiceTemplate implements TaskService {
//@Service
public class TaskServiceTemplate extends BaseTaskTypePair<TaskServiceTemplate.TaskTemplateFunctional> implements TaskService {
private final Logger log = LogManager.getLogger(TaskServiceTemplate.class);
private static final Map<String, Pair<TaskType, TaskTemplateFunctional>> TYPE_MAP = new HashMap<>();
private final TemplateTitleService templateTitleService;
private final EsDao esDao;
private final IndexUtil.ESIndexes esIndexes;
private final TemplateRecordDao templateRecordDao;
private final ThreadPoolTaskExecutor executor;
/* 一天的秒数(为保留前一天文件) */
......@@ -52,27 +56,30 @@ public class TaskServiceTemplate implements TaskService {
public TaskServiceTemplate(TemplateTitleService templateTitleService,
EsDao esDao, IndexUtil.ESIndexes esIndexes,
TemplateRecordDao templateRecordDao,
@Qualifier("templateExecutor") ThreadPoolTaskExecutor executor) {
this.templateTitleService = templateTitleService;
this.esDao = esDao;
this.esIndexes = esIndexes;
this.templateRecordDao = templateRecordDao;
this.executor = executor;
TYPE_MAP.put(TaskType.TEMPLATE.getType(), Pair.of(TaskType.TEMPLATE, this::runTask));
TYPE_MAP.put(TaskType.TEMPLATE_MODIFY.getType(), Pair.of(TaskType.TEMPLATE_MODIFY, this::templateModify));
TYPE_MAP.put(TaskType.TEMPLATE_RESET.getType(), Pair.of(TaskType.TEMPLATE_RESET, this::templateReset));
taskCache(TaskType.TEMPLATE, this::runTask);
taskCache(TaskType.TEMPLATE_MODIFY, this::templateModify);
taskCache(TaskType.TEMPLATE_RESET, this::templateReset);
taskCache(TaskType.TEMPLATE_CLEAR_RETRY, this::templateReset);
taskCache(TaskType.TEMPLATE_RECORD, this::templateRecordClean);
}
@Override
public TaskType supports(String type) {
Pair<TaskType, TaskTemplateFunctional> par = TYPE_MAP.get(type);
return Objects.isNull(par) ? null : par.getKey();
return getPairKey(type);
}
@Override
public void runTask(AutoTask autoTask) {
executor.execute(() -> {
Pair<TaskType, TaskTemplateFunctional> pair = Objects.requireNonNull(TYPE_MAP.get(autoTask.getType()));
pair.getValue().template(autoTask.getParamSource());
TaskTemplateFunctional pair = getPairValue(autoTask.getType());
pair.template(autoTask.getParamSource());
});
}
......@@ -82,6 +89,10 @@ public class TaskServiceTemplate implements TaskService {
return false;
}
/**
 * Removes template records whose {@code createAt} is lower than the task's
 * {@code END_PARAM} value.
 *
 * @param json task parameters; {@code END_PARAM} is read as a Long
 */
private void templateRecordClean(JSONObject json) {
    Long cutoff = json.getLong(GenericAttribute.END_PARAM);
    Criteria olderThanCutoff = Criteria.where("createAt").lt(cutoff);
    templateRecordDao.removeTemplateRecord(new Query(olderThanCutoff));
}
/**
* 模板修改任务
* @param json 参数
......@@ -163,7 +174,7 @@ public class TaskServiceTemplate implements TaskService {
String title = e.getKey();
TemplateTitleVo templateTitleVo = e.getValue();
if (Objects.isNull(templateTitleVo.getId())) {
templateTitleVo.setId(group);
templateTitleVo.buildId(group);
}
long updateTime = templateTitleVo.getUpdateTime().getTime();
// 移除7天有效期外的数据
......
package com.zhiwei.middleware.automatic.son.task.service;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.base.entity.subclass.mark.CompleteTextMark;
import com.zhiwei.es.index.Index;
import com.zhiwei.es.util.IndexUtil;
import com.zhiwei.middleware.automatic.server.common.GenericAttribute;
import com.zhiwei.middleware.automatic.server.pojo.AutoTask;
import com.zhiwei.middleware.automatic.server.pojo.TemplateTitleVo;
import com.zhiwei.middleware.automatic.server.pojo.enums.TaskType;
import com.zhiwei.middleware.automatic.server.pojo.enums.TemplateStatus;
import com.zhiwei.middleware.automatic.son.dao.EsDao;
import com.zhiwei.middleware.automatic.son.dao.TemplateRecordDao;
import com.zhiwei.middleware.automatic.son.service.TemplateTitleService;
import com.zhiwei.middleware.automatic.son.task.BaseTaskTypePair;
import com.zhiwei.middleware.automatic.son.util.CosineSimilarity;
import com.zhiwei.middleware.automatic.son.util.Tools;
import com.zhiwei.nlp.AggreeBootStarter;
import com.zhiwei.nlp.vo.KResult;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@Service
public class TaskServiceTemplateNEW extends BaseTaskTypePair<TaskServiceTemplateNEW.TaskTemplateFunctional> implements TaskService {
private final Logger log = LogManager.getLogger(TaskServiceTemplateNEW.class);
private final TemplateTitleService templateTitleService;
private final EsDao esDao;
private final IndexUtil.ESIndexes esIndexes;
private final TemplateRecordDao templateRecordDao;
private final ThreadPoolTaskExecutor executor;
/* 一天的秒数(为保留前一天文件) */
private static final int ONE_DAY = 60 * 60 * 24;
/**
 * Wires the collaborators and registers one handler per supported template task type.
 *
 * @param templateTitleService template-title operations (modify / reset)
 * @param esDao                Elasticsearch access
 * @param esIndexes            Elasticsearch index helper
 * @param templateRecordDao    template-record persistence
 * @param executor             the {@code templateExecutor} pool used by {@code runTask(AutoTask)}
 */
public TaskServiceTemplateNEW(TemplateTitleService templateTitleService,
                              EsDao esDao, IndexUtil.ESIndexes esIndexes,
                              TemplateRecordDao templateRecordDao,
                              @Qualifier("templateExecutor") ThreadPoolTaskExecutor executor) {
    // Collaborators.
    this.executor = executor;
    this.templateRecordDao = templateRecordDao;
    this.templateTitleService = templateTitleService;
    this.esIndexes = esIndexes;
    this.esDao = esDao;

    // Handler registration; TEMPLATE_RESET and TEMPLATE_CLEAR_RETRY share templateReset.
    taskCache(TaskType.TEMPLATE, this::runTask);
    taskCache(TaskType.TEMPLATE_MODIFY, this::templateModify);
    taskCache(TaskType.TEMPLATE_RESET, this::templateReset);
    taskCache(TaskType.TEMPLATE_CLEAR_RETRY, this::templateReset);
    taskCache(TaskType.TEMPLATE_RECORD, this::templateRecordClean);
}
/**
 * Reports the task type this service handles for the given type string.
 * Currently always returns {@code null}; the registry lookup is commented out.
 *
 * NOTE(review): with this returning null the handlers registered in the
 * constructor are never selected through supports() — presumably disabled on
 * purpose while this class is being rolled out; confirm before release.
 *
 * @param type task-type string
 * @return always {@code null}
 */
@Override
public TaskType supports(String type) {
// return getPairKey(type);
return null;
}
/**
 * Submits the task to the executor; the registered handler for the task's type
 * is looked up and invoked with the task's parameters on the executor thread.
 *
 * @param autoTask task to run
 */
@Override
public void runTask(AutoTask autoTask) {
    executor.execute(() -> getPairValue(autoTask.getType()).template(autoTask.getParamSource()));
}
/**
 * Threshold warning flag. Currently always returns {@code false}; the check
 * comparing the executor's active count with its core pool size is commented out.
 *
 * @return always {@code false}
 */
@Override
public boolean thresholdWarn() {
// return executor.getActiveCount() == executor.getCorePoolSize();
return false;
}
/**
 * Removes template records whose {@code createAt} is lower than the task's
 * {@code END_PARAM} value.
 *
 * @param json task parameters; {@code END_PARAM} is read as a Long
 */
private void templateRecordClean(JSONObject json) {
    Long cutoff = json.getLong(GenericAttribute.END_PARAM);
    Criteria olderThanCutoff = Criteria.where("createAt").lt(cutoff);
    templateRecordDao.removeTemplateRecord(new Query(olderThanCutoff));
}
/**
* 模板修改任务
* @param json 参数
*/
private void templateModify(JSONObject json) {
templateTitleService.modifyTemplateTitle(json.getString(GenericAttribute.GROUP_PARAM), json.getString(GenericAttribute.TEMPLATE_TITLE),
json.getString(GenericAttribute.FIX_TAG));
}
/**
* 模板重置任务
* @param json 参数
*/
private void templateReset(JSONObject json) {
templateTitleService.resetTemplate(json.getString(GenericAttribute.GROUP_PARAM), json.getString(GenericAttribute.TEMPLATE_TITLE));
}
/**
* 模板构建任务
* @param json 参数
*/
private void runTask(JSONObject json) {
String group = json.getString(GenericAttribute.GROUP_PARAM);
try {
Long startTime = json.getLong(GenericAttribute.START_PARAM);
Long endTime = json.getLong(GenericAttribute.END_PARAM);
//源数据
List<Map<String, Object>> sourceList = findRecentTimeData(group, startTime,
endTime);
if (sourceList.isEmpty()) {
return;
}
log.info("发现{}组数据{}条,聚合中...", group, sourceList.size());
projectDataTemplate(group, sourceList);
} catch (Exception e) {
log.error("自动聚合模板更新失败,项目:{}", group, e);
}
}
/**
* 查询该项目 指定时间范围的数据
* @param mgroup 项目
* @return 数据集
* @throws IOException io
*/
private List<Map<String, Object>> findRecentTimeData(String mgroup, Long startTime, Long endTime) throws IOException {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
// 标注时间小时级内
QueryBuilder mtimeBuilder = QueryBuilders.rangeQuery("mtime").from(startTime).to(endTime);
Calendar calendar2 = Calendar.getInstance();
calendar2.add(Calendar.DAY_OF_MONTH, -1);
// 文章时间一天内
QueryBuilder timeBuilder = QueryBuilders.rangeQuery("time").from(calendar2.getTime().getTime()).to(endTime);
QueryBuilder mgroupBuilder = QueryBuilders.matchPhraseQuery("mgroup", mgroup);
// 过滤自动化机器人标注数据
boolQueryBuilder.must(timeBuilder).must(mtimeBuilder).must(mgroupBuilder).mustNot(autoRobotQueryBuilder())
.mustNot(QueryBuilders.termQuery("c2", 25165824)).mustNot(QueryBuilders.termQuery("c2", 16777216));
sourceBuilder.query(boolQueryBuilder).size(10000)
.fetchSource(new String[] { "ind_full_text", "mtime", "mtag", "mperson", "url","id"}, null);
return esDao.afterSearch(esIndexes.getIndexes(Index.mark.name()).toArray(new String[]{}), sourceBuilder, 1000).stream().map(SearchHit::getSourceAsMap).collect(Collectors.toList());
}
/**
* 改项目的数据生成模板
* @param group 项目
* @param sourceList 数据集
*/
private void projectDataTemplate(String group, List<Map<String, Object>> sourceList) {
//聚合模板
Map<String, TemplateTitleVo> aggregation = aggregation(transferMark(sourceList));
//旧的聚合模板
Map<String, TemplateTitleVo> templateTitleByProject = templateTitleService.getTemplateTitleByProject(group).entrySet().stream()
.filter(e -> {
String title = e.getKey();
TemplateTitleVo templateTitleVo = e.getValue();
if (Objects.isNull(templateTitleVo.getId())) {
templateTitleVo.buildId(group);
}
long updateTime = templateTitleVo.getUpdateTime().getTime();
// 移除7天有效期外的数据
if (System.currentTimeMillis() - updateTime > ONE_DAY * 7 * 1000) {
log.info("{}-移除过期模板标题:{},最后更新时间:{}", group, title, updateTime);
return false;
} else if (e.getValue().getStatus() == TemplateStatus.已重置) {
log.info("已重置的模板从内存中删除,模板title:{}", e.getKey());
return false;
}
return true;
}).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// 新旧模板合并 且更新模板
templateTitleService.setTemplateTitleByProject(group, mergeTemplate(aggregation, templateTitleByProject));
}
/**
* 数据聚合成模板
* @param sourceList 数据集
* @return 模板集
*/
private Map<String, TemplateTitleVo> aggregation(List<CompleteTextMark> sourceList) {
Map<String, TemplateTitleVo> aggregationTitleTagMap = new ConcurrentHashMap<>();
List<String> titles = sourceList.stream().map(CompleteTextMark::getTitle).collect(Collectors.toList());
// 得到聚合集
List<KResult<Integer>> kResult = AggreeBootStarter.getKResult(titles, 0.1);
for (KResult<Integer> result : kResult) {
if (result.getDataPoints().size() < 3) {
continue;
}
// 标签统计
Map<String, Long> tagGroup = result.getDataPoints().stream().map(e -> sourceList.get(e).getMtag())
.collect(Collectors.groupingBy(mtag -> mtag, Collectors.counting()));
//得到数量最多的标签
String tag = tagGroup.entrySet().stream().max(Map.Entry.comparingByValue()).map(Map.Entry::getKey).get();
// 生成模板
String title = Tools.filterSymbol(result.getClusterName());
aggregationTitleTagMap.put(title, new TemplateTitleVo(title, tag, sourceList.get(result.getDataPoints().get(0)).getUrl()));
}
return aggregationTitleTagMap;
}
/**
* 新旧模板合并
* @param oldTemplate 旧模板
* @param newTemplate 新模板
*/
private Map<String, TemplateTitleVo> mergeTemplate(Map<String, TemplateTitleVo> oldTemplate, Map<String, TemplateTitleVo> newTemplate) {
for (Map.Entry<String, TemplateTitleVo> newEntry : newTemplate.entrySet()) {
List<String> templateKeys = oldTemplate.keySet().stream()
.filter(e -> CosineSimilarity.calculateTextSimWithBrand(newEntry.getKey(), e) >= 0.96)
.collect(Collectors.toList());
// 添加模板
if (templateKeys.isEmpty()) {
oldTemplate.put(newEntry.getKey(), newEntry.getValue());
} else {
// 更新标签
for (String oldKey : templateKeys) {
oldTemplate.get(oldKey).setMtag(newEntry.getValue().getMtag());
}
}
}
return oldTemplate;
}
/**
* 转换
* @param sourceMap 数据集
* @return 标准文本集
*/
private List<CompleteTextMark> transferMark(List<Map<String, Object>> sourceMap) {
return sourceMap.stream().map(CompleteTextMark::restoreFromEs).collect(Collectors.toList());
}
/**
* 查询条件
* @return 标注人为自动标注机器人
*/
private QueryBuilder autoRobotQueryBuilder() {
return QueryBuilders.termQuery("mperson", GenericAttribute.AUTO_PERSON);
}
@FunctionalInterface
public interface TaskTemplateFunctional {
void template(JSONObject json);
}
}
......@@ -36,7 +36,8 @@ public class RedissonUtil {
}
/**
 * Returns the Redis map stored under the given key.
 * The key is passed to the client as-is; the former redisKey(key) call is kept below for reference only.
 *
 * @param key name of the map
 * @return the map obtained from the Redisson client
 */
public Map<String, String> getMapValue(String key) {
// return redissonClient.getMap(redisKey(key));
return redissonClient.getMap(key);
}
public String getMapKeyValue(String key, String group) {
......
......@@ -6,14 +6,14 @@ singleServerConfig:
retryAttempts: 3
retryInterval: 1500
subscriptionsPerConnection: 5
address: "redis://192.168.0.225:6379"
address: "redis://115.236.59.91:7386"
subscriptionConnectionMinimumIdleSize: 1
subscriptionConnectionPoolSize: 50
connectionMinimumIdleSize: 32
connectionPoolSize: 64
database: 4
database: 3
dnsMonitoringInterval: 5000
threads: 0
nettyThreads: 0
codec: !<org.redisson.codec.JsonJacksonCodec> {}
codec: !<org.redisson.client.codec.StringCodec> {}
transportMode: "NIO"
\ No newline at end of file
......@@ -15,5 +15,5 @@ singleServerConfig:
dnsMonitoringInterval: 5000
threads: 0
nettyThreads: 0
codec: !<org.redisson.codec.JsonJacksonCodec> {}
codec: !<org.redisson.client.codec.StringCodec> {}
transportMode: "NIO"
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment