Commit 401c8e2e by liuyu

2023年3月22日 server - son模式

parent 720a2127
......@@ -3,9 +3,6 @@ package com.zhiwei.middleware.automatic.configuration;
import com.zhiwei.middleware.automatic.server.core.*;
import com.zhiwei.middleware.automatic.server.dubbo.service.AutoMaticService;
import com.zhiwei.middleware.automatic.server.dubbo.service.CommonService;
import com.zhiwei.middleware.automatic.server.dubbo.service.DataCollectionService;
import com.zhiwei.middleware.automatic.server.dubbo.service.DataUploadService;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
......@@ -27,28 +24,4 @@ public class AutoMaticClientConfiguration {
properties.getConsumer()));
}
/**
 * Declares the {@link CommonClient} bean. Because of {@code @ConditionalOnMissingBean}, it is
 * only registered when no {@code CommonClient} bean is already defined.
 *
 * @param properties client configuration; its application, registry and consumer settings are
 *                   handed to {@code AutoMaticClientFactory.createInstance}
 * @return a client wrapping the {@code CommonService} instance produced by the factory
 */
@Bean
@ConditionalOnMissingBean(CommonClient.class)
public CommonClient commonClient(AutoMaticClientConfigurationProperties properties) {
return new CommonClient(AutoMaticClientFactory.createInstance(CommonService.class, properties.getApplication(),properties.getRegistry(),
properties.getConsumer()));
}
/**
 * Declares the {@link DataCollectionClient} bean. Because of {@code @ConditionalOnMissingBean},
 * it is only registered when no {@code DataCollectionClient} bean is already defined.
 *
 * @param properties client configuration; its application, registry and consumer settings are
 *                   handed to {@code AutoMaticClientFactory.createInstance}
 * @return a client wrapping the {@code DataCollectionService} instance produced by the factory
 */
@Bean
@ConditionalOnMissingBean(DataCollectionClient.class)
public DataCollectionClient dataCollectionClient(AutoMaticClientConfigurationProperties properties) {
return new DataCollectionClient(AutoMaticClientFactory.createInstance(DataCollectionService.class, properties.getApplication(),properties.getRegistry(),
properties.getConsumer()));
}
/**
 * Declares the {@link DataUploadClient} bean. Because of {@code @ConditionalOnMissingBean}, it
 * is only registered when no {@code DataUploadClient} bean is already defined.
 *
 * @param properties client configuration; its application, registry and consumer settings are
 *                   handed to {@code AutoMaticClientFactory.createInstance}
 * @return a client wrapping the {@code DataUploadService} instance produced by the factory
 */
@Bean
@ConditionalOnMissingBean(DataUploadClient.class)
public DataUploadClient dataUploadClient(AutoMaticClientConfigurationProperties properties) {
return new DataUploadClient(AutoMaticClientFactory.createInstance(DataUploadService.class, properties.getApplication(),properties.getRegistry(),
properties.getConsumer()));
}
}
......@@ -19,19 +19,10 @@
<curator.version>2.12.0</curator.version>
<base.version>2.0.0-SNAPSHOT</base.version>
<easyexcel.version>2.1.2</easyexcel.version>
<json.version>1.2.58</json.version>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${json.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/easyexcel -->
<dependency>
<groupId>com.alibaba</groupId>
......
package com.zhiwei.middleware.automatic.server.common;
/**
 * Constant values shared across the automatic-marking code. The groupings below follow the
 * constant names; the class holds no state and no behavior.
 */
public class GenericAttribute {

    // ---- switches, thresholds and limits ----

    public static final boolean IS_TEST = false;
    public static final double SIMILAR_STANDARD = 0.7;
    /**
     * Maximum number of records handled when a template tag is modified.
     */
    public static final int POINT_SIZE = 100;

    // ---- Redis key names (per the REDIS_ prefix) ----

    public static final String REDIS_QUEUE_ONE_KEY = "autoDataOneQueue";
    public static final String REDIS_QUEUE_MULTI_KEY = "autoDataMultiQueue";
    public static final String REDIS_MAP_KEY = "autoDataMap";

    // ---- identity values used for automatically produced records ----

    public static final String AUTO_PERSON = "自动化机器人";
    public static final long AUTO_CID = 100040002;
    public static final String AUTO_CNAME = "上传标注补充采集";

    // ---- document field names (ES_ prefix; presumably Elasticsearch fields — confirm) ----

    public static final String ES_M_TIME = "mtime";
    public static final String ES_M_PERSON = "mperson";
    public static final String ES_M_TAG = "mtag";
    public static final String ES_TITLE = "title";

    // ---- parameter / lookup key names ----

    public static final String SON_ID = "sonId";
    public static final String GROUP_PARAM = "group";
    public static final String START_PARAM = "startTime";
    public static final String END_PARAM = "endTime";
    public static final String TEMPLATE_TITLE = "templateTitle";
    public static final String FIX_TAG = "fixTag";
    public static final String KEY = "task";
}
......@@ -3,6 +3,7 @@ package com.zhiwei.middleware.automatic.server.core;
import com.zhiwei.base.entity.subclass.mark.MarkInfo;
import com.zhiwei.middleware.automatic.server.dubbo.service.AutoMaticService;
import com.zhiwei.middleware.automatic.server.pojo.MarkInfoMulti;
import com.zhiwei.middleware.automatic.server.pojo.TemplateTitleVo;
import java.util.List;
import java.util.Map;
......@@ -23,8 +24,16 @@ public class AutoMaticClient {
autoMaticService.autoMarkMulti(infos);
}
public boolean modifyTemplateTitle(String group, String templateTitle, String fixTag) {
return autoMaticService.modifyTemplateTitle(group, templateTitle, fixTag);
public void modifyTemplateTitle(String group, String templateTitle, String fixTag) {
autoMaticService.modifyTemplateTitle(group, templateTitle, fixTag);
}
public void resetTemplate(String group, String templateTitle) {
autoMaticService.resetTemplate(group, templateTitle);
}
public Map<String, TemplateTitleVo> getTemplateTitleByProject(String project) {
return autoMaticService.getTemplateTitleByProject(project);
}
public List<String> getMupdateByTemplateTitle(String group, String templateTitle) {
......@@ -38,8 +47,4 @@ public class AutoMaticClient {
public Map<String, Object> compareWithTemplateTileOL(String project, String title) {
return autoMaticService.compareWithTemplateTileOL(project, title);
}
public boolean resetTemplate(String group, String templateTitle) {
return autoMaticService.resetTemplate(group, templateTitle);
}
}
package com.zhiwei.middleware.automatic.server.core;
import com.zhiwei.middleware.automatic.server.dubbo.service.CommonService;
import com.zhiwei.middleware.automatic.server.pojo.CommonAggreeResult;
import com.zhiwei.middleware.automatic.server.pojo.dto.AggreeDTO;
import java.util.List;
/**
 * Client-side facade over {@link CommonService}. Each method forwards its arguments to the
 * wrapped service unchanged and returns the service's result as-is.
 */
public class CommonClient {

    /** The service every call is forwarded to. */
    private final CommonService delegate;

    /**
     * @param commonService the service instance to forward to
     */
    public CommonClient(CommonService commonService) {
        this.delegate = commonService;
    }

    /**
     * Obtains a new task id from the service.
     *
     * @return the id returned by {@code CommonService.generateAggreeOrder()}
     */
    public String generateAggreeOrder() {
        return delegate.generateAggreeOrder();
    }

    /**
     * Adds data to the task identified by {@code id}.
     *
     * @param id   task id
     * @param list data items to add
     * @return the service's boolean result
     */
    public boolean appendAggreeOrder(String id, List<AggreeDTO> list) {
        return delegate.appendAggreeOrder(id, list);
    }

    /**
     * Starts aggregation for the task.
     *
     * @param id task id
     * @return the service's boolean result
     */
    public boolean startAggree(String id) {
        return delegate.startAggree(id);
    }

    /**
     * Starts aggregation for the task with an explicit limit value.
     *
     * @param id    task id
     * @param limit limit value forwarded to the service
     * @return the service's boolean result
     */
    public boolean startAggree(String id, double limit) {
        return delegate.startAggree(id, limit);
    }

    /**
     * Fetches the aggregation result for the task.
     *
     * @param id task id
     * @return the result object returned by the service
     */
    public CommonAggreeResult getAggreeResult(String id) {
        return delegate.getAggreeResult(id);
    }

    /**
     * Fetches one page of the aggregation result for the task.
     *
     * @param id        task id
     * @param page      page argument forwarded to the service
     * @param pageLimit page-size argument forwarded to the service
     * @return the result object returned by the service
     */
    public CommonAggreeResult getAggreeResult(String id, int page, int pageLimit) {
        return delegate.getAggreeResult(id, page, pageLimit);
    }
}
package com.zhiwei.middleware.automatic.server.core;
import com.zhiwei.base.category.ClassB;
import com.zhiwei.middleware.automatic.server.dubbo.service.DataCollectionService;
import java.util.List;
import java.util.Map;
/**
 * Client-side facade over {@link DataCollectionService}. Every method passes its arguments
 * straight through to the wrapped service and returns whatever the service returns.
 */
public class DataCollectionClient {

    /** The service every call is forwarded to. */
    private final DataCollectionService delegate;

    /**
     * @param dataCollectionService the service instance to forward to
     */
    public DataCollectionClient(DataCollectionService dataCollectionService) {
        this.delegate = dataCollectionService;
    }

    /** Forwards to {@code DataCollectionService.cleanCache}. */
    public void cleanCache(String group, String id) {
        delegate.cleanCache(group, id);
    }

    /** Forwards to {@code DataCollectionService.cleanCacheExceptNoise}. */
    public void cleanCacheExceptNoise(String group, String id) {
        delegate.cleanCacheExceptNoise(group, id);
    }

    /** Forwards to {@code DataCollectionService.addDataCollection}. */
    public void addDataCollection(String group, String id, List<String> compressedlist) {
        delegate.addDataCollection(group, id, compressedlist);
    }

    /** Forwards to {@code DataCollectionService.startAggree}. */
    public void startAggree(String group, String id, String highWords) {
        delegate.startAggree(group, id, highWords);
    }

    /**
     * Forwards to {@code DataCollectionService.batchModifyFatherTag}.
     *
     * @return the service's boolean result
     */
    public boolean batchModifyFatherTag(String group, String id, List<String> fatherIds, String mtag,
            String mperson, ClassB.TypeB typeB) {
        return delegate.batchModifyFatherTag(group, id, fatherIds, mtag, mperson, typeB);
    }

    /**
     * Forwards to {@code DataCollectionService.modifyFatherTag}.
     *
     * @return the service's boolean result
     */
    public boolean modifyFatherTag(String group, String id, String fatherId, String mtag, String mperson,
            ClassB.TypeB typeB) {
        return delegate.modifyFatherTag(group, id, fatherId, mtag, mperson, typeB);
    }

    /**
     * Forwards to {@code DataCollectionService.modifySonTag}.
     *
     * @return the service's boolean result
     */
    public boolean modifySonTag(String group, String id, String fatherId, String sonId, String mtag,
            String mperson, ClassB.TypeB typeB) {
        return delegate.modifySonTag(group, id, fatherId, sonId, mtag, mperson, typeB);
    }

    /**
     * Forwards to {@code DataCollectionService.throwIntoNoise}.
     *
     * @return the service's boolean result
     */
    public boolean throwIntoNoise(String group, String id, String fatherId, ClassB.TypeB typeB) {
        return delegate.throwIntoNoise(group, id, fatherId, typeB);
    }

    /**
     * Forwards to {@code DataCollectionService.batchThrowIntoNoise}.
     *
     * @return the service's boolean result
     */
    public boolean batchThrowIntoNoise(String group, String id, List<String> fatherIds, ClassB.TypeB typeB) {
        return delegate.batchThrowIntoNoise(group, id, fatherIds, typeB);
    }

    /**
     * Forwards to {@code DataCollectionService.restoreFromNoise}.
     *
     * @return the service's boolean result
     */
    public boolean restoreFromNoise(String group, String id, String fatherId, ClassB.TypeB typeB) {
        return delegate.restoreFromNoise(group, id, fatherId, typeB);
    }

    /**
     * Forwards to {@code DataCollectionService.getFatherTitles}.
     *
     * @return the map returned by the service
     */
    public Map<String, Object> getFatherTitles(String group, String id, int page, int size, boolean isAsc,
            String keyword, ClassB.TypeB typeB, boolean isTitle, int markFlag) {
        return delegate.getFatherTitles(group, id, page, size, isAsc, keyword, typeB, isTitle, markFlag);
    }

    /**
     * Forwards to {@code DataCollectionService.getSonTitles}.
     *
     * @return the map returned by the service
     */
    public Map<String, Object> getSonTitles(String group, String id, String fatherId, int page, int size,
            boolean isAsc, String keyword, ClassB.TypeB typeB) {
        return delegate.getSonTitles(group, id, fatherId, page, size, isAsc, keyword, typeB);
    }

    /**
     * Forwards to {@code DataCollectionService.getNoiseFatherTitles}.
     *
     * @return the map returned by the service
     */
    public Map<String, Object> getNoiseFatherTitles(String group, String id, int page, int size, boolean isAsc,
            String keyword, ClassB.TypeB typeB, boolean isTitle, int markFlag) {
        return delegate.getNoiseFatherTitles(group, id, page, size, isAsc, keyword, typeB, isTitle, markFlag);
    }

    /**
     * Forwards to {@code DataCollectionService.getNoiseSonTitles}.
     *
     * @return the map returned by the service
     */
    public Map<String, Object> getNoiseSonTitles(String group, String id, String fatherId, int page, int size,
            boolean isAsc, String keyword, ClassB.TypeB typeB) {
        return delegate.getNoiseSonTitles(group, id, fatherId, page, size, isAsc, keyword, typeB);
    }

    /** Forwards to {@code DataCollectionService.checkedThenInsert}. */
    public void checkedThenInsert(String group, String id) {
        delegate.checkedThenInsert(group, id);
    }

    /**
     * Forwards to {@code DataCollectionService.getAggreResultNow}.
     *
     * @return the status code returned by the service
     */
    public int getAggreResultNow(String group, String id) {
        return delegate.getAggreResultNow(group, id);
    }

    /**
     * Forwards to {@code DataCollectionService.getInsertResultNow}.
     *
     * @return the status code returned by the service
     */
    public int getInsertResultNow(String group, String id) {
        return delegate.getInsertResultNow(group, id);
    }
}
package com.zhiwei.middleware.automatic.server.core;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.base.category.ClassB;
import com.zhiwei.middleware.automatic.server.dubbo.service.DataUploadService;
import com.zhiwei.middleware.automatic.server.pojo.UploadInfo;
import com.zhiwei.middleware.automatic.server.pojo.enums.InsertType;
import java.util.Map;
/**
 * Client-side facade over {@link DataUploadService}. Every method passes its arguments straight
 * through to the wrapped service and returns whatever the service returns.
 */
public class DataUploadClient {

    /** The service every call is forwarded to. */
    private final DataUploadService delegate;

    /**
     * @param dataUploadService the service instance to forward to
     */
    public DataUploadClient(DataUploadService dataUploadService) {
        this.delegate = dataUploadService;
    }

    /** Forwards to {@code DataUploadService.addUploadList}. */
    public void addUploadList(String group, String id, String sourceStr) {
        delegate.addUploadList(group, id, sourceStr);
    }

    /** Forwards to {@code DataUploadService.startUpload}. */
    public void startUpload(String group, String id, String mperson, UploadInfo.MtagType mtagType,
            UploadInfo.FilterType filterType, String projectId, InsertType insertType) {
        delegate.startUpload(group, id, mperson, mtagType, filterType, projectId, insertType);
    }

    /**
     * Forwards to {@code DataUploadService.getUploadStatus}.
     *
     * @return the map returned by the service
     */
    public Map<String, Object> getUploadStatus(String group, String id) {
        return delegate.getUploadStatus(group, id);
    }

    /**
     * Forwards to {@code DataUploadService.getUploadInfoList}.
     *
     * @return the map returned by the service
     */
    public Map<String, Object> getUploadInfoList(String group, String id, int page, int size, boolean isAsc,
            String searchField, String keyword, UploadInfo.UploadType uploadType) {
        return delegate.getUploadInfoList(group, id, page, size, isAsc, searchField, keyword, uploadType);
    }

    /**
     * Forwards to {@code DataUploadService.getDataType}.
     *
     * @return the data type returned by the service
     */
    public UploadInfo.DataType getDataType(JSONObject json, ClassB.TypeB typeB) {
        return delegate.getDataType(json, typeB);
    }

    /** Forwards to {@code DataUploadService.cleanUploadResult}. */
    public void cleanUploadResult(String group, String id) {
        delegate.cleanUploadResult(group, id);
    }
}
......@@ -2,6 +2,7 @@ package com.zhiwei.middleware.automatic.server.dubbo.service;
import com.zhiwei.base.entity.subclass.mark.MarkInfo;
import com.zhiwei.middleware.automatic.server.pojo.MarkInfoMulti;
import com.zhiwei.middleware.automatic.server.pojo.TemplateTitleVo;
import java.util.List;
import java.util.Map;
......@@ -18,7 +19,21 @@ public interface AutoMaticService {
* @param templateTitle 模板标题
* @param fixTag 正确的标签
*/
boolean modifyTemplateTitle(String group, String templateTitle, String fixTag);
void modifyTemplateTitle(String group, String templateTitle, String fixTag);
/**
* 重置自动标注模板
* @param group 项目
* @param templateTitle 模板标题
*/
void resetTemplate (String group, String templateTitle);
/**
* 获取项目文本模板
* @param project 项目
* @return 模板集
*/
Map<String, TemplateTitleVo> getTemplateTitleByProject(String project);
/**
* 根据模板标题获取数据(仅最新100条)
......@@ -46,13 +61,5 @@ public interface AutoMaticService {
* @param title 标题
* @return 返回值
*/
public Map<String, Object> compareWithTemplateTileOL(String project, String title);
/**
* 重置自动标注模板
* @param group 项目
* @param templateTitle 模板标题
* @return 是否成功
*/
boolean resetTemplate (String group, String templateTitle);
Map<String, Object> compareWithTemplateTileOL(String project, String title);
}
package com.zhiwei.middleware.automatic.server.dubbo.service;
import com.zhiwei.middleware.automatic.server.pojo.CommonAggreeResult;
import com.zhiwei.middleware.automatic.server.pojo.dto.AggreeDTO;
import java.util.List;
/**
* Service contract for the generic aggregation workflow: obtain a task id, append data to it,
* start aggregation, then read the result.
*/
public interface CommonService {
/**
* Obtains a task id (new flow).
*
* @return the task id
*/
String generateAggreeOrder();
/**
* Adds data to the task identified by {@code id} (new flow).
*
* @param id task id
* @param list data items to add
* @return boolean outcome; presumably {@code true} on success — confirm with the implementation
*/
boolean appendAggreeOrder(String id, List<AggreeDTO> list);
/**
* Aggregates the task's data using bisecting k-means.
*
* @param id task id
* @return boolean outcome; presumably {@code true} on success — confirm with the implementation
*/
boolean startAggree(String id);
/**
* Aggregates the task's data using bisecting k-means, with an explicit limit.
*
* @param id task id
* @param limit limit value; its exact meaning is not documented here — confirm with the implementation
* @return boolean outcome; presumably {@code true} on success — confirm with the implementation
*/
boolean startAggree(String id, double limit);
/**
* Fetches the aggregation result; returns the first page by default.
*
* @param id task id
* @return the aggregation result
*/
CommonAggreeResult getAggreeResult(String id);
/**
* Fetches the aggregation result, paged.
*
* @param id task id
* @param page page to fetch (whether numbering starts at 0 or 1 is not stated here)
* @param pageLimit page size limit
* @return the aggregation result for the requested page
*/
CommonAggreeResult getAggreeResult(String id, int page, int pageLimit);
}
\ No newline at end of file
package com.zhiwei.middleware.automatic.server.dubbo.service;
import com.zhiwei.base.category.ClassB.TypeB;
import java.util.List;
import java.util.Map;
/**
 * Data collection module service.
 *
 * <p>Created 2020-04-07 15:02:05.
 *
 * @author SJJ
 */
public interface DataCollectionService {

    /**
     * Clears the entire cache.
     *
     * @param group group argument
     * @param id    id argument
     */
    void cleanCache(String group, String id);

    /**
     * Clears the entire cache but keeps the noise set.
     *
     * @param group group argument
     * @param id    id argument
     */
    void cleanCacheExceptNoise(String group, String id);

    /**
     * Adds the base data set.
     *
     * @param group          group argument
     * @param id             id argument
     * @param compressedlist data items to add (per the name, presumably compressed — confirm)
     */
    void addDataCollection(String group, String id, List<String> compressedlist);

    /**
     * Starts aggregation.
     *
     * @param group     group argument
     * @param id        id argument
     * @param highWords extra argument; semantics not documented here — confirm with the implementation
     */
    void startAggree(String group, String id, String highWords);

    /**
     * Modifies the tag of several parent templates at once (the children belonging to them are
     * modified in bulk).
     *
     * @param group     group argument
     * @param id        id argument
     * @param fatherIds ids of the parent entries
     * @param mtag      tag value
     * @param mperson   mperson value
     * @param typeB     {@code TypeB} value
     * @return boolean outcome; presumably {@code true} on success — confirm
     */
    boolean batchModifyFatherTag(String group, String id, List<String> fatherIds, String mtag, String mperson,
            TypeB typeB);

    /**
     * Modifies the tag of one parent template (the children belonging to it are modified in bulk).
     *
     * @param group    group argument
     * @param id       id argument
     * @param fatherId id of the parent entry
     * @param mtag     tag value
     * @param mperson  mperson value
     * @param typeB    {@code TypeB} value
     * @return boolean outcome; presumably {@code true} on success — confirm
     */
    boolean modifyFatherTag(String group, String id, String fatherId, String mtag, String mperson, TypeB typeB);

    /**
     * Modifies the tag of one child entry.
     *
     * @param group    group argument
     * @param id       id argument
     * @param fatherId id of the parent entry
     * @param sonId    id of the child entry
     * @param mtag     tag value
     * @param mperson  mperson value
     * @param typeB    {@code TypeB} value
     * @return boolean outcome; presumably {@code true} on success — confirm
     */
    boolean modifySonTag(String group, String id, String fatherId, String sonId, String mtag, String mperson,
            TypeB typeB);

    /**
     * Moves a parent entry into the noise set.
     *
     * @param group    group argument
     * @param id       id argument
     * @param fatherId id of the parent entry
     * @param typeB    {@code TypeB} value
     * @return boolean outcome; presumably {@code true} on success — confirm
     */
    boolean throwIntoNoise(String group, String id, String fatherId, TypeB typeB);

    /**
     * Moves several parent entries into the noise set.
     *
     * @param group     group argument
     * @param id        id argument
     * @param fatherIds ids of the parent entries
     * @param typeB     {@code TypeB} value
     * @return boolean outcome; presumably {@code true} on success — confirm
     */
    boolean batchThrowIntoNoise(String group, String id, List<String> fatherIds, TypeB typeB);

    /**
     * Restores a parent entry from the noise set.
     *
     * @param group    group argument
     * @param id       id argument
     * @param fatherId id of the parent entry
     * @param typeB    {@code TypeB} value
     * @return boolean outcome; presumably {@code true} on success — confirm
     */
    boolean restoreFromNoise(String group, String id, String fatherId, TypeB typeB);

    /**
     * Fetches one page of parent-title information.
     *
     * @param group    group argument
     * @param id       id argument
     * @param page     page argument
     * @param size     page size
     * @param isAsc    sort-direction flag
     * @param keyword  search keyword
     * @param typeB    {@code TypeB} value
     * @param isTitle  flag; semantics not documented here — confirm with the implementation
     * @param markFlag flag value; semantics not documented here — confirm with the implementation
     * @return result map; its keys are not specified here
     */
    Map<String, Object> getFatherTitles(String group, String id, int page, int size, boolean isAsc,
            String keyword, TypeB typeB, boolean isTitle, int markFlag);

    /**
     * Fetches one page of child information for the given parent id.
     *
     * @param group    group argument
     * @param id       id argument
     * @param fatherId id of the parent entry
     * @param page     page argument
     * @param size     page size
     * @param isAsc    sort-direction flag
     * @param keyword  search keyword
     * @param typeB    {@code TypeB} value
     * @return result map; its keys are not specified here
     */
    Map<String, Object> getSonTitles(String group, String id, String fatherId, int page, int size, boolean isAsc,
            String keyword, TypeB typeB);

    /**
     * Fetches one page of parent-title information from the noise set.
     *
     * @param group    group argument
     * @param id       id argument
     * @param page     page argument
     * @param size     page size
     * @param isAsc    sort-direction flag
     * @param keyword  search keyword
     * @param typeB    {@code TypeB} value
     * @param isTitle  flag; semantics not documented here — confirm with the implementation
     * @param markFlag flag value; semantics not documented here — confirm with the implementation
     * @return result map; its keys are not specified here
     */
    Map<String, Object> getNoiseFatherTitles(String group, String id, int page, int size, boolean isAsc,
            String keyword, TypeB typeB, boolean isTitle, int markFlag);

    /**
     * Fetches one page of child information from the noise set for the given parent id.
     *
     * @param group    group argument
     * @param id       id argument
     * @param fatherId id of the parent entry
     * @param page     page argument
     * @param size     page size
     * @param isAsc    sort-direction flag
     * @param keyword  search keyword
     * @param typeB    {@code TypeB} value
     * @return result map; its keys are not specified here
     */
    Map<String, Object> getNoiseSonTitles(String group, String id, String fatherId, int page, int size,
            boolean isAsc, String keyword, TypeB typeB);

    /**
     * Stores the data once checking is finished.
     *
     * @param group group argument
     * @param id    id argument
     */
    void checkedThenInsert(String group, String id);

    /**
     * Returns the current (interim) aggregation status immediately.
     *
     * @param group group argument
     * @param id    id argument
     * @return -2: error while fetching the result; -1: not aggregated; 0: aggregating; 1: aggregated
     */
    int getAggreResultNow(String group, String id);

    /**
     * Returns the current (interim) insert status immediately.
     *
     * @param group group argument
     * @param id    id argument
     * @return -2: error while fetching the result; -1: not stored; 0: storing; 1: stored
     */
    int getInsertResultNow(String group, String id);
}
\ No newline at end of file
package com.zhiwei.middleware.automatic.server.dubbo.service;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.base.category.ClassB.TypeB;
import com.zhiwei.middleware.automatic.server.pojo.UploadInfo;
import com.zhiwei.middleware.automatic.server.pojo.enums.InsertType;
import java.util.Map;
/**
 * Data upload service.
 *
 * <p>Created 2020-02-25 18:02:26.
 *
 * @author SJJ
 */
public interface DataUploadService {

    /**
     * Adds the source data set.
     *
     * @param group     group argument
     * @param id        id argument
     * @param sourceStr source data as a string; its format is not specified here
     */
    void addUploadList(String group, String id, String sourceStr);

    /**
     * Starts the upload.
     *
     * @param group      group argument
     * @param id         id argument
     * @param mperson    mperson value
     * @param mtagType   {@code UploadInfo.MtagType} value
     * @param filterType {@code UploadInfo.FilterType} value
     * @param projectId  project id
     * @param insertType {@code InsertType} value
     */
    void startUpload(String group, String id, String mperson, UploadInfo.MtagType mtagType,
            UploadInfo.FilterType filterType, String projectId, InsertType insertType);

    /**
     * Fetches the upload status (progress).
     *
     * @param group group argument
     * @param id    id argument
     * @return status map; its keys are not specified here
     */
    Map<String, Object> getUploadStatus(String group, String id);

    /**
     * Fetches the data set for the given {@code UploadType}.
     *
     * @param group       group argument
     * @param id          id argument
     * @param page        page argument
     * @param size        page size
     * @param isAsc       sort-direction flag
     * @param searchField field to search in
     * @param keyword     search keyword
     * @param uploadType  {@code UploadInfo.UploadType} value
     * @return result map; its keys are not specified here
     */
    Map<String, Object> getUploadInfoList(String group, String id, int page, int size, boolean isAsc,
            String searchField, String keyword, UploadInfo.UploadType uploadType);

    /**
     * Determines the {@code DataType} of a record.
     *
     * @param json  the record as JSON
     * @param typeB {@code TypeB} value
     * @return the resolved data type
     */
    UploadInfo.DataType getDataType(JSONObject json, TypeB typeB);

    /**
     * Clears the data set.
     *
     * @param group group argument
     * @param id    id argument
     */
    void cleanUploadResult(String group, String id);
}
package com.zhiwei.middleware.automatic.server.pojo;
import com.alibaba.fastjson.JSONObject;
/**
 * A task descriptor: a type string together with a JSON object of parameters.
 */
public class AutoTask {

    private JSONObject paramSource;
    private String type;

    /**
     * Creates an instance with both fields left {@code null}.
     */
    public AutoTask() {
    }

    /**
     * Creates an instance of the given type with an empty parameter object.
     *
     * @param type the task type string
     */
    public AutoTask(String type) {
        this.paramSource = new JSONObject();
        this.type = type;
    }

    /**
     * @return the task type string
     */
    public String getType() {
        return this.type;
    }

    /**
     * @param type the task type string
     */
    public void setType(String type) {
        this.type = type;
    }

    /**
     * @return the parameter object; {@code null} if the no-arg constructor was used and no value was set
     */
    public JSONObject getParamSource() {
        return this.paramSource;
    }

    /**
     * @param paramSource the parameter object
     */
    public void setParamSource(JSONObject paramSource) {
        this.paramSource = paramSource;
    }
}
package com.zhiwei.middleware.automatic.server.pojo;
import org.springframework.data.mongodb.core.mapping.Document;
/**
* @ClassName
* @Description 模板标注信息记录
* @Author ${"liu-yu"}
* @Date 2022/5/6 17:05
**/
@Document("automaticmark_template_record")
public class TemplateRecord {
/**
* id
*/
......
package com.zhiwei.middleware.automatic.server.pojo.vo;
package com.zhiwei.middleware.automatic.server.pojo;
import com.zhiwei.middleware.automatic.server.pojo.enums.TemplateStatus;
......@@ -15,7 +15,7 @@ public class TemplateTitleVo implements Serializable {
private String templateTitle;
private Date updateTime;
private Date createTime;
private AtomicLong markSum;
private Long markSum;
private String mtag;
private String url;
private TemplateStatus status;
......@@ -61,11 +61,11 @@ public class TemplateTitleVo implements Serializable {
this.createTime = createTime;
}
public AtomicLong getMarkSum() {
public Long getMarkSum() {
return markSum;
}
public void setMarkSum(AtomicLong markSum) {
public void setMarkSum(Long markSum) {
this.markSum = markSum;
}
......@@ -93,23 +93,18 @@ public class TemplateTitleVo implements Serializable {
this.status = status;
}
public void emptyNum() {
this.markSum = new AtomicLong(0);
}
public TemplateTitleVo(String templateTitle, String mtag, String url) {
this.updateTime = new Date();
this.createTime = new Date();
this.templateTitle = templateTitle;
this.markSum = new AtomicLong();
this.markSum = 0L;
this.mtag = mtag;
this.url = url;
this.status = TemplateStatus.运行中;
}
public void refreshMark() {
this.getMarkSum().getAndIncrement();
this.updateTime = new Date();
}
}
package com.zhiwei.middleware.automatic.server.pojo.enums;
/**
 * Task kinds. Each constant carries a type code, a name and a cache id.
 */
public enum TaskType {
    COMMON_ONE("common_one", "common", "commonCache"),
    COMMON_TWO("common_two", "common", "commonCache"),
    TEMPLATE("template", "template", ""),
    TEMPLATE_MODIFY("template_modify", "template", ""),
    TEMPLATE_RESET("template_reset", "template", "");

    final String type;
    final String name;
    final String cacheId;

    TaskType(String type, String name, String cacheId) {
        this.type = type;
        this.name = name;
        this.cacheId = cacheId;
    }

    /**
     * @return the type code of this constant
     */
    public String getType() {
        return type;
    }

    /**
     * @return the name of this constant's family
     */
    public String getName() {
        return name;
    }

    /**
     * @return the cache id; the empty string for the template constants
     */
    public String getCacheId() {
        return this.cacheId;
    }

    /**
     * Looks up the constant whose type code equals {@code type}.
     *
     * @param type the type code to look for; may be {@code null}
     * @return the first matching constant in declaration order, or {@code null} when none matches
     */
    public static TaskType create(String type) {
        TaskType found = null;
        TaskType[] candidates = TaskType.values();
        for (int i = 0; i < candidates.length && found == null; i++) {
            // The constant's code is the receiver, so a null argument simply matches nothing.
            if (candidates[i].type.equals(type)) {
                found = candidates[i];
            }
        }
        return found;
    }
}
......@@ -18,47 +18,36 @@
<json.version>1.2.47</json.version>
<push-log.version>2.17.0-SNAPSHOT</push-log.version>
<curator.version>2.12.0</curator.version>
<es.version>7.9.2</es.version>
<es-client.version>0.0.4-SNAPSHOT</es-client.version>
<filter.version>1.1.6-SNAPSHOT</filter.version>
<qbjc-bean.version>1.1.4.1-SNAPSHOT</qbjc-bean.version>
<nlp-aggree.version>0.0.5-SNAPSHOT</nlp-aggree.version>
<dubbo-server.version>2.7.4.1</dubbo-server.version>
<automatic.version>1.0-SNAPSHOT</automatic.version>
<base.version>2.0.0-SNAPSHOT</base.version>
<marker.version>1.2.3-SNAPSHOT</marker.version>
<kafka.version>2.4.1.RELEASE</kafka.version>
</properties>
<dependencies>
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${kafka.version}</version>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
<version>${spring-boot.version}</version>
<exclusions>
<exclusion>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.zhiwei</groupId>
<artifactId>wechat</artifactId>
<version>1.3.7-SNAPSHOT</version>
</dependency>
<!-- 标注客户端 -->
<dependency>
<groupId>com.zhiwei.middleware</groupId>
<artifactId>marker-client</artifactId>
<version>${marker.version}</version>
</dependency>
<dependency>
<groupId>com.zhiwei.base</groupId>
<artifactId>base-objects-application</artifactId>
<version>${base.version}</version>
......@@ -97,63 +86,12 @@
<dependency>
<groupId>com.zhiwei.nlp</groupId>
<artifactId>nlp-aggree</artifactId>
<version>${nlp-aggree.version}</version>
</dependency>
<!-- 日志依赖使用crawler-filter -->
<dependency>
<groupId>com.zhiwei.middleware</groupId>
<artifactId>cleaner-unified-filter</artifactId>
<version>${filter.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.zhiwei</groupId>
<artifactId>qbjc-bean</artifactId>
<version>${qbjc-bean.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${es.version}</version>
</dependency>
<dependency>
<groupId>com.zhiwei</groupId>
<artifactId>es-client</artifactId>
<version>${es-client.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
<version>${spring-boot.version}</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>${curator.version}</version>
......
package com.zhiwei.middleware.automatic.server.base;
import com.zhiwei.base.category.ClassB;
import com.zhiwei.base.entity.CommonDO;
import com.zhiwei.base.entity.subclass.mark.MarkInfo;
import com.zhiwei.middleware.automatic.server.pojo.MarkUploadInfo;
import com.zhiwei.middleware.automatic.server.pojo.MarkUploadResult;
import com.zhiwei.middleware.automatic.server.pojo.UploadInfo;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.search.SearchHit;
/**
* Operations a data-upload handler provides: searching for existing data, converting uploaded
* rows, building search queries, and producing {@code MarkInfo} objects.
*/
public interface BaseDataUploadService {
/**
* @return the {@code ClassB.TypeB} value associated with this handler
*/
ClassB.TypeB getTypeB();
/**
* Searches the main data store by text content.
*
* @param info upload information
* @return CommonDO
*/
CommonDO searchDwByContentNew(MarkUploadResult info);
/**
* Converts an uploaded-spreadsheet entity into a data-upload entity.
*
* @param info upload information
* @return UploadInfo
* @throws Exception declared by the signature; the concrete failure conditions depend on the implementation
*/
UploadInfo parseMarkUploadInfo2UploadInfo(MarkUploadInfo info)
throws Exception;
/**
* Builds the URL query condition.
* @param result mark information
* @return BoolQueryBuilder
*/
BoolQueryBuilder urlSearchQuery(MarkUploadResult result);
/**
* Builds the text query condition.
* @param result mark information
* @return BoolQueryBuilder
*/
BoolQueryBuilder textSearchQuery(MarkUploadResult result);
/**
* Converts an ES search hit into a base entity.
* @param hit ES data
* @return the base entity
*/
CommonDO getCommonDOBySearchHit(SearchHit hit);
/**
* Converts a mark result into a {@code MarkInfo}.
* @param result mark result
* @param mperson the person doing the marking
* @param group project
* @param originMtag tag(s)
* @return MarkInfo
*/
MarkInfo toMarkInfoNew(MarkUploadResult result, String mperson, String group, String... originMtag);
}
package com.zhiwei.middleware.automatic.server.base;
import com.zhiwei.base.category.ClassB;
import com.zhiwei.middleware.automatic.server.functional.*;
import com.zhiwei.middleware.automatic.server.pojo.MarkUploadRule;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import java.util.*;
/**
 * Holds the working state of a batch operation over a list of rows.
 *
 * <p>{@link #bulkQuery} adds one {@code should} clause per row to a shared bool query and collects
 * the index names and class types reported for the rows; {@link #searchCallback} then hands each
 * row the hits looked up under its key. Failures on a single row are reported through the supplied
 * exception callback and do not stop the remaining rows.
 *
 * @param <T> the row type
 */
public class BulkTemplate<T> {
    private static final Logger log = LogManager.getLogger(BulkTemplate.class);

    private List<T> source;
    private String state;
    private boolean isNext;
    private BoolQueryBuilder queryBuilder;
    private Set<String> indexSet;
    private Set<ClassB.TypeB> typeSet;

    /**
     * @param source rows to process
     * @param state  label prepended to the stage name in per-row error reports
     */
    public BulkTemplate(List<T> source, String state) {
        reset(source, state);
    }

    /**
     * Builds the combined query: one {@code should} clause per row, plus the sets of index names
     * and class types. Afterwards the template is marked ready for {@link #searchCallback} only if
     * at least one index name was collected; otherwise an error is logged.
     *
     * @param esRowQuery produces the query clause for a row
     * @param esIndex    produces the index name for a row; skipped when {@code null}
     * @param classType  produces the class type for a row; skipped when {@code null}
     * @param exception  receives the row, a stage label and the message of any exception thrown
     *                   while handling that row
     */
    public void bulkQuery(EsRowQuery<T> esRowQuery, EsIndex<T> esIndex, DataClassType<T> classType, UploadRowException<T> exception) {
        final boolean collectIndex = esIndex != null;
        final boolean collectType = classType != null;
        for (T row : source) {
            try {
                queryBuilder.should(esRowQuery.rowQuery(row));
                if (collectIndex) {
                    indexSet.add(esIndex.getIndex(row));
                }
                if (collectType) {
                    typeSet.add(classType.getClassType(row));
                }
            } catch (Exception ex) {
                exception.rowException(row, state + "-构建查询条件", ex.getMessage());
            }
        }
        // Without any index name there is nothing to search, so the merge stage is disabled.
        isNext = !indexSet.isEmpty();
        if (!isNext) {
            log.error("批量操作-构建查询条件阶段 es索引为空");
        }
    }

    /**
     * Merges search hits into the rows. Does nothing unless the preceding {@link #bulkQuery} call
     * collected at least one index name.
     *
     * @param hitMap    hits grouped by row key
     * @param rule      rule passed through to {@code dataMerge}
     * @param rowKey    computes the lookup key of a row
     * @param dataMerge merges the hits found under the row's key (possibly {@code null}) into the row
     * @param exception receives the row, a stage label and the message of any exception thrown
     *                  while handling that row
     */
    public void searchCallback(Map<String, List<SearchHit>> hitMap, MarkUploadRule rule, RowKey<T> rowKey, DataMerge<T> dataMerge, UploadRowException<T> exception) {
        if (isNext) {
            for (T row : source) {
                try {
                    List<SearchHit> rowHits = hitMap.get(rowKey.getRowKey(row));
                    dataMerge.dataMerge(rowHits, row, rule);
                } catch (Exception ex) {
                    exception.rowException(row, state + "-es数据合并", ex.getMessage());
                }
            }
        }
    }

    /**
     * @return the rows currently held by this template
     */
    public List<T> getSource() {
        return source;
    }

    /**
     * @return the bool query accumulated by {@link #bulkQuery}
     */
    public BoolQueryBuilder getQueryBuilder() {
        return queryBuilder;
    }

    /**
     * @return the index names collected by {@link #bulkQuery}
     */
    public Set<String> getIndexSet() {
        return indexSet;
    }

    /**
     * @return the class types collected by {@link #bulkQuery}
     */
    public Set<ClassB.TypeB> getTypeSet() {
        return typeSet;
    }

    /**
     * Re-initialises this template for a new batch, discarding the query and collected sets.
     *
     * @param source rows to process
     * @param state  label prepended to the stage name in per-row error reports
     */
    public void clean(List<T> source, String state) {
        reset(source, state);
    }

    /** Puts every field into its initial state for the given rows and label. */
    private void reset(List<T> rows, String label) {
        this.source = rows;
        this.state = label;
        this.isNext = false;
        this.indexSet = new HashSet<>();
        this.typeSet = new HashSet<>();
        this.queryBuilder = QueryBuilders.boolQuery();
    }
}
package com.zhiwei.middleware.automatic.server.base;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.ValueFilter;
import com.zhiwei.base.category.ClassB;
import com.zhiwei.base.entity.CommonDO;
import com.zhiwei.base.entity.subclass.mark.*;
import com.zhiwei.middleware.automatic.server.config.GenericAttribute;
import com.zhiwei.middleware.automatic.server.dubbo.handle.DubboHandler;
import com.zhiwei.middleware.automatic.server.pojo.MarkUploadResult;
import com.zhiwei.middleware.automatic.server.pojo.UploadInfo;
import com.zhiwei.middleware.automatic.server.util.Tools;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import java.util.ArrayList;
import java.util.List;
/**
 * Helper shared by data-upload handlers. It bundles a {@code ClassB.TypeB}, the two entity
 * classes used for that type, and the {@link DubboHandler} used for existence checks, and offers
 * conversion and query-building utilities on top of them.
 */
public class DataUploadCommon {

    private final ClassB.TypeB typeB;
    private final Class<? extends CommonDO> dwClazz;
    private final Class<? extends CommonDO> markClazz;
    private final DubboHandler dubboHandler;

    /**
     * @param typeB        the type value this helper is bound to
     * @param dwClazz      entity class exposed through {@link #getDwClazz()}
     * @param markClazz    entity class that {@link #convert2Mark} parses into
     * @param dubboHandler handler used by {@link #getDataType} for existence checks
     */
    public DataUploadCommon(ClassB.TypeB typeB, Class<? extends CommonDO> dwClazz, Class<? extends CommonDO> markClazz,
            DubboHandler dubboHandler) {
        this.typeB = typeB;
        this.dwClazz = dwClazz;
        this.markClazz = markClazz;
        this.dubboHandler = dubboHandler;
    }

    public ClassB.TypeB getTypeB() {
        return this.typeB;
    }

    public Class<? extends CommonDO> getDwClazz() {
        return this.dwClazz;
    }

    public Class<? extends CommonDO> getMarkClazz() {
        return this.markClazz;
    }

    public DubboHandler getDubboHandler() {
        return this.dubboHandler;
    }

    /**
     * Resolves the data type of an upload entry and stores it on the entry.
     *
     * <p>The mark entity is checked first, then the dw entity; if the handler reports neither as
     * contained, the type is {@code EXTERNAL}.
     *
     * @param info the upload entry; its data type is set as a side effect
     * @return the resolved data type
     * @throws Exception declared by the signature; presumably raised by the handler lookup — confirm
     */
    public final UploadInfo.DataType getDataType(UploadInfo info) throws Exception {
        final UploadInfo.DataType resolved;
        if (dubboHandler.contains(info.getCompound().getMark().filterInfo())) {
            // present in the mark store
            resolved = UploadInfo.DataType.MARK;
        } else if (dubboHandler.contains(info.getCompound().getDw().filterInfo())) {
            // present in the dw (public-opinion) store
            resolved = UploadInfo.DataType.DW;
        } else {
            resolved = UploadInfo.DataType.EXTERNAL;
        }
        info.setDataType(resolved);
        return resolved;
    }

    /**
     * Converts an entity into the mark entity class, setting its group field on the way.
     *
     * @param dw     the entity to convert
     * @param mgroup value written under {@code GenericAttribute.ES_M_GROUP}
     * @return the entity re-parsed as {@code markClazz}
     */
    public final CommonDO convert2Mark(CommonDO dw, String mgroup) {
        JSONObject payload = dw.toJSON();
        payload.put(GenericAttribute.ES_M_GROUP, mgroup);
        String serialized = payload.toJSONString();
        return JSONObject.parseObject(serialized, markClazz);
    }

    /**
     * Builds a bool query that matches any known variant of a URL on the given field.
     *
     * <p>Variants are the URL itself, the same URL with its first {@code http}/{@code https}
     * occurrence swapped for the other, and, for URLs containing {@code toutiao.com}, a
     * {@code https://www.toutiao.com/a<digits>} form built from the first match that
     * {@code Tools.patternMatchFind} returns for the digit pattern.
     *
     * @param url     the URL to match
     * @param urlName name of the field the term queries are placed on
     * @return a bool query with one {@code should} term clause per variant
     */
    public static BoolQueryBuilder urlQuery(String url, String urlName) {
        List<String> variants = new ArrayList<>(2);
        variants.add(url);
        if (url.contains("https:")) {
            variants.add(url.replaceFirst("https", "http"));
        } else if (url.contains("http")) {
            variants.add(url.replaceFirst("http", "https"));
        }
        if (url.contains("toutiao.com")) {
            List<String> digitRuns = Tools.patternMatchFind(url, "[\\d]+");
            if (!digitRuns.isEmpty()) {
                variants.add("https://www.toutiao.com/a" + digitRuns.get(0));
            }
        }
        BoolQueryBuilder anyVariant = QueryBuilders.boolQuery();
        for (String variant : variants) {
            anyVariant.should(QueryBuilders.termQuery(urlName, variant));
        }
        return anyVariant;
    }

    /**
     * Fills in required fields that may be missing and re-parses the entity as {@code clazz}.
     *
     * <p>The collection time, cid and cname are only supplied when absent (cid also when it is
     * -1); group, person and tag are always overwritten. Empty-string values are serialized as
     * {@code null} before parsing.
     *
     * @param commonDO  the entity to complete
     * @param mperson   value written under {@code GenericAttribute.ES_M_PERSON}
     * @param mgroup    value written under {@code GenericAttribute.ES_M_GROUP}
     * @param originTag original tag, combined with {@code mtag} by {@code Tools.partialUpdateTag}
     * @param mtag      new tag, combined with {@code originTag} by {@code Tools.partialUpdateTag}
     * @param clazz     target entity class
     * @return the completed entity parsed as {@code clazz}
     */
    public CommonDO addDefault(CommonDO commonDO, String mperson, String mgroup, String originTag,
            String mtag, Class<? extends CommonDO> clazz) {
        JSONObject fields = commonDO.toJSON();

        // Supply ctime, cid and cname only when they are missing.
        if (fields.get(GenericAttribute.ES_C_TIME) == null) {
            fields.put(GenericAttribute.ES_C_TIME, System.currentTimeMillis());
        }
        Long currentCid = fields.getLong(GenericAttribute.ES_CID);
        if (currentCid == null || currentCid == -1) {
            fields.put(GenericAttribute.ES_CID, GenericAttribute.ES_CID_DEFAULT);
        }
        if (!fields.containsKey(GenericAttribute.ES_C_NAME)) {
            fields.put(GenericAttribute.ES_C_NAME, GenericAttribute.AUTO_CNAME);
        }

        fields.put(GenericAttribute.ES_M_GROUP, mgroup);
        fields.put(GenericAttribute.ES_M_PERSON, mperson);
        fields.put(GenericAttribute.ES_M_TAG, Tools.partialUpdateTag(originTag, mtag));

        // Empty strings are turned into null during serialization.
        ValueFilter blankToNull = (owner, key, value) -> "".equals(value) ? null : value;
        String serialized = JSON.toJSONString(fields, blankToNull);
        return JSONObject.parseObject(serialized, clazz);
    }
}
package com.zhiwei.middleware.automatic.server.base;
/**
 * Checked exception signalling that an uploaded record carries an invalid field value.
 */
public class FieldErrorException extends Exception {

    private static final long serialVersionUID = 6671756541874479047L;

    /**
     * @param msg description of the invalid field
     */
    public FieldErrorException(String msg) {
        super(msg);
    }

    /**
     * Variant that keeps the underlying failure so its stack trace is not lost when an
     * exception is translated into a field error.
     *
     * @param msg   description of the invalid field
     * @param cause the exception that triggered this error; may be {@code null}
     */
    public FieldErrorException(String msg, Throwable cause) {
        super(msg, cause);
    }
}
package com.zhiwei.middleware.automatic.server.base.impl;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.base.category.ClassB;
import com.zhiwei.base.entity.CommonDO;
import com.zhiwei.base.entity.subclass.CompleteText;
import com.zhiwei.base.entity.subclass.mark.CompleteTextMark;
import com.zhiwei.base.entity.subclass.mark.MarkInfo;
import com.zhiwei.middleware.automatic.server.base.BaseDataUploadService;
import com.zhiwei.middleware.automatic.server.base.DataUploadCommon;
import com.zhiwei.middleware.automatic.server.base.FieldErrorException;
import com.zhiwei.middleware.automatic.server.config.GenericAttribute;
import com.zhiwei.middleware.automatic.server.dao.EsDao;
import com.zhiwei.middleware.automatic.server.dubbo.handle.DubboHandler;
import com.zhiwei.middleware.automatic.server.pojo.MarkUploadInfo;
import com.zhiwei.middleware.automatic.server.pojo.MarkUploadResult;
import com.zhiwei.middleware.automatic.server.pojo.UploadInfo;
import com.zhiwei.middleware.automatic.server.util.DataUploadUtil;
import com.zhiwei.middleware.automatic.server.util.TimeUtil;
import com.zhiwei.middleware.automatic.server.util.Tools;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * {@link BaseDataUploadService} implementation for the {@code ClassB.TypeB.COMPLETE} category,
 * using {@link CompleteText} as dw class and {@link CompleteTextMark} as mark class.
 */
@Service
public class CompleteTextServiceImpl extends DataUploadCommon implements BaseDataUploadService {

    private static final Logger log = LogManager.getLogger(CompleteTextServiceImpl.class);

    private final EsDao esDao;

    public CompleteTextServiceImpl(DubboHandler dubboHandler, EsDao esDao) {
        super(ClassB.TypeB.COMPLETE, CompleteText.class, CompleteTextMark.class, dubboHandler);
        this.esDao = esDao;
    }

    /**
     * Searches the index for a stored record that matches the uploaded dw object by content.
     *
     * <p>Up to 1000 hits with the same {@code source} are fetched; the first one whose formatted
     * time and URL host both equal those of the uploaded record is returned.
     *
     * @param info upload result whose dw object is a {@link CompleteText}
     * @return the first matching record, or {@code null} when none matches or the search fails
     */
    @Override
    public CommonDO searchDwByContentNew(MarkUploadResult info) {
        CompleteText dw = (CompleteText) info.getDw();
        // Dedup keys: the formatted time (per the original note, minute precision) and the URL host.
        String ruleTime = TimeUtil.CONTENT_DF.format(dw.getTime());
        String host = Tools.getHost(dw.getUrl());

        BoolQueryBuilder bool = QueryBuilders.boolQuery();
        bool.must(QueryBuilders.termQuery(GenericAttribute.ES_SOURCE, dw.getSource()));

        List<Map<String, Object>> allResults;
        try {
            allResults = Arrays.stream(esDao.search(TimeUtil.getAccurateIndex(dw.getTime(), getTypeB(), false),
                            bool, null, null, 0, 1000, null).getHits())
                    .map(SearchHit::getSourceAsMap)
                    .collect(Collectors.toList());
        } catch (IOException e) {
            log.error("es文本搜索失败:", e);
            return null;
        }

        for (Map<String, Object> map : allResults) {
            try {
                CompleteText text = CompleteText.restoreFromEs(map);
                // A hit is a match only when BOTH the time and the host are equal.
                if (ruleTime.equals(TimeUtil.CONTENT_DF.format(text.getTime()))
                        && host.equals(Tools.getHost(text.getUrl()))) {
                    return text;
                }
            } catch (Exception e) {
                // Best effort: a record that cannot be restored/compared is skipped, but the
                // failure is logged together with its stack trace instead of being dropped.
                log.info("debug-esMap:{}", JSONObject.toJSONString(map), e);
            }
        }

        // Nothing matched by content.
        log.info("文本匹配任未找到!title:{},source:{},time:{},host:{}", dw.getTitle(), dw.getSource(), ruleTime, host);
        return null;
    }

    /**
     * Converts a raw mark upload into an {@link UploadInfo} holding both the dw and the mark object.
     *
     * @param info raw upload
     * @return upload info for this service's category
     * @throws FieldErrorException when the time field is rejected by {@code Tools.isLegalTime}
     *                             or the mupdate lookup fails (original failure attached as cause)
     * @throws Exception           any other failure raised while converting
     */
    @Override
    public UploadInfo parseMarkUploadInfo2UploadInfo(MarkUploadInfo info) throws Exception {
        CompleteTextMark mark = JSONObject.parseObject(JSONObject.toJSONString(info), CompleteTextMark.class);
        // Fill in the c-type fields when c5 is missing.
        if (null == mark.getC5()) {
            DataUploadUtil.defaultCTypeAll(mark, info);
        }
        if (!Tools.isLegalTime(mark.getTime())) {
            throw new FieldErrorException("time字段不符合规则");
        }
        // Reset userId from the upload's uid.
        mark.setUserId(info.getUid());
        try {
            String[] mupdates = getDubboHandler().getMupdates(mark.filterInfo());
            // Set the mark feature fields.
            mark.setMupdate(mupdates[0]);
            if (mupdates.length == 2) {
                mark.setMupdateTwo(mupdates[1]);
            }
        } catch (Exception e) {
            log.error("parseMarkUploadInfo2UploadInfo-getMupdates", e);
            // Keep the original failure as the cause so callers can still see the root stack trace.
            FieldErrorException fieldError = new FieldErrorException(e.getMessage());
            fieldError.initCause(e);
            throw fieldError;
        }
        CompleteText dw = JSONObject.parseObject(mark.toJSON().toJSONString(), CompleteText.class);
        return new UploadInfo(info, new UploadInfo.CompoundCommonDO(dw, mark), getTypeB());
    }

    /**
     * @return URL-variant query on the {@code url} field for the original data's URL
     */
    @Override
    public BoolQueryBuilder urlSearchQuery(MarkUploadResult result) {
        return urlQuery(result.getOriginData().getUrl(), GenericAttribute.ES_URL);
    }

    /**
     * Builds the query requiring the mark's {@code mgroup} and one of its URL variants, and sets
     * the result key to {@code mgroup + Tools.urlReplace(url)}.
     */
    @Override
    public BoolQueryBuilder textSearchQuery(MarkUploadResult result) {
        CompleteTextMark mark = (CompleteTextMark) result.getMark();
        BoolQueryBuilder bool = QueryBuilders.boolQuery();
        bool.must(QueryBuilders.termQuery(GenericAttribute.ES_M_GROUP, mark.getMgroup()));
        bool.must(urlQuery(mark.getUrl(), GenericAttribute.ES_URL));
        result.setKey(mark.getMgroup() + Tools.urlReplace(mark.getUrl()));
        return bool;
    }

    /**
     * @return the hit's source map restored as this service's dw class
     */
    @Override
    public CommonDO getCommonDOBySearchHit(SearchHit hit) {
        return CommonDO.restoreFromEs(hit.getSourceAsMap(), getDwClazz());
    }

    /**
     * Wraps the completed mark object into a {@link MarkInfo}.
     *
     * @param originMtag optional; only the first element is used as the original tag
     */
    @Override
    public MarkInfo toMarkInfoNew(MarkUploadResult result, String mperson, String group, String... originMtag) {
        String originTag = originMtag.length > 0 ? originMtag[0] : null;
        return new MarkInfo((CompleteTextMark) addDefault(result.getMark(), mperson, group,
                originTag, result.getOriginData().getMtag(), CompleteTextMark.class));
    }
}
package com.zhiwei.middleware.automatic.server.base.impl;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.base.category.ClassB;
import com.zhiwei.base.entity.CommonDO;
import com.zhiwei.base.entity.subclass.IncompleteText;
import com.zhiwei.base.entity.subclass.mark.IncompleteTextMark;
import com.zhiwei.base.entity.subclass.mark.MarkInfo;
import com.zhiwei.middleware.automatic.server.base.BaseDataUploadService;
import com.zhiwei.middleware.automatic.server.base.DataUploadCommon;
import com.zhiwei.middleware.automatic.server.base.FieldErrorException;
import com.zhiwei.middleware.automatic.server.config.GenericAttribute;
import com.zhiwei.middleware.automatic.server.dubbo.handle.DubboHandler;
import com.zhiwei.middleware.automatic.server.pojo.MarkUploadInfo;
import com.zhiwei.middleware.automatic.server.pojo.MarkUploadResult;
import com.zhiwei.middleware.automatic.server.pojo.UploadInfo;
import com.zhiwei.middleware.automatic.server.util.DataUploadUtil;
import com.zhiwei.middleware.automatic.server.util.Tools;
import com.zhiwei.middleware.automatic.server.util.WeiboMidUrlDealUtil;
import io.micrometer.core.instrument.util.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.springframework.stereotype.Service;
import java.util.Objects;
/**
 * {@link BaseDataUploadService} implementation for the {@code ClassB.TypeB.INCOMPLETE} category,
 * using {@link IncompleteText} as dw class and {@link IncompleteTextMark} as mark class.
 */
@Service
public class IncompleteTextServiceImpl extends DataUploadCommon implements BaseDataUploadService {

    private static final Logger log = LogManager.getLogger(IncompleteTextServiceImpl.class);

    // The handler is held by DataUploadCommon and read through getDubboHandler(), as
    // CompleteTextServiceImpl does; no second copy of the reference is kept here.
    public IncompleteTextServiceImpl(DubboHandler dubboHandler) {
        super(ClassB.TypeB.INCOMPLETE, IncompleteText.class, IncompleteTextMark.class, dubboHandler);
    }

    /**
     * Content-based lookup is not implemented for this category.
     *
     * @return always {@code null}
     */
    @Override
    public CommonDO searchDwByContentNew(MarkUploadResult info) {
        return null;
    }

    /**
     * Converts a raw mark upload into an {@link UploadInfo} holding both the dw and the mark object.
     *
     * @param info raw upload
     * @return upload info for this service's category
     * @throws FieldErrorException when the time field is rejected, a Weibo mid cannot be derived
     *                             from the URL, or the mupdate lookup fails (cause attached)
     * @throws Exception           any other failure raised while converting
     */
    @Override
    public UploadInfo parseMarkUploadInfo2UploadInfo(MarkUploadInfo info) throws Exception {
        if (null == info.getMgroup()) {
            // TODO diagnostic output kept from testing
            log.info("出现mgroup为空数据,data:{}", JSONObject.toJSONString(info));
        }
        IncompleteTextMark mark = JSONObject.parseObject(JSONObject.toJSONString(info),
                IncompleteTextMark.class);
        // Fill in the c1-c5 fields when c5 is missing.
        if (null == mark.getC5()) {
            DataUploadUtil.defaultCTypeAll(mark, info);
        }
        if (!Tools.isLegalTime(mark.getTime())) {
            throw new FieldErrorException("time字段不符合规则");
        }
        // Empty content: fall back to the title.
        if (StringUtils.isEmpty(mark.getContent())) {
            mark.setContent(info.getTitle());
        }
        // Follower count.
        // NOTE(review): if getFans() is a String, a non-numeric value raises NumberFormatException
        // here rather than FieldErrorException - confirm whether that is intended.
        if (null != info.getFans()) {
            mark.setFollowersNum(Integer.valueOf(info.getFans()));
        }
        // Restore the verification type; it is only set when the upload carries one.
        String vtype = info.getAuthenticationType();
        if (null != vtype) {
            mark.setVtype(restoreVtype(vtype));
        }
        // Forward flag: an empty value or "原创" means not forwarded.
        if (StringUtils.isEmpty(info.getPrimary())) {
            mark.setIsForward(false);
        } else {
            mark.setIsForward(!"原创".equals(info.getPrimary()));
        }
        // source doubles as screenName.
        mark.setScreenName(info.getSource());
        // rootSource doubles as rootScreenName.
        mark.setRootScreenName(info.getRootSource());
        if ("微博".equals(info.getPlatform())) {
            // Per the original note, the dedup info needs c4 to be present.
            mark.setC4(1020);
            if (null == mark.getMid()) {
                String mid = WeiboMidUrlDealUtil.urlToMid(mark.getUrl());
                if (null == mid) {
                    throw new FieldErrorException("转换mid出错");
                }
                mark.setMid(mid);
            }
        }
        try {
            String[] mupdates = getDubboHandler().getMupdates(mark.filterInfo());
            // Set the mark feature fields.
            mark.setMupdate(mupdates[0]);
            if (mupdates.length == 2) {
                mark.setMupdateTwo(mupdates[1]);
            }
        } catch (Exception e) {
            log.error("parseMarkUploadInfo2UploadInfo-getMupdates", e);
            // Keep the original failure as the cause so callers can still see the root stack trace.
            FieldErrorException fieldError = new FieldErrorException(e.getMessage());
            fieldError.initCause(e);
            throw fieldError;
        }
        IncompleteText dw = JSONObject.parseObject(mark.toJSON().toJSONString(), IncompleteText.class);
        return new UploadInfo(info, new UploadInfo.CompoundCommonDO(dw, mark), getTypeB());
    }

    /**
     * Builds a query with {@code should} clauses for the original data's mid (when present)
     * and for its URL variants.
     */
    @Override
    public BoolQueryBuilder urlSearchQuery(MarkUploadResult result) {
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        if (Objects.nonNull(result.getOriginData().getMid())) {
            boolQueryBuilder.should(QueryBuilders.termQuery(GenericAttribute.ES_MID, result.getOriginData().getMid()));
        }
        return boolQueryBuilder.should(urlQuery(result.getOriginData().getUrl(), GenericAttribute.ES_URL));
    }

    /**
     * Builds the query requiring the mark's {@code mgroup}, its mid (when present) and one of its
     * URL variants, and sets the result key to {@code mgroup + Tools.urlReplace(url)}.
     */
    @Override
    public BoolQueryBuilder textSearchQuery(MarkUploadResult result) {
        IncompleteTextMark mark = (IncompleteTextMark) result.getMark();
        BoolQueryBuilder bool = QueryBuilders.boolQuery();
        bool.must(QueryBuilders.termQuery(GenericAttribute.ES_M_GROUP, mark.getMgroup()));
        if (Objects.nonNull(mark.getMid())) {
            bool.must(QueryBuilders.termQuery(GenericAttribute.ES_MID, mark.getMid()));
        }
        bool.must(urlQuery(mark.getUrl(), GenericAttribute.ES_URL));
        result.setKey(mark.getMgroup() + Tools.urlReplace(mark.getUrl()));
        return bool;
    }

    /**
     * @return the hit's source map restored as this service's dw class
     */
    @Override
    public CommonDO getCommonDOBySearchHit(SearchHit hit) {
        return CommonDO.restoreFromEs(hit.getSourceAsMap(), getDwClazz());
    }

    /**
     * Wraps the completed mark object into a {@link MarkInfo}.
     *
     * @param originMtag optional; only the first element is used as the original tag
     */
    @Override
    public MarkInfo toMarkInfoNew(MarkUploadResult result, String mperson, String group, String... originMtag) {
        String originTag = originMtag.length > 0 ? originMtag[0] : null;
        return new MarkInfo((IncompleteTextMark) addDefault(result.getMark(), mperson, group,
                originTag, result.getOriginData().getMtag(), IncompleteTextMark.class));
    }

    /**
     * Maps a Weibo verification-type label to its numeric code.
     *
     * @param vtypeStr label as found in the upload
     * @return the numeric code; {@code -2} (unknown) for any label without an explicit mapping
     * @throws IllegalArgumentException when {@code vtypeStr} is {@code null}
     */
    private int restoreVtype(String vtypeStr) {
        if (null == vtypeStr) {
            throw new IllegalArgumentException("微博必须要有vtype!!!");
        }
        switch (vtypeStr) {
            case "未知":
                return -2;
            case "普通用户":
                return -1;
            case "名人":
                return 0;
            case "政府":
                return 1;
            case "企业":
                return 2;
            case "媒体":
                return 3;
            case "校园":
                return 4;
            case "网站":
                return 5;
            case "应用":
                return 6;
            case "团体":
                return 7;
            case "微博女郎":
                return 10;
            default:
                // "达人" (200 and 220 according to the original note) and every other label
                // fall back to -2 (unknown).
                return -2;
        }
    }
}
package com.zhiwei.middleware.automatic.server.base.impl;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.base.category.ClassB;
import com.zhiwei.base.entity.CommonDO;
import com.zhiwei.base.entity.subclass.QAText;
import com.zhiwei.base.entity.subclass.mark.MarkInfo;
import com.zhiwei.base.entity.subclass.mark.QATextMark;
import com.zhiwei.middleware.automatic.server.base.BaseDataUploadService;
import com.zhiwei.middleware.automatic.server.base.DataUploadCommon;
import com.zhiwei.middleware.automatic.server.base.FieldErrorException;
import com.zhiwei.middleware.automatic.server.config.GenericAttribute;
import com.zhiwei.middleware.automatic.server.dubbo.handle.DubboHandler;
import com.zhiwei.middleware.automatic.server.pojo.MarkUploadInfo;
import com.zhiwei.middleware.automatic.server.pojo.MarkUploadResult;
import com.zhiwei.middleware.automatic.server.pojo.UploadInfo;
import com.zhiwei.middleware.automatic.server.util.DataUploadUtil;
import com.zhiwei.middleware.automatic.server.util.TimeUtil;
import com.zhiwei.middleware.automatic.server.util.Tools;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.springframework.stereotype.Service;
/**
 * {@link BaseDataUploadService} implementation for the {@code ClassB.TypeB.QA} category,
 * using {@link QAText} as dw class and {@link QATextMark} as mark class.
 */
@Service
public class QATextServiceImpl extends DataUploadCommon implements BaseDataUploadService {

    private static final Logger log = LogManager.getLogger(QATextServiceImpl.class);

    // The handler is held by DataUploadCommon and read through getDubboHandler(), as
    // CompleteTextServiceImpl does; no second copy of the reference is kept here.
    public QATextServiceImpl(DubboHandler dubboHandler) {
        super(ClassB.TypeB.QA, QAText.class, QATextMark.class, dubboHandler);
    }

    /**
     * Content-based lookup is not implemented for this category.
     *
     * @return always {@code null}
     */
    @Override
    public CommonDO searchDwByContentNew(MarkUploadResult info) {
        return null;
    }

    /**
     * Converts a raw mark upload into an {@link UploadInfo} holding both the dw and the mark object.
     *
     * <p>The upload's URL and title always become the question URL/title. A {@code www.zhihu.com}
     * URL without {@code "answer"} is treated as a question; everything else as an answer.
     *
     * @param info raw upload
     * @return upload info for this service's category
     * @throws FieldErrorException when the time cannot be parsed or is rejected by
     *                             {@code Tools.isLegalTime}, or the mupdate lookup fails
     * @throws Exception           any other failure raised while converting
     */
    @Override
    public UploadInfo parseMarkUploadInfo2UploadInfo(MarkUploadInfo info) throws Exception {
        JSONObject json = JSONObject.parseObject(JSONObject.toJSONString(info));
        String url = info.getUrl();
        String title = info.getTitle();
        String content = info.getContent();
        String source = info.getSource();
        Long time = parseUploadTime(info);
        // The upload's url/title are taken as the question url/title.
        json.put("questionTitle", title);
        json.put("questionUrl", url);
        // Simple question/answer distinction.
        if ("www.zhihu.com".equals(Tools.getHost(url)) && !url.contains("answer")) {
            json.put("questionTime", time);
            json.put("questionUsername", source);
            json.put("questionContent", content);
        } else {
            json.put("answerTime", time);
            json.put("answerUrl", url);
            json.put("answerUsername", source);
            json.put("answerContent", content);
        }
        QATextMark mark = JSONObject.parseObject(json.toJSONString(), QATextMark.class);
        // Fill in the c1-c5 fields when c5 is missing.
        if (null == mark.getC5()) {
            DataUploadUtil.defaultCTypeAll(mark, info);
        }
        if (!Tools.isLegalTime(mark.getTime())) {
            throw new FieldErrorException("time字段不符合规则");
        }
        try {
            String[] mupdates = getDubboHandler().getMupdates(mark.filterInfo());
            // Set the mark feature fields.
            mark.setMupdate(mupdates[0]);
            if (mupdates.length == 2) {
                mark.setMupdateTwo(mupdates[1]);
            }
        } catch (Exception e) {
            log.error("parseMarkUploadInfo2UploadInfo-getMupdates", e);
            // Keep the original failure as the cause so callers can still see the root stack trace.
            FieldErrorException fieldError = new FieldErrorException(e.getMessage());
            fieldError.initCause(e);
            throw fieldError;
        }
        QAText dw = JSONObject.parseObject(mark.toJSON().toJSONString(), QAText.class);
        return new UploadInfo(info, new UploadInfo.CompoundCommonDO(dw, mark), getTypeB());
    }

    /**
     * Parses the upload's time string into epoch milliseconds.
     *
     * <p>This runs before the {@code Tools.isLegalTime} check, so a missing or malformed time
     * would otherwise escape as a raw parsing exception; it is reported as a field error instead.
     */
    private Long parseUploadTime(MarkUploadInfo info) throws FieldErrorException {
        try {
            return TimeUtil.TIME_FORMAT.parse(info.getTime()).getTime();
        } catch (Exception e) {
            FieldErrorException fieldError = new FieldErrorException("time字段不符合规则");
            fieldError.initCause(e);
            throw fieldError;
        }
    }

    /**
     * Builds a query requiring the original data's {@code source} and a URL-variant match on
     * either the question URL field or the answer URL field.
     */
    @Override
    public BoolQueryBuilder urlSearchQuery(MarkUploadResult result) {
        BoolQueryBuilder should = QueryBuilders.boolQuery()
                .should(urlQuery(result.getOriginData().getUrl(), GenericAttribute.ES_QA_QUESTION_URL))
                .should(urlQuery(result.getOriginData().getUrl(), GenericAttribute.ES_QA_ANSWER_URL));
        return QueryBuilders.boolQuery()
                .must(QueryBuilders.termQuery(GenericAttribute.ES_SOURCE, result.getOriginData().getSource()))
                .must(should);
    }

    /**
     * Builds the query requiring the mark's {@code mgroup} and one of two URL conditions: the
     * question URL field matches the mark's question URL and an answer URL field exists, or the
     * answer URL field matches the mark's question URL. Also sets the result key to
     * {@code mgroup + Tools.urlReplace(originUrl)}.
     */
    @Override
    public BoolQueryBuilder textSearchQuery(MarkUploadResult result) {
        QATextMark mark = (QATextMark) result.getMark();
        BoolQueryBuilder bool = QueryBuilders.boolQuery();
        bool.must(QueryBuilders.termQuery(GenericAttribute.ES_M_GROUP, mark.getMgroup()));
        BoolQueryBuilder urlQuery = QueryBuilders.boolQuery();
        BoolQueryBuilder qaUrl = QueryBuilders.boolQuery()
                .must(urlQuery(mark.getQuestionUrl(), GenericAttribute.ES_QA_QUESTION_URL))
                .must(QueryBuilders.existsQuery(GenericAttribute.ES_QA_ANSWER_URL));
        urlQuery.should(qaUrl);
        urlQuery.should(urlQuery(mark.getQuestionUrl(), GenericAttribute.ES_QA_ANSWER_URL));
        bool.must(urlQuery);
        result.setKey(mark.getMgroup() + Tools.urlReplace(result.getOriginData().getUrl()));
        return bool;
    }

    /**
     * @return the hit's source map restored as this service's dw class
     */
    @Override
    public CommonDO getCommonDOBySearchHit(SearchHit hit) {
        return CommonDO.restoreFromEs(hit.getSourceAsMap(), getDwClazz());
    }

    /**
     * Wraps the completed mark object into a {@link MarkInfo}.
     *
     * @param originMtag optional; only the first element is used as the original tag
     */
    @Override
    public MarkInfo toMarkInfoNew(MarkUploadResult result, String mperson, String group, String... originMtag) {
        String originTag = originMtag.length > 0 ? originMtag[0] : null;
        return new MarkInfo((QATextMark) addDefault(result.getMark(), mperson, group,
                originTag, result.getOriginData().getMtag(), QATextMark.class));
    }
}
package com.zhiwei.middleware.automatic.server.base.impl;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.base.category.ClassB;
import com.zhiwei.base.entity.CommonDO;
import com.zhiwei.base.entity.subclass.Video;
import com.zhiwei.base.entity.subclass.mark.MarkInfo;
import com.zhiwei.base.entity.subclass.mark.VideoMark;
import com.zhiwei.middleware.automatic.server.base.BaseDataUploadService;
import com.zhiwei.middleware.automatic.server.base.DataUploadCommon;
import com.zhiwei.middleware.automatic.server.base.FieldErrorException;
import com.zhiwei.middleware.automatic.server.config.GenericAttribute;
import com.zhiwei.middleware.automatic.server.dao.EsDao;
import com.zhiwei.middleware.automatic.server.dubbo.handle.DubboHandler;
import com.zhiwei.middleware.automatic.server.pojo.MarkUploadInfo;
import com.zhiwei.middleware.automatic.server.pojo.MarkUploadResult;
import com.zhiwei.middleware.automatic.server.pojo.UploadInfo;
import com.zhiwei.middleware.automatic.server.util.DataUploadUtil;
import com.zhiwei.middleware.automatic.server.util.TimeUtil;
import com.zhiwei.middleware.automatic.server.util.Tools;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * {@link BaseDataUploadService} implementation for the {@code ClassB.TypeB.VIDEO} category,
 * using {@link Video} as dw class and {@link VideoMark} as mark class.
 */
@Service
public class VideoServiceImpl extends DataUploadCommon implements BaseDataUploadService {

    private static final Logger log = LogManager.getLogger(VideoServiceImpl.class);

    private final EsDao esDao;

    // The handler is held by DataUploadCommon and read through getDubboHandler(), as
    // CompleteTextServiceImpl does; no second copy of the reference is kept here.
    public VideoServiceImpl(DubboHandler dubboHandler, EsDao esDao) {
        super(ClassB.TypeB.VIDEO, Video.class, VideoMark.class, dubboHandler);
        this.esDao = esDao;
    }

    /**
     * Searches the index for a stored record that matches the uploaded dw object by content.
     *
     * <p>Up to 1000 hits with the same {@code source} are fetched; the first one whose formatted
     * time and URL host both equal those of the uploaded record is returned.
     *
     * @param info upload result whose dw object is a {@link Video}
     * @return the first matching record, or {@code null} when none matches or the search fails
     */
    @Override
    public CommonDO searchDwByContentNew(MarkUploadResult info) {
        Video dw = (Video) info.getDw();

        BoolQueryBuilder bool = QueryBuilders.boolQuery();
        bool.must(QueryBuilders.termQuery(GenericAttribute.ES_SOURCE, dw.getSource()));

        // Dedup keys: the formatted time (per the original note, minute precision) and the URL host.
        String ruleTime = TimeUtil.CONTENT_DF.format(dw.getTime());
        String host = Tools.getHost(dw.getUrl());

        List<Map<String, Object>> allResults;
        try {
            allResults = Arrays.stream(esDao.search(TimeUtil.getAccurateIndex(dw.getTime(), getTypeB(), false),
                            bool, null, null, 0, 1000, null).getHits())
                    .map(SearchHit::getSourceAsMap)
                    .collect(Collectors.toList());
        } catch (IOException e) {
            log.error("es文本搜索失败:", e);
            return null;
        }

        for (Map<String, Object> map : allResults) {
            Video text = Video.restoreFromEs(map);
            // A hit is a match only when BOTH the time and the host are equal.
            if (ruleTime.equals(TimeUtil.CONTENT_DF.format(text.getTime()))
                    && host.equals(Tools.getHost(text.getUrl()))) {
                return text;
            }
        }

        // Nothing matched by content.
        log.info("文本匹配任未找到!title:{},source:{},time:{},host:{}", dw.getTitle(), dw.getSource(), ruleTime, host);
        return null;
    }

    /**
     * Converts a raw mark upload into an {@link UploadInfo} holding both the dw and the mark object.
     *
     * @param info raw upload
     * @return upload info for this service's category
     * @throws FieldErrorException when the time field is rejected by {@code Tools.isLegalTime}
     *                             or the mupdate lookup fails (original failure attached as cause)
     * @throws Exception           any other failure raised while converting
     */
    @Override
    public UploadInfo parseMarkUploadInfo2UploadInfo(MarkUploadInfo info) throws Exception {
        VideoMark mark = JSONObject.parseObject(JSONObject.toJSONString(info), VideoMark.class);
        // Fill in the c1-c5 fields when c5 is missing.
        if (null == mark.getC5()) {
            DataUploadUtil.defaultCTypeAll(mark, info);
        }
        if (!Tools.isLegalTime(mark.getTime())) {
            throw new FieldErrorException("time字段不符合规则");
        }
        try {
            String[] mupdates = getDubboHandler().getMupdates(mark.filterInfo());
            // Set the mark feature fields.
            mark.setMupdate(mupdates[0]);
            if (mupdates.length == 2) {
                mark.setMupdateTwo(mupdates[1]);
            }
        } catch (Exception e) {
            log.error("parseMarkUploadInfo2UploadInfo-getMupdates", e);
            // Keep the original failure as the cause so callers can still see the root stack trace.
            FieldErrorException fieldError = new FieldErrorException(e.getMessage());
            fieldError.initCause(e);
            throw fieldError;
        }
        // The dw half of the compound must be the dw class (Video), matching the dwClazz passed
        // to super and the (Video) cast in searchDwByContentNew - not a second VideoMark.
        Video dw = JSONObject.parseObject(mark.toJSON().toJSONString(), Video.class);
        return new UploadInfo(info, new UploadInfo.CompoundCommonDO(dw, mark), getTypeB());
    }

    /**
     * @return a query requiring a URL-variant match on the {@code url} field
     */
    @Override
    public BoolQueryBuilder urlSearchQuery(MarkUploadResult result) {
        return QueryBuilders.boolQuery().must(urlQuery(result.getOriginData().getUrl(), GenericAttribute.ES_URL));
    }

    /**
     * Builds the query requiring the mark's {@code mgroup} and one of its URL variants, and sets
     * the result key to {@code mgroup + Tools.urlReplace(url)}.
     */
    @Override
    public BoolQueryBuilder textSearchQuery(MarkUploadResult result) {
        VideoMark mark = (VideoMark) result.getMark();
        BoolQueryBuilder bool = QueryBuilders.boolQuery();
        bool.must(QueryBuilders.termQuery(GenericAttribute.ES_M_GROUP, mark.getMgroup()));
        bool.must(urlQuery(mark.getUrl(), GenericAttribute.ES_URL));
        result.setKey(mark.getMgroup() + Tools.urlReplace(mark.getUrl()));
        return bool;
    }

    /**
     * @return the hit's source map restored as this service's dw class
     */
    @Override
    public CommonDO getCommonDOBySearchHit(SearchHit hit) {
        return CommonDO.restoreFromEs(hit.getSourceAsMap(), this.getDwClazz());
    }

    /**
     * Wraps the completed mark object into a {@link MarkInfo}.
     *
     * @param originMtag optional; only the first element is used as the original tag
     */
    @Override
    public MarkInfo toMarkInfoNew(MarkUploadResult result, String mperson, String group, String... originMtag) {
        String originTag = originMtag.length > 0 ? originMtag[0] : null;
        return new MarkInfo((VideoMark) addDefault(result.getMark(), mperson, group,
                originTag, result.getOriginData().getMtag(), VideoMark.class));
    }
}
package com.zhiwei.middleware.automatic.server.config;
/**
 * Shared constants of the automatic-mark server: key prefixes/suffixes, thresholds,
 * default identities and document field names.
 */
public class GenericAttribute {

    // ---- Environment switch ----
    public static final boolean IS_TEST = false;

    // ---- Upload key prefix and suffixes ----
    public static final String UNIFIED_PREFIX = "dataUpload";
    public static final String SOURCE_DATA = "sourceData";
    public static final String FORMAT_ERROR_SUFFIX = "formatError";
    public static final String FIELD_ERROR_SUFFIX = "fieldError";
    // NOTE(review): same value as FIELD_ERROR_SUFFIX - looks like a copy/paste; confirm intended.
    public static final String SYSTEM_ERROR_SUFFIX = "fieldError";
    public static final String SUCCESS_SUFFIX = "successed";
    public static final String FAILED_SUFFIX = "failed";
    public static final String STATUS_SUFFIX = "status";

    // ---- Data-collection cache keys (presumably Redis, judging by the names) ----
    public static final String REDIS_PREFIX = "DATA-COLLECTION";
    public static final String SOURCE = "SOURCE";
    public static final String STATUS = "STATUS";
    public static final String NOISE = "NOISE";
    public static final String MAP_SET = "|MAP_SET";
    public static final String KEY_SET = "|KEY_SET";
    public static final String NOISE_SET = "|NOISE_SET";
    public static final String HIT_WORD_RATE = "hitWordAndRate";
    public static final String KEY_INCREMENT = "increment";
    public static final String REDIS_QUEUE_ONE_KEY = "autoDataOneQueue";
    public static final String REDIS_QUEUE_MULTI_KEY = "autoDataMultiQueue";
    public static final String REDIS_MAP_KEY = "autoDataMap";
    public static final int REDIS_QUEUE_LIMIT = 1_000;
    public static final String SON_ID = "sonId";

    // ---- Similarity thresholds ----
    public static final double SIMILAR_STANDARD_NOISE = 0.8d;
    public static final double SIMILAR_STANDARD = 0.7d;

    /**
     * Maximum number of data items processed when modifying a template tag.
     */
    public static final int POINT_SIZE = 100;

    // ---- Identity used for automatically produced data ----
    public static final String AUTO_PERSON = "自动化机器人";
    public static final long AUTO_CID = 100_040_002L;
    public static final String AUTO_CNAME = "上传标注补充采集";

    // ---- Template lock keys ----
    public static final String LOCK_TEMPLATE_HOUR = "lock:template:hour";
    public static final String LOCK_TEMPLATE_DAY = "lock:template:day";
    public static final String LOCK_TEMPLATE_NUMBER = "lock:template:number";

    // ---- Document field names used in JSON payloads and search queries ----
    public static final String ES_C_TIME = "ctime";
    public static final String ES_M_TIME = "mtime";
    public static final String ES_CID = "cid";
    public static final long ES_CID_DEFAULT = 100_040_002L;
    public static final String ES_C_NAME = "cname";
    public static final String ES_M_GROUP = "mgroup";
    public static final String ES_M_PERSON = "mperson";
    public static final String ES_M_TAG = "mtag";
    public static final String ES_URL = "url";
    public static final String ES_MID = "mid";
    public static final String ES_QA_QUESTION_URL = "question_url";
    public static final String ES_QA_ANSWER_URL = "answer_url";
    public static final String ES_SOURCE = "source";
    public static final String ES_TITLE = "title";
    public static final String ES_CONTENT = "content";
}
......@@ -2,7 +2,6 @@ package com.zhiwei.middleware.automatic.server.config;
import com.mongodb.ConnectionString;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
......@@ -20,15 +19,10 @@ import org.springframework.data.mongodb.core.mapping.MongoMappingContext;
import java.util.concurrent.TimeUnit;
/**
* @ClassName
* @Description TODO
* @Author ${"liu-yu"}
* @Date 2022/12/21 18:01
**/
@Configuration
public class MongoConfig {
@Value("${mongo.connectTimeout}")
private int connectTimeout;
@Value("${mongo.maxWaitTime}")
......@@ -91,4 +85,5 @@ public class MongoConfig {
converter.setTypeMapper(new DefaultMongoTypeMapper(null));
return new MongoTemplate(mongoDbHangZhouFactory(), converter);
}
}
......@@ -9,24 +9,6 @@ import java.util.concurrent.ThreadPoolExecutor;
@Component
public class TaskPoolConfig {
/**
 * Declares the {@code autMarkExecutor} pool: 5 core threads, 10 max threads, queue capacity 20.
 *
 * @return the initialized executor
 */
@Bean("autMarkExecutor")
public ThreadPoolTaskExecutor autMarkExecutor() {
    ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    // Name prefix of the pool's threads.
    pool.setThreadNamePrefix("autoMark-executor-");
    // Core / maximum thread counts and queue capacity.
    pool.setCorePoolSize(5);
    pool.setMaxPoolSize(10);
    pool.setQueueCapacity(20);
    // Rejection policy CALLER_RUNS: a rejected task is executed by the submitting thread
    // instead of a new pool thread.
    pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    pool.initialize();
    return pool;
}
@Bean("asyncExecutor")
public ThreadPoolTaskExecutor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
......@@ -44,76 +26,4 @@ public class TaskPoolConfig {
executor.initialize();
return executor;
}
/**
 * Declares the {@code aggreeNoiseExecutor} pool: 32 core threads, 64 max threads, queue capacity 50.
 *
 * @return the initialized executor
 */
@Bean("aggreeNoiseExecutor")
public ThreadPoolTaskExecutor aggreeNoiseExecutor() {
    ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    // Name prefix of the pool's threads.
    pool.setThreadNamePrefix("aggree-noise-executor-");
    // Core / maximum thread counts and queue capacity.
    pool.setCorePoolSize(32);
    pool.setMaxPoolSize(64);
    pool.setQueueCapacity(50);
    // Rejection policy CALLER_RUNS: a rejected task is executed by the submitting thread
    // instead of a new pool thread.
    pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    pool.initialize();
    return pool;
}
/**
 * Declares the {@code aggreeExecutor} pool: 5 core threads, 10 max threads, queue capacity 50.
 *
 * @return the initialized executor
 */
@Bean("aggreeExecutor")
public ThreadPoolTaskExecutor aggreeExecutor() {
    ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    // Name prefix of the pool's threads.
    pool.setThreadNamePrefix("aggree-executor-");
    // Core / maximum thread counts and queue capacity.
    pool.setCorePoolSize(5);
    pool.setMaxPoolSize(10);
    pool.setQueueCapacity(50);
    // Rejection policy CALLER_RUNS: a rejected task is executed by the submitting thread
    // instead of a new pool thread.
    pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    pool.initialize();
    return pool;
}
/**
 * Declares the {@code eventAggreeEasyExecutor} pool: 6 core threads, 8 max threads, queue capacity 20.
 *
 * @return the initialized executor
 */
@Bean("eventAggreeEasyExecutor")
public ThreadPoolTaskExecutor eventAggreeEasyExecutor() {
    ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    // Name prefix of the pool's threads.
    pool.setThreadNamePrefix("event-easy-aggree-executor-");
    // Core / maximum thread counts and queue capacity.
    pool.setCorePoolSize(6);
    pool.setMaxPoolSize(8);
    pool.setQueueCapacity(20);
    // Rejection policy CALLER_RUNS: a rejected task is executed by the submitting thread
    // instead of a new pool thread.
    pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    pool.initialize();
    return pool;
}
/**
 * Declares the {@code eventAggreeExecutor} pool: 60 core threads, 100 max threads, queue capacity 50.
 *
 * @return the initialized executor
 */
@Bean("eventAggreeExecutor")
public ThreadPoolTaskExecutor eventAggreeExecutor() {
    ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    // Name prefix of the pool's threads.
    pool.setThreadNamePrefix("event-aggree-executor-");
    // Core / maximum thread counts and queue capacity.
    pool.setCorePoolSize(60);
    pool.setMaxPoolSize(100);
    pool.setQueueCapacity(50);
    // Rejection policy CALLER_RUNS: a rejected task is executed by the submitting thread
    // instead of a new pool thread.
    pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    pool.initialize();
    return pool;
}
}
......@@ -14,12 +14,6 @@ public interface TemplateRecordDao {
*/
List<TemplateRecord> findTemplateRecord (Query query);
/**
* 新增模板记录
* @param templateRecord 模板记录
*/
void insertTemplateRecord (TemplateRecord templateRecord);
/**
* 查询模板记录数量
......@@ -27,10 +21,4 @@ public interface TemplateRecordDao {
* @return 声量
*/
long count(Query query);
/**
* 根据插件删除模板记录
* @param query 条件
*/
void removeTemplateRecord (Query query);
}
package com.zhiwei.middleware.automatic.server.dao.impl;
import com.zhiwei.middleware.automatic.server.dao.TemplateRecordDao;
import com.zhiwei.middleware.automatic.server.pojo.TemplateNum;
import com.zhiwei.middleware.automatic.server.pojo.TemplateRecord;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.mongodb.core.MongoTemplate;
......@@ -21,21 +20,11 @@ public class TemplateRecordDaoImpl implements TemplateRecordDao {
@Override
public List<TemplateRecord> findTemplateRecord(Query query) {
return mongoTemplate.find(query, TemplateRecord.class);
}
@Override
public void insertTemplateRecord(TemplateRecord templateRecord) {
mongoTemplate.insert(templateRecord);
return mongoTemplate.find(query, TemplateRecord.class, "automaticmark_template_record");
}
@Override
public long count(Query query) {
return mongoTemplate.count(query, TemplateRecord.class);
}
@Override
public void removeTemplateRecord(Query query) {
mongoTemplate.remove(query, TemplateRecord.class);
return mongoTemplate.count(query, TemplateRecord.class, "automaticmark_template_record");
}
}
......@@ -2,6 +2,7 @@ package com.zhiwei.middleware.automatic.server.dubbo.service;
import com.zhiwei.base.entity.subclass.mark.MarkInfo;
import com.zhiwei.middleware.automatic.server.pojo.MarkInfoMulti;
import com.zhiwei.middleware.automatic.server.pojo.TemplateTitleVo;
import java.util.List;
import java.util.Map;
......@@ -13,6 +14,7 @@ public interface AutoMaticService {
void autoMarkMulti(List<MarkInfoMulti> markInfoMultis);
/**
* 修正模板标题的markTag 如果不存在就会增加
*
......@@ -20,7 +22,22 @@ public interface AutoMaticService {
* @param templateTitle 模板标题
* @param fixTag 正确的标签
*/
boolean modifyTemplateTitle(String group, String templateTitle, String fixTag);
void modifyTemplateTitle(String group, String templateTitle, String fixTag);
/**
* 重置自动标注模板
* @param group 项目
* @param templateTitle 模板标题
*/
void resetTemplate (String group, String templateTitle);
/**
* 获取项目文本模板
* @param project 项目
* @return 模板集
*/
Map<String, TemplateTitleVo> getTemplateTitleByProject(String project);
/**
* 根据模板标题获取数据(仅最新100条)
......@@ -48,13 +65,6 @@ public interface AutoMaticService {
* @param title 标题
* @return 返回值
*/
public Map<String, Object> compareWithTemplateTileOL(String project, String title);
Map<String, Object> compareWithTemplateTileOL(String project, String title);
/**
* 重置自动标注模板
* @param group 项目
* @param templateTitle 模板标题
* @return 是否成功
*/
boolean resetTemplate (String group, String templateTitle);
}
package com.zhiwei.middleware.automatic.server.dubbo.service;
import com.zhiwei.middleware.automatic.server.pojo.CommonAggreeResult;
import com.zhiwei.middleware.automatic.server.pojo.dto.AggreeDTO;
import java.util.List;
/**
 * Service contract for generic text-aggregation tasks: create a task, append data to it,
 * start the aggregation and read the (optionally paged) result.
 */
public interface CommonService {

    /**
     * Obtains a task id (new style).
     *
     * @return the task id
     */
    String generateAggreeOrder();

    /**
     * Appends data to the task identified by {@code id} (new style).
     *
     * @param id   task id
     * @param list data items to append
     * @return outcome flag reported by the implementation
     */
    boolean appendAggreeOrder(String id, List<AggreeDTO> list);

    /**
     * Aggregates the task's data using bisecting k-means.
     *
     * @param id task id
     * @return outcome flag reported by the implementation
     */
    boolean startAggree(String id);

    /**
     * Aggregates the task's data using bisecting k-means.
     *
     * @param id    task id
     * @param limit aggregation limit (exact semantics are defined by the implementation)
     * @return outcome flag reported by the implementation
     */
    boolean startAggree(String id, double limit);

    /**
     * Fetches the aggregation result; returns the first page by default.
     *
     * @param id task id
     * @return the aggregation result
     */
    CommonAggreeResult getAggreeResult(String id);

    /**
     * Fetches one page of the aggregation result.
     *
     * @param id        task id
     * @param page      page number
     * @param pageLimit page size
     * @return the aggregation result for the requested page
     */
    CommonAggreeResult getAggreeResult(String id, int page, int pageLimit);
}
package com.zhiwei.middleware.automatic.server.dubbo.service;
import com.zhiwei.base.category.ClassB;
import java.util.List;
import java.util.Map;
/**
 * Service contract for the data-collection aggregation workflow: caching source data,
 * starting aggregation, editing parent/child tags, managing the noise set, paging
 * through titles and finally inserting the checked data.
 */
public interface DataCollectionService {

    /**
     * Clears the whole cache.
     *
     * @param group project
     * @param id    id
     */
    void cleanCache(String group, String id);

    /**
     * Clears the whole cache but keeps the noise set.
     *
     * @param group project
     * @param id    id
     */
    void cleanCacheExceptNoise(String group, String id);

    /**
     * Adds the base data set.
     *
     * @param group          project
     * @param id             id
     * @param compressedList data set
     */
    void addDataCollection(String group, String id, List<String> compressedList);

    /**
     * Starts the aggregation.
     *
     * @param group     project
     * @param id        id
     * @param highWords not documented in the original; semantics are defined by the implementation
     */
    void startAggree(String group, String id, String highWords);

    /**
     * Modifies the tag of several parent templates at once (the child tags that belong to
     * them are modified in bulk as well).
     *
     * @param group     project
     * @param id        id
     * @param fatherIds parent ids
     * @param mtag      tag
     * @param mperson   annotator
     * @param typeB     typeB
     * @return whether the operation succeeded
     */
    boolean batchModifyFatherTag(String group, String id, List<String> fatherIds, String mtag, String mperson,
                                 ClassB.TypeB typeB);

    /**
     * Modifies the tag of one parent template (the child tags that belong to it are
     * modified in bulk as well).
     *
     * @param group    project
     * @param id       id
     * @param fatherId parent id
     * @param mtag     tag
     * @param mperson  annotator
     * @param typeB    typeB
     * @return outcome flag reported by the implementation
     */
    boolean modifyFatherTag(String group, String id, String fatherId, String mtag, String mperson, ClassB.TypeB typeB);

    /**
     * Modifies the tag of one child entry.
     *
     * @param group    project
     * @param id       id
     * @param fatherId parent id
     * @param sonId    child id
     * @param mtag     tag
     * @param mperson  annotator
     * @param typeB    typeB
     * @return outcome flag reported by the implementation
     */
    boolean modifySonTag(String group, String id, String fatherId, String sonId, String mtag, String mperson,
                         ClassB.TypeB typeB);

    /**
     * Moves a parent entry into the noise set.
     *
     * @param group    project
     * @param id       id
     * @param fatherId parent id
     * @param typeB    typeB
     * @return outcome flag reported by the implementation
     */
    boolean throwIntoNoise(String group, String id, String fatherId, ClassB.TypeB typeB);

    /**
     * Moves several parent entries into the noise set.
     *
     * @param group     project
     * @param id        id
     * @param fatherIds parent ids
     * @param typeB     typeB
     * @return outcome flag reported by the implementation
     */
    boolean batchThrowIntoNoise(String group, String id, List<String> fatherIds, ClassB.TypeB typeB);

    /**
     * Restores a parent entry from the noise set.
     *
     * @param group    project
     * @param id       id
     * @param fatherId parent id
     * @param typeB    typeB
     * @return outcome flag reported by the implementation
     */
    boolean restoreFromNoise(String group, String id, String fatherId, ClassB.TypeB typeB);

    /**
     * Fetches one page of parent-title information.
     *
     * @param group    project
     * @param id       id
     * @param page     page number
     * @param size     page size
     * @param isAsc    sort direction flag
     * @param keyword  search keyword
     * @param typeB    typeB
     * @param isTitle  not documented in the original; semantics are defined by the implementation
     * @param markFlag not documented in the original; semantics are defined by the implementation
     * @return result map produced by the implementation
     */
    Map<String, Object> getFatherTitles(String group, String id, int page, int size, boolean isAsc,
                                        String keyword, ClassB.TypeB typeB, boolean isTitle, int markFlag);

    /**
     * Fetches one page of child information for the given parent id.
     *
     * @param group    project
     * @param id       id
     * @param fatherId parent id
     * @param page     page number
     * @param size     page size
     * @param isAsc    sort direction flag
     * @param keyword  search keyword
     * @param typeB    typeB
     * @return result map produced by the implementation
     */
    Map<String, Object> getSonTitles(String group, String id, String fatherId, int page, int size, boolean isAsc,
                                     String keyword, ClassB.TypeB typeB);

    /**
     * Fetches one page of parent-title information from the noise set.
     *
     * @param group    project
     * @param id       id
     * @param page     page number
     * @param size     page size
     * @param isAsc    sort direction flag
     * @param keyword  search keyword
     * @param typeB    typeB
     * @param isTitle  not documented in the original; semantics are defined by the implementation
     * @param markFlag not documented in the original; semantics are defined by the implementation
     * @return result map produced by the implementation
     */
    Map<String, Object> getNoiseFatherTitles(String group, String id, int page, int size, boolean isAsc,
                                             String keyword, ClassB.TypeB typeB, boolean isTitle, int markFlag);

    /**
     * Fetches one page of child information from the noise set for the given parent id.
     *
     * @param group    project
     * @param id       id
     * @param fatherId parent id
     * @param page     page number
     * @param size     page size
     * @param isAsc    sort direction flag
     * @param keyword  search keyword
     * @param typeB    typeB
     * @return result map produced by the implementation
     */
    Map<String, Object> getNoiseSonTitles(String group, String id, String fatherId, int page, int size,
                                          boolean isAsc, String keyword, ClassB.TypeB typeB);

    /**
     * Inserts the data into storage once checking is finished.
     *
     * @param group project
     * @param id    id
     */
    void checkedThenInsert(String group, String id);

    /**
     * Fetches the interim aggregation state right now.
     *
     * @param group project
     * @param id    id
     * @return -2: failed to fetch the result; -1: not aggregated; 0: aggregating; 1: aggregated
     */
    int getAggreResultNow(String group, String id);

    /**
     * Fetches the interim insertion state right now.
     *
     * @param group project
     * @param id    id
     * @return -2: failed to fetch the result; -1: not inserted; 0: inserting; 1: inserted
     */
    int getInsertResultNow(String group, String id);
}
package com.zhiwei.middleware.automatic.server.dubbo.service;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.base.category.ClassB;
import com.zhiwei.middleware.automatic.server.pojo.UploadInfo;
import com.zhiwei.middleware.automatic.server.pojo.enums.InsertType;
import java.util.Map;
/**
 * Service contract for mark-data uploads: stage source data, start the upload, poll its
 * status, page through the per-type results and clean up afterwards.
 */
public interface DataUploadService {

    /**
     * Adds a source data set.
     *
     * @param group     project
     * @param id        task id
     * @param sourceStr source data as a string (format is defined by the implementation)
     */
    void addUploadList(String group, String id, String sourceStr);

    /**
     * Starts the upload.
     *
     * @param group      project
     * @param id         task
     * @param mperson    submitter
     * @param mtagType   tag handling type
     * @param filterType filter type
     * @param projectId  project id
     * @param insertType insert type
     */
    void startUpload(String group, String id, String mperson,
                     UploadInfo.MtagType mtagType, UploadInfo.FilterType filterType, String projectId,
                     InsertType insertType);

    /**
     * Fetches the upload status (progress).
     *
     * @param group project
     * @param id    task id
     * @return status map produced by the implementation
     */
    Map<String, Object> getUploadStatus(String group, String id);

    /**
     * Fetches the data set for one {@code UploadType}.
     *
     * @param group       project
     * @param id          task id
     * @param page        page
     * @param size        size
     * @param isAsc       sort order
     * @param searchField field to search on
     * @param keyword     keyword
     * @param uploadType  upload type
     * @return result map produced by the implementation
     */
    Map<String, Object> getUploadInfoList(String group, String id, int page, int size, boolean isAsc,
                                          String searchField, String keyword, UploadInfo.UploadType uploadType);

    /**
     * Determines the {@code DataType} of a record.
     *
     * @param json  record
     * @param typeB typeB
     * @return the data type
     */
    UploadInfo.DataType getDataType(JSONObject json, ClassB.TypeB typeB);

    /**
     * Clears the data set of an upload task.
     *
     * @param group project
     * @param id    task id
     */
    void cleanUploadResult(String group, String id);
}
package com.zhiwei.middleware.automatic.server.dubbo.service;
import java.util.Map;
/**
 * Service contract for event-collection marking: stage source data, aggregate it, edit
 * template tags, browse aggregated/noise titles and insert the marked data.
 */
public interface EventCollectionMarkService {

    /**
     * Adds a source data set for event-collection aggregation.
     *
     * @param group     project
     * @param id        id
     * @param sourceStr source data as a string (format is defined by the implementation)
     */
    void addEventCollectionAggreSourceList(String group, String id, String sourceStr);

    /**
     * Clears the event-collection aggregation result set.
     *
     * @param group project
     * @param id    id
     */
    void cleanEventCollectionAggreData(String group, String id);

    /**
     * Fetches the event-collection aggregation result.
     *
     * @param group    project
     * @param id       id
     * @param page     page number
     * @param size     page size
     * @param isAsc    sort direction flag
     * @param markFlag not documented in the original; semantics are defined by the implementation
     * @param keyword  search keyword
     * @return result map produced by the implementation
     */
    Map<String, Object> getEventCollectionAggreTemplate(String group, String id, int page, int size, boolean isAsc,
                                                        int markFlag, String keyword);

    /**
     * Modifies the tag of a plugin aggregation template title.
     *
     * @param group         project
     * @param id            id
     * @param templateTitle template title
     * @param modifyTag     new tag
     * @return outcome flag reported by the implementation
     */
    boolean modifyEventCollectionAggreTitleMarkTag(String group, String id, String templateTitle, String modifyTag);

    /**
     * Fetches the parent title's mark information ({@code markTag}) by template title.
     *
     * @param group         project
     * @param id            id
     * @param templateTitle template title
     * @return the mark tag
     */
    String getEventCollectionMarkTagByTemplate(String group, String id, String templateTitle);

    /**
     * Fetches the set of sub-titles by template title.
     *
     * @param group         project
     * @param id            id
     * @param templateTitle template title
     * @return result map produced by the implementation
     */
    Map<String, Object> getEventCollectionAggreSubTitle(String group, String id, String templateTitle);

    /**
     * Starts the aggregation.
     *
     * @param group project
     * @param id    id
     */
    void startAggre(String group, String id);

    /**
     * Inserts the marked event-collection data into storage.
     *
     * @param group   project
     * @param id      id
     * @param markSum not documented in the original; semantics are defined by the implementation
     * @return outcome flag reported by the implementation
     */
    boolean eventCollectionMarkedInsert(String group, String id, int markSum);

    /**
     * Inserts the marked event-collection data into storage.
     *
     * @param group   project
     * @param id      id
     * @param markSum not documented in the original; semantics are defined by the implementation
     * @param mperson annotator (presumably, by analogy with the other services; confirm)
     * @return outcome flag reported by the implementation
     */
    boolean eventCollectionMarkedInsert(String group, String id, int markSum, String mperson);

    /**
     * Clears all results (aggregation set plus noise set).
     *
     * @param group project
     * @param id    id
     */
    void cleanEventCollectionAllData(String group, String id);

    /**
     * Fetches the event-collection noise parent titles.
     *
     * @param group   project
     * @param id      id
     * @param page    page number
     * @param size    page size
     * @param isAsc   sort direction flag
     * @param keyword search keyword
     * @return result map produced by the implementation
     */
    Map<String, Object> getEventCollectionNoiseTitles(String group, String id, int page, int size, boolean isAsc,
                                                      String keyword);

    /**
     * Fetches the event-collection noise sub-set.
     *
     * @param group         project
     * @param id            id
     * @param templateTitle template title
     * @return result map produced by the implementation
     */
    Map<String, Object> getEventCollectionNoiseSubTitle(String group, String id, String templateTitle);

    /**
     * Tells whether the marked part has already been inserted.
     *
     * @param group project
     * @param id    id
     * @return {@code true} when already inserted
     */
    boolean markedHasInserted(String group, String id);
}
......@@ -2,15 +2,20 @@ package com.zhiwei.middleware.automatic.server.dubbo.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.base.entity.subclass.mark.MarkInfo;
import com.zhiwei.middleware.automatic.server.config.GenericAttribute;
import com.zhiwei.middleware.automatic.server.common.GenericAttribute;
import com.zhiwei.middleware.automatic.server.dubbo.service.AutoMaticService;
import com.zhiwei.middleware.automatic.server.pojo.AutoTask;
import com.zhiwei.middleware.automatic.server.pojo.MarkInfoMulti;
import com.zhiwei.middleware.automatic.server.pojo.TemplateTitleVo;
import com.zhiwei.middleware.automatic.server.pojo.enums.TaskType;
import com.zhiwei.middleware.automatic.server.redis.RedissonUtil;
import com.zhiwei.middleware.automatic.server.service.TemplateTitleService;
import com.zhiwei.middleware.automatic.server.util.Tools;
import org.apache.dubbo.config.annotation.Service;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
@Service
......@@ -27,17 +32,44 @@ public class AutoMaticServiceImpl implements AutoMaticService {
@Override
public void autoMark(List<MarkInfo> markInfos) {
redissonUtil.putQueue(GenericAttribute.REDIS_QUEUE_ONE_KEY, markInfos.stream().map(JSONObject::toJSONString).collect(Collectors.toList()));
AutoTask autoTask = new AutoTask(TaskType.COMMON_ONE.getType());
String sourceKey = Tools.assembleKey(GenericAttribute.REDIS_QUEUE_ONE_KEY, Tools.randomUUID());
redissonUtil.setList(sourceKey, markInfos.stream()
.filter(e -> Objects.nonNull(e) && Objects.nonNull(e.getSourceObj()))
.map(JSONObject::toJSONString).collect(Collectors.toList()));
autoTask.getParamSource().put(TaskType.COMMON_ONE.getCacheId(), sourceKey);
redissonUtil.putQueue(GenericAttribute.KEY, JSONObject.toJSONString(autoTask));
}
@Override
public void autoMarkMulti(List<MarkInfoMulti> markInfoMultis) {
redissonUtil.putQueue(GenericAttribute.REDIS_QUEUE_MULTI_KEY, markInfoMultis.stream().map(JSONObject::toJSONString).collect(Collectors.toList()));
AutoTask autoTask = new AutoTask(TaskType.COMMON_TWO.getType());
String sourceKey = Tools.assembleKey(GenericAttribute.REDIS_QUEUE_MULTI_KEY, Tools.randomUUID());
redissonUtil.setList(sourceKey, markInfoMultis.stream().map(JSONObject::toJSONString).collect(Collectors.toList()));
autoTask.getParamSource().put(TaskType.COMMON_TWO.getCacheId(), sourceKey);
redissonUtil.putQueue(GenericAttribute.KEY, JSONObject.toJSONString(autoTask));
}
@Override
public boolean modifyTemplateTitle(String group, String templateTitle, String fixTag) {
return templateTitleService.modifyTemplateTitle(group, templateTitle, fixTag);
public void modifyTemplateTitle(String group, String templateTitle, String fixTag) {
AutoTask autoTask = new AutoTask(TaskType.TEMPLATE_MODIFY.getType());
autoTask.getParamSource().put(GenericAttribute.GROUP_PARAM, group);
autoTask.getParamSource().put(GenericAttribute.TEMPLATE_TITLE, templateTitle);
autoTask.getParamSource().put(GenericAttribute.FIX_TAG, fixTag);
redissonUtil.putQueue(GenericAttribute.KEY, JSONObject.toJSONString(autoTask));
}
@Override
public void resetTemplate(String group, String templateTitle) {
AutoTask autoTask = new AutoTask(TaskType.TEMPLATE_RESET.getType());
autoTask.getParamSource().put(GenericAttribute.GROUP_PARAM, group);
autoTask.getParamSource().put(GenericAttribute.TEMPLATE_TITLE, templateTitle);
redissonUtil.putQueue(GenericAttribute.KEY, JSONObject.toJSONString(autoTask));
}
@Override
public Map<String, TemplateTitleVo> getTemplateTitleByProject(String project) {
return templateTitleService.getTemplateTitleByProject(project);
}
@Override
......@@ -54,9 +86,4 @@ public class AutoMaticServiceImpl implements AutoMaticService {
public Map<String, Object> compareWithTemplateTileOL(String project, String title) {
return templateTitleService.compareWithTemplateTileOL(project, title);
}
@Override
public boolean resetTemplate(String group, String templateTitle) {
return templateTitleService.resetTemplate(group, templateTitle);
}
}
package com.zhiwei.middleware.automatic.server.dubbo.service.impl;
import com.zhiwei.middleware.automatic.server.dubbo.service.CommonService;
import com.zhiwei.middleware.automatic.server.pojo.CommonAggreeCache;
import com.zhiwei.middleware.automatic.server.pojo.CommonAggreeResult;
import com.zhiwei.middleware.automatic.server.pojo.PageData;
import com.zhiwei.middleware.automatic.server.pojo.Status;
import com.zhiwei.middleware.automatic.server.pojo.dto.AggreeDTO;
import com.zhiwei.middleware.automatic.server.service.handler.TextHandlerService;
import org.springframework.stereotype.Service;
import com.zhiwei.middleware.automatic.server.pojo.CommonAggreeResult.ResultInfo;
import java.util.List;
/**
 * {@link CommonService} implementation that forwards task operations to
 * {@link TextHandlerService} and pages the cached aggregation result in memory.
 */
@Service
public class CommonServiceImpl implements CommonService {

    /** Number of entries returned per page when the caller does not pass a page size. */
    private static final int PAGE_SIZE = 5000;

    /** Pages are 1-based: the paged lookup rejects {@code page <= 0}. */
    private static final int FIRST_PAGE = 1;

    private final TextHandlerService textHandler;

    public CommonServiceImpl(TextHandlerService textHandler) {
        this.textHandler = textHandler;
    }

    @Override
    public String generateAggreeOrder() {
        return textHandler.generateAggreeOrder();
    }

    @Override
    public boolean appendAggreeOrder(String id, List<AggreeDTO> list) {
        return textHandler.appendAggreeOrderNew(id, list);
    }

    @Override
    public boolean startAggree(String id) {
        return textHandler.startAggree(id);
    }

    @Override
    public boolean startAggree(String id, double limit) {
        return textHandler.startAggree(id, limit);
    }

    /**
     * Returns the first page of the aggregation result using the default page size.
     *
     * <p>The previous code asked for page 0, which the paged overload treats as invalid, so
     * this method could only ever answer {@link Status#ERROR}.
     *
     * @param id task id
     * @return the first page, or a status-only result (see the paged overload)
     */
    @Override
    public CommonAggreeResult getAggreeResult(String id) {
        return getAggreeResult(id, FIRST_PAGE, PAGE_SIZE);
    }

    /**
     * Returns one page of the cached aggregation result.
     *
     * @param id        task id
     * @param page      1-based page number
     * @param pageLimit page size, must be positive
     * @return {@link Status#ERROR} for an invalid page/page size, an unknown task or a page
     *         past the end; {@link Status#RUN} while no results are cached yet; otherwise
     *         {@link Status#END} together with the requested page
     */
    @Override
    public CommonAggreeResult getAggreeResult(String id, int page, int pageLimit) {
        CommonAggreeCache cache = textHandler.getAggreeResult(id);
        // Invalid paging arguments or unknown task. A non-positive page size would otherwise
        // divide by zero or produce a negative subList index further down.
        if (page <= 0 || pageLimit <= 0 || null == cache) {
            return new CommonAggreeResult(Status.ERROR);
        }
        // No results yet: aggregation is still running. A null list is treated like an empty
        // one instead of failing with a NullPointerException.
        List<ResultInfo> result = cache.getResults();
        if (null == result || result.isEmpty()) {
            return new CommonAggreeResult(Status.RUN);
        }
        int total = result.size();
        // long arithmetic: pageLimit * (page - 1) can overflow int for large arguments.
        long start = (long) pageLimit * (page - 1);
        if (start >= total) {
            // Requested page lies beyond the last one.
            return new CommonAggreeResult(Status.ERROR);
        }
        int from = (int) start;
        int to = (int) Math.min(start + pageLimit, (long) total);
        int totalPage = (int) ((total + (long) pageLimit - 1) / pageLimit);
        PageData<ResultInfo> results = new PageData<>(page, total, totalPage, pageLimit,
                result.subList(from, to));
        return new CommonAggreeResult(Status.END, results);
    }
}
package com.zhiwei.middleware.automatic.server.dubbo.service.impl;
import com.zhiwei.base.category.ClassB;
import com.zhiwei.middleware.automatic.server.dubbo.service.DataCollectionService;
import com.zhiwei.middleware.automatic.server.service.impl.DataCollection;
import org.apache.dubbo.config.annotation.Service;
import java.util.List;
import java.util.Map;
/**
 * {@link DataCollectionService} implementation; every method forwards its arguments
 * unchanged to the injected {@link DataCollection}.
 */
@Service
public class DataCollectionServiceImpl implements DataCollectionService {

    /** Delegate that performs the actual work. */
    private final DataCollection dataCollection;

    public DataCollectionServiceImpl(DataCollection dataCollection) {
        this.dataCollection = dataCollection;
    }

    @Override
    public void cleanCache(String group, String id) {
        this.dataCollection.cleanCache(group, id);
    }

    @Override
    public void cleanCacheExceptNoise(String group, String id) {
        this.dataCollection.cleanCacheExceptNoise(group, id);
    }

    @Override
    public void addDataCollection(String group, String id, List<String> compressedList) {
        this.dataCollection.addDataCollection(group, id, compressedList);
    }

    @Override
    public void startAggree(String group, String id, String highWords) {
        this.dataCollection.startAggree(group, id, highWords);
    }

    @Override
    public boolean batchModifyFatherTag(String group, String id, List<String> fatherIds, String mtag,
                                        String mperson, ClassB.TypeB typeB) {
        return this.dataCollection.batchModifyFatherTag(group, id, fatherIds, mtag, mperson, typeB);
    }

    @Override
    public boolean modifyFatherTag(String group, String id, String fatherId, String mtag, String mperson,
                                   ClassB.TypeB typeB) {
        return this.dataCollection.modifyFatherTag(group, id, fatherId, mtag, mperson, typeB);
    }

    @Override
    public boolean modifySonTag(String group, String id, String fatherId, String sonId, String mtag,
                                String mperson, ClassB.TypeB typeB) {
        return this.dataCollection.modifySonTag(group, id, fatherId, sonId, mtag, mperson, typeB);
    }

    @Override
    public boolean throwIntoNoise(String group, String id, String fatherId, ClassB.TypeB typeB) {
        return this.dataCollection.throwIntoNoise(group, id, fatherId, typeB);
    }

    @Override
    public boolean batchThrowIntoNoise(String group, String id, List<String> fatherIds, ClassB.TypeB typeB) {
        return this.dataCollection.batchThrowIntoNoise(group, id, fatherIds, typeB);
    }

    @Override
    public boolean restoreFromNoise(String group, String id, String fatherId, ClassB.TypeB typeB) {
        return this.dataCollection.restoreFromNoise(group, id, fatherId, typeB);
    }

    @Override
    public Map<String, Object> getFatherTitles(String group, String id, int page, int size, boolean isAsc,
                                               String keyword, ClassB.TypeB typeB, boolean isTitle,
                                               int markFlag) {
        return this.dataCollection.getFatherTitles(group, id, page, size, isAsc, keyword, typeB, isTitle,
                markFlag);
    }

    @Override
    public Map<String, Object> getSonTitles(String group, String id, String fatherId, int page, int size,
                                            boolean isAsc, String keyword, ClassB.TypeB typeB) {
        return this.dataCollection.getSonTitles(group, id, fatherId, page, size, isAsc, keyword, typeB);
    }

    @Override
    public Map<String, Object> getNoiseFatherTitles(String group, String id, int page, int size,
                                                    boolean isAsc, String keyword, ClassB.TypeB typeB,
                                                    boolean isTitle, int markFlag) {
        return this.dataCollection.getNoiseFatherTitles(group, id, page, size, isAsc, keyword, typeB, isTitle,
                markFlag);
    }

    @Override
    public Map<String, Object> getNoiseSonTitles(String group, String id, String fatherId, int page, int size,
                                                 boolean isAsc, String keyword, ClassB.TypeB typeB) {
        return this.dataCollection.getNoiseSonTitles(group, id, fatherId, page, size, isAsc, keyword, typeB);
    }

    @Override
    public void checkedThenInsert(String group, String id) {
        this.dataCollection.checkedThenInsert(group, id);
    }

    @Override
    public int getAggreResultNow(String group, String id) {
        return this.dataCollection.getAggreResultNow(group, id);
    }

    @Override
    public int getInsertResultNow(String group, String id) {
        return this.dataCollection.getInsertResultNow(group, id);
    }
}
package com.zhiwei.middleware.automatic.server.dubbo.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.base.category.ClassB;
import com.zhiwei.middleware.automatic.server.dubbo.service.DataUploadService;
import com.zhiwei.middleware.automatic.server.pojo.MarkUploadRule;
import com.zhiwei.middleware.automatic.server.pojo.UploadInfo;
import com.zhiwei.middleware.automatic.server.pojo.enums.InsertType;
import com.zhiwei.middleware.automatic.server.service.UploadService;
import org.apache.dubbo.config.annotation.Service;
import java.util.Map;
/**
 * {@link DataUploadService} implementation backed by {@link UploadService}. All methods
 * forward their arguments; {@link #startUpload} first packs them into a
 * {@link MarkUploadRule}.
 */
@Service
public class DataUploadServiceImpl implements DataUploadService {

    /** Delegate that performs the actual work. */
    private final UploadService uploadService;

    public DataUploadServiceImpl(UploadService uploadService) {
        this.uploadService = uploadService;
    }

    @Override
    public void addUploadList(String group, String id, String sourceStr) {
        this.uploadService.addUploadList(group, id, sourceStr);
    }

    @Override
    public void startUpload(String group, String id, String mperson, UploadInfo.MtagType mtagType,
                            UploadInfo.FilterType filterType, String projectId, InsertType insertType) {
        // Note the rule's constructor takes the task id before the group.
        MarkUploadRule rule = new MarkUploadRule(id, group, mperson, mtagType, filterType, projectId,
                insertType);
        this.uploadService.startUpload(rule);
    }

    @Override
    public Map<String, Object> getUploadStatus(String group, String id) {
        return this.uploadService.getUploadStatus(group, id);
    }

    @Override
    public Map<String, Object> getUploadInfoList(String group, String id, int page, int size, boolean isAsc,
                                                 String searchField, String keyword,
                                                 UploadInfo.UploadType uploadType) {
        return this.uploadService.getUploadInfoList(group, id, page, size, isAsc, searchField, keyword,
                uploadType);
    }

    @Override
    public UploadInfo.DataType getDataType(JSONObject json, ClassB.TypeB typeB) {
        return this.uploadService.getDataType(json, typeB);
    }

    @Override
    public void cleanUploadResult(String group, String id) {
        this.uploadService.cleanUploadResult(group, id);
    }
}
package com.zhiwei.middleware.automatic.server.dubbo.service.impl;
import com.zhiwei.middleware.automatic.server.dubbo.service.EventCollectionMarkService;
import com.zhiwei.middleware.automatic.server.service.impl.EventCollectionMark;
import org.apache.dubbo.config.annotation.Service;
import java.util.Map;
/**
 * {@link EventCollectionMarkService} implementation; every method forwards its arguments
 * unchanged to the injected {@link EventCollectionMark}. Several delegate methods are
 * spelled "Aggree" where this interface uses "Aggre".
 */
@Service
public class EventCollectionMarkServiceImpl implements EventCollectionMarkService {

    /** Delegate that performs the actual work. */
    private final EventCollectionMark eventCollectionMark;

    public EventCollectionMarkServiceImpl(EventCollectionMark eventCollectionMark) {
        this.eventCollectionMark = eventCollectionMark;
    }

    @Override
    public void addEventCollectionAggreSourceList(String group, String id, String sourceStr) {
        this.eventCollectionMark.addEventCollectionAggreeSourceList(group, id, sourceStr);
    }

    @Override
    public void cleanEventCollectionAggreData(String group, String id) {
        this.eventCollectionMark.cleanEventCollectionAggreeData(group, id);
    }

    @Override
    public Map<String, Object> getEventCollectionAggreTemplate(String group, String id, int page, int size,
                                                               boolean isAsc, int markFlag, String keyword) {
        return this.eventCollectionMark.getEventCollectionAggreeTemplate(group, id, page, size, isAsc, markFlag,
                keyword);
    }

    @Override
    public boolean modifyEventCollectionAggreTitleMarkTag(String group, String id, String templateTitle,
                                                          String modifyTag) {
        return this.eventCollectionMark.modifyEventCollectionAggreeTitleMarkTag(group, id, templateTitle,
                modifyTag);
    }

    @Override
    public String getEventCollectionMarkTagByTemplate(String group, String id, String templateTitle) {
        return this.eventCollectionMark.getEventCollectionMarkTagByTemplate(group, id, templateTitle);
    }

    @Override
    public Map<String, Object> getEventCollectionAggreSubTitle(String group, String id, String templateTitle) {
        return this.eventCollectionMark.getEventCollectionAggreeSubTitle(group, id, templateTitle);
    }

    @Override
    public void startAggre(String group, String id) {
        this.eventCollectionMark.startAggree(group, id);
    }

    @Override
    public boolean eventCollectionMarkedInsert(String group, String id, int markSum) {
        return this.eventCollectionMark.eventCollectionMarkedInsert(group, id, markSum);
    }

    @Override
    public boolean eventCollectionMarkedInsert(String group, String id, int markSum, String mperson) {
        return this.eventCollectionMark.eventCollectionMarkedInsert(group, id, markSum, mperson);
    }

    @Override
    public void cleanEventCollectionAllData(String group, String id) {
        this.eventCollectionMark.cleanEventCollectionAllData(group, id);
    }

    @Override
    public Map<String, Object> getEventCollectionNoiseTitles(String group, String id, int page, int size,
                                                             boolean isAsc, String keyword) {
        return this.eventCollectionMark.getEventCollectionNoiseTitles(group, id, page, size, isAsc, keyword);
    }

    @Override
    public Map<String, Object> getEventCollectionNoiseSubTitle(String group, String id, String templateTitle) {
        return this.eventCollectionMark.getEventCollectionNoiseSubTitle(group, id, templateTitle);
    }

    @Override
    public boolean markedHasInserted(String group, String id) {
        return this.eventCollectionMark.markedHasInserted(group, id);
    }
}
package com.zhiwei.middleware.automatic.server.functional;
import com.zhiwei.base.category.ClassB;
/**
 * Strategy that derives the {@code ClassB.TypeB} of an item.
 *
 * @param <T> type of the item being classified
 */
@FunctionalInterface
public interface DataClassType<T> {

    /**
     * @param t item to classify
     * @return the item's {@code TypeB}
     */
    ClassB.TypeB getClassType(T t);
}
package com.zhiwei.middleware.automatic.server.functional;
import com.zhiwei.middleware.automatic.server.pojo.MarkUploadRule;
import org.elasticsearch.search.SearchHit;
import java.util.List;
/**
 * Strategy that merges search hits into an item under a given upload rule.
 *
 * @param <T> type of the item that receives the merged data
 */
@FunctionalInterface
public interface DataMerge<T> {

    /**
     * @param hit  search hits to merge
     * @param t    item that receives the merged data
     * @param rule upload rule in effect
     */
    void dataMerge(List<SearchHit> hit, T t, MarkUploadRule rule);
}
package com.zhiwei.middleware.automatic.server.functional;
/**
 * Strategy that resolves the index name for an item.
 *
 * @param <T> type of the item the index is resolved for
 */
@FunctionalInterface
public interface EsIndex<T> {

    /**
     * @param t item to resolve the index for
     * @return the index name
     */
    String getIndex(T t);
}
package com.zhiwei.middleware.automatic.server.functional;
import org.elasticsearch.index.query.BoolQueryBuilder;
/**
 * Strategy that builds the boolean query used to look up one item.
 *
 * @param <T> type of the item the query is built for
 */
@FunctionalInterface
public interface EsRowQuery<T> {

    /**
     * @param t item to build the query for
     * @return the query builder
     */
    BoolQueryBuilder rowQuery(T t);
}
package com.zhiwei.middleware.automatic.server.functional;
import com.zhiwei.base.category.ClassB;
import com.zhiwei.base.entity.CommonDO;
import com.zhiwei.middleware.automatic.server.base.BaseDataUploadService;
import com.zhiwei.middleware.automatic.server.base.DataUploadCommon;
import com.zhiwei.middleware.automatic.server.config.GenericAttribute;
import com.zhiwei.middleware.automatic.server.listener.BaseServiceContext;
import com.zhiwei.middleware.automatic.server.pojo.MarkUploadResult;
import com.zhiwei.middleware.automatic.server.pojo.MarkUploadRule;
import com.zhiwei.middleware.automatic.server.util.TimeUtil;
import com.zhiwei.middleware.automatic.server.util.Tools;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.search.SearchHit;
import java.util.List;
import java.util.Objects;
/**
 * Concrete callbacks for the mark-upload pipeline; the methods match the shapes of the
 * functional interfaces in this package (row exception handler, query builders, index
 * and key resolvers, hit mergers) and all operate on {@link MarkUploadResult}.
 */
public class FunctionalImpl {

    private static final Logger log = LogManager.getLogger(FunctionalImpl.class);

    /**
     * Records a row-level failure on the result, if there is one.
     *
     * @param result  upload DTO; ignored when {@code null}
     * @param stage   stage in which the failure happened
     * @param message failure description
     */
    public void rowException(MarkUploadResult result, String stage, String message) {
        if (Objects.nonNull(result)) {
            result.setInfo(GenericAttribute.SYSTEM_ERROR_SUFFIX, stage + ":" + message);
        }
    }

    /**
     * Mark upload: builds the URL search query.
     *
     * @param result upload DTO
     * @return BoolQueryBuilder
     */
    public BoolQueryBuilder urlSearchQuery(MarkUploadResult result) {
        return BaseServiceContext.getInstance().getDataUploadService(result.getTypeB()).urlSearchQuery(result);
    }

    /**
     * Mark upload: builds the text search query.
     *
     * @param result upload DTO
     * @return BoolQueryBuilder
     */
    public BoolQueryBuilder textSearchQuery(MarkUploadResult result) {
        return BaseServiceContext.getInstance().getDataUploadService(result.getTypeB()).textSearchQuery(result);
    }

    /**
     * Resolves the data-warehouse ES index, using the time of the DW record when present
     * and the time of the origin data otherwise.
     *
     * @param result upload DTO
     * @return es index
     */
    public String getDwIndex(MarkUploadResult result) {
        return Objects.nonNull(result.getDw())
                ? TimeUtil.getDwIndex(result.getDw().getTime())
                : TimeUtil.getDwIndex(result.getOriginData().getTime());
    }

    /**
     * Resolves the mark-store ES index, using the time of the DW record when present and
     * the time of the origin data otherwise.
     *
     * @param result upload DTO
     * @return es index
     */
    public String getMarkIndex(MarkUploadResult result) {
        return Objects.nonNull(result.getDw())
                ? TimeUtil.getMarkIndex(result.getDw().getTime())
                : TimeUtil.getMarkIndex(result.getOriginData().getTime());
    }

    /**
     * Returns the result's TypeB.
     *
     * @param result upload DTO
     * @return TypeB
     */
    public ClassB.TypeB getTypeB(MarkUploadResult result) {
        return result.getTypeB();
    }

    /**
     * Returns the text-search key: the origin data's URL passed through
     * {@code Tools.urlReplace}.
     *
     * @param result upload DTO
     * @return key
     */
    public String getTextSearchRowKey(MarkUploadResult result) {
        return Tools.urlReplace(result.getOriginData().getUrl());
    }

    /**
     * Returns the key used for the mark data source.
     *
     * @param result upload DTO
     * @return key
     */
    public String markHandleRowKey(MarkUploadResult result) {
        return result.getKey();
    }

    /**
     * URL search: merges the first hit into the result and flags the result as found.
     * When there is no hit, an error info is recorded instead.
     *
     * @param hits   es data; may be {@code null} or empty
     * @param result upload DTO
     * @param rule   upload rule (its group is used for the mark conversion)
     */
    public void searchHitMerge(List<SearchHit> hits, MarkUploadResult result, MarkUploadRule rule) {
        // An empty list means "not found" just like null; calling get(0) on it would throw.
        if (!hasHits(hits)) {
            result.setInfo(GenericAttribute.SYSTEM_ERROR_SUFFIX, "格式转换失败且大库中不存在该数据");
            return;
        }
        BaseDataUploadService dataUploadService = BaseServiceContext.getInstance().getDataUploadService(result.getTypeB());
        CommonDO commonDO = dataUploadService.getCommonDOBySearchHit(hits.get(0));
        DataUploadCommon dataUploadCommon = (DataUploadCommon) dataUploadService;
        result.setCommonDO(commonDO, dataUploadCommon.convert2Mark(commonDO, rule.getGroup()));
        result.setSearch(true);
    }

    /**
     * Data-warehouse source search: resolves the record and, when one exists, sets the mark
     * info on the result. The record comes from the earlier URL search when
     * {@code result.isSearch()}, else from the first hit, else from a second content search.
     *
     * @param hits   es data; may be {@code null} or empty
     * @param result upload DTO
     * @param rule   upload rule (supplies annotator and group)
     */
    public void dwSearchHitMerge(List<SearchHit> hits, MarkUploadResult result, MarkUploadRule rule) {
        BaseDataUploadService dataUploadService = BaseServiceContext.getInstance().getDataUploadService(result.getTypeB());
        CommonDO commonDO;
        if (result.isSearch()) {
            commonDO = result.getMark();
        } else if (hasHits(hits)) {
            commonDO = dataUploadService.getCommonDOBySearchHit(hits.get(0));
        } else {
            // No usable hit (null or empty): fall back to the content search.
            commonDO = dataUploadService.searchDwByContentNew(result);
        }
        if (null == commonDO) {
            result.setInfo(GenericAttribute.SYSTEM_ERROR_SUFFIX, "数据类型:【dw-content数据】;上传结果:【失败】,二次文本搜索任未搜索到数据");
            return;
        }
        result.setMarkInfo(dataUploadService.toMarkInfoNew(result, rule.getMperson(), rule.getGroup()));
    }

    /**
     * Mark source search: builds the mark info from the first hit according to the rule's
     * tag type. {@code INDEX} builds it without an existing tag; {@code UPDATE} passes the
     * hit's current {@code mtag}. Any exception is logged and recorded on the result.
     *
     * @param hits   es data; may be {@code null} or empty
     * @param result upload DTO
     * @param rule   upload rule (supplies tag type, annotator and group)
     */
    public void markSearchHitMerge(List<SearchHit> hits, MarkUploadResult result, MarkUploadRule rule) {
        if (!hasHits(hits)) {
            result.setInfo(GenericAttribute.SYSTEM_ERROR_SUFFIX, "数据类型:【标注数据】;上传结果:【失败】,原因:标注库未找到对应数据");
            return;
        }
        try {
            BaseDataUploadService dataUploadService = BaseServiceContext.getInstance().getDataUploadService(result.getTypeB());
            CommonDO commonDO = dataUploadService.getCommonDOBySearchHit(hits.get(0));
            switch (rule.getMtagType()) {
                case INDEX:
                    result.setMarkInfo(dataUploadService.toMarkInfoNew(result, rule.getMperson(), rule.getGroup()));
                    // Without this break INDEX fell through and was overwritten by UPDATE.
                    break;
                case UPDATE:
                    result.setMarkInfo(dataUploadService.toMarkInfoNew(result, rule.getMperson(), rule.getGroup(), commonDO.toJSON().get("mtag") + ""));
                    break;
                default:
                    // Other tag types leave the result untouched, as before.
                    break;
            }
        } catch (Exception e) {
            log.error("UploadShell-标注库数据源处理失败:", e);
            result.setInfo(GenericAttribute.SYSTEM_ERROR_SUFFIX, "markHandle处理异常");
        }
    }

    /** Tells whether the hit list contains at least one element. */
    private static boolean hasHits(List<SearchHit> hits) {
        return hits != null && !hits.isEmpty();
    }
}
package com.zhiwei.middleware.automatic.server.functional;
/**
 * Strategy that derives the row key of an item.
 *
 * @param <T> type of the item the key is derived from
 */
@FunctionalInterface
public interface RowKey<T> {

    /**
     * @param t item to derive the key from
     * @return the row key
     */
    String getRowKey(T t);
}
package com.zhiwei.middleware.automatic.server.functional;
/**
 * Callback invoked when processing a single upload row fails.
 *
 * @param <T> type of the row being processed
 */
@FunctionalInterface
public interface UploadRowException<T> {

    /**
     * @param t       row that failed
     * @param state   processing state/stage in which the failure happened
     * @param message failure description
     */
    void rowException(T t, String state, String message);
}
package com.zhiwei.middleware.automatic.server.listener;
import com.zhiwei.base.category.ClassB.TypeB;
import com.zhiwei.middleware.automatic.server.base.BaseDataUploadService;
import org.springframework.context.ApplicationContext;
import java.util.HashMap;
import java.util.Map;
/**
 * Base-service singleton: a registry of {@link BaseDataUploadService} beans keyed by the
 * {@code TypeB} each one reports. The shared instance is created lazily through a nested
 * holder class; the constructor itself is public.
 */
public class BaseServiceContext {

    /** Upload services indexed by the TypeB they handle. */
    private final Map<TypeB, BaseDataUploadService> handlerMap = new HashMap<>();

    /**
     * Collects every {@link BaseDataUploadService} bean from the application context held
     * by {@code ApplicationContextHolder} and registers it under its TypeB. If two beans
     * report the same TypeB, the one registered later replaces the earlier one.
     */
    public BaseServiceContext() {
        ApplicationContext context = ApplicationContextHolder.getInstance();
        Map<String, BaseDataUploadService> services = context.getBeansOfType(BaseDataUploadService.class);
        for (Map.Entry<String, BaseDataUploadService> entry : services.entrySet()) {
            BaseDataUploadService service = entry.getValue();
            handlerMap.put(service.getTypeB(), service);
        }
    }

    /**
     * @param typeB type to look up
     * @return the service registered for {@code typeB}, or {@code null} when none is
     */
    public BaseDataUploadService getDataUploadService(TypeB typeB) {
        return handlerMap.get(typeB);
    }

    /**
     * @return the shared instance
     */
    public static BaseServiceContext getInstance() {
        return BaseServiceContextHolder.BASE_SERVICE;
    }

    /** Holds the shared instance; it is created when this class is initialized. */
    private static class BaseServiceContextHolder {
        private static final BaseServiceContext BASE_SERVICE = new BaseServiceContext();
    }
}
package com.zhiwei.middleware.automatic.server.mission;
import com.zhiwei.middleware.automatic.server.service.AutoService;
import com.zhiwei.qbjc.bean.pojo.common.Project;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.mongodb.core.MongoTemplate;
......@@ -14,19 +13,12 @@ public class AsyncTask {
private final MongoTemplate hangZhouMongo;
private final AutoService autoService;
public AsyncTask(@Qualifier("hangzhouMongoTemplate") MongoTemplate hangZhouMongo,
AutoService autoService) {
public AsyncTask(@Qualifier("hangzhouMongoTemplate") MongoTemplate hangZhouMongo) {
this.hangZhouMongo = hangZhouMongo;
this.autoService = autoService;
}
public List<String> findAllGroup() {
return hangZhouMongo.findAll(Project.class).stream().map(Project::getProjectName).collect(Collectors.toList());
}
public void queueDataPull() {
autoService.asyncAutoMark();
}
}
package com.zhiwei.middleware.automatic.server.mission;
import com.zhiwei.middleware.automatic.server.config.GenericAttribute;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.middleware.automatic.server.common.GenericAttribute;
import com.zhiwei.middleware.automatic.server.pojo.AutoTask;
import com.zhiwei.middleware.automatic.server.pojo.enums.TaskType;
import com.zhiwei.middleware.automatic.server.redis.RedissonUtil;
import com.zhiwei.middleware.automatic.server.service.TemplateTitleService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.scheduling.annotation.Async;
......@@ -11,8 +13,6 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.TimeUnit;
@Component
@EnableScheduling
......@@ -20,43 +20,28 @@ public class ScheduledMission {
private final Logger log = LogManager.getLogger(ScheduledMission.class);
private final AsyncTask asyncTask;
private final RedissonUtil redissonUtil;
private final TemplateTitleService templateTitleService;
private final AsyncTask asyncTask;
public ScheduledMission(AsyncTask asyncTask, RedissonUtil redissonUtil,
TemplateTitleService templateTitleService) {
this.asyncTask = asyncTask;
public ScheduledMission(RedissonUtil redissonUtil, AsyncTask asyncTask) {
this.redissonUtil = redissonUtil;
this.templateTitleService = templateTitleService;
this.asyncTask = asyncTask;
}
// @Scheduled(cron = "10/10 * * * * ? ")
// @Async("asyncExecutor")
public void queueDataPull() {
try {
asyncTask.queueDataPull();
} catch (Exception e) {
log.error("定时拉取自动标注队列出错:", e);
}
}
// @Scheduled(cron = "0 0/5 * * * ?")
// @Async("asyncExecutor")
public void templateHourSync() {
try {
if (redissonUtil.tryLock(GenericAttribute.LOCK_TEMPLATE_HOUR, 0, 1, TimeUnit.MINUTES)) {
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.HOUR_OF_DAY, -7);
long startTime = calendar.getTime().getTime();
Calendar calendarEndTime = Calendar.getInstance();
calendarEndTime.add(Calendar.MINUTE, -5);
long endTime = calendarEndTime.getTime().getTime();
templateTitleService.schedulerHourAggregation(asyncTask.findAllGroup(), startTime, endTime);
//释放锁
redissonUtil.unlock(GenericAttribute.LOCK_TEMPLATE_HOUR);
for (String project : asyncTask.findAllGroup()) {
putTask(project, startTime, endTime);
}
} catch (Exception e) {
log.error("十分钟定时同步模板失败:", e);
......@@ -67,18 +52,25 @@ public class ScheduledMission {
// @Async("autMarkExecutor")
public void templateDaySync() {
try {
if (redissonUtil.tryLock(GenericAttribute.LOCK_TEMPLATE_DAY, 0, 1, TimeUnit.MINUTES)) {
Calendar calendar = Calendar.getInstance();
// 聚合1天,文章时间和标注时间都在1天内
calendar.add(Calendar.DAY_OF_MONTH, -1);
long startTime = calendar.getTime().getTime();
templateTitleService.schedulerHourAggregation(asyncTask.findAllGroup(), startTime, System.currentTimeMillis());
//释放锁
redissonUtil.unlock(GenericAttribute.LOCK_TEMPLATE_DAY);
long endTime = System.currentTimeMillis();
for (String project : asyncTask.findAllGroup()) {
putTask(project, startTime, endTime);
}
} catch (Exception e) {
log.error("每天定时同步模板失败:", e);
}
}
/**
 * Builds one TEMPLATE-type {@link AutoTask} for a project and time window and hands it to the queue.
 * The task is serialized to a JSON string and passed to {@code redissonUtil.putQueue} under
 * {@code GenericAttribute.KEY}.
 *
 * @param group     project name, stored under {@code GenericAttribute.GROUP_PARAM}
 * @param startTime window start, stored under {@code GenericAttribute.START_PARAM}
 *                  (callers in this class pass epoch milliseconds)
 * @param endTime   window end, stored under {@code GenericAttribute.END_PARAM}
 *                  (callers in this class pass epoch milliseconds)
 */
private void putTask(String group, long startTime, long endTime) {
AutoTask autoTask = new AutoTask(TaskType.TEMPLATE.getType());
// The three parameters travel inside the task's parameter source.
autoTask.getParamSource().put(GenericAttribute.GROUP_PARAM, group);
autoTask.getParamSource().put(GenericAttribute.START_PARAM, startTime);
autoTask.getParamSource().put(GenericAttribute.END_PARAM, endTime);
// NOTE(review): presumably consumed by a worker polling the same queue key; confirm against the consumer.
redissonUtil.putQueue(GenericAttribute.KEY, JSONObject.toJSONString(autoTask));
}
}
package com.zhiwei.middleware.automatic.server.pojo;
import java.io.Serializable;
/**
 * Progress and result counters of one aggregation task.
 *
 * <p>Instances are converted to and from JSON elsewhere in the project, so the no-arg constructor
 * and the accessor names must stay as they are.
 */
public class AggreInfo implements Serializable {
    private static final long serialVersionUID = 4901060154053874112L;

    /** Whether the aggregation has finished. */
    Boolean aggreFinshed;
    /** Total number of records aggregated. */
    int totalCount;
    /** Number of records suspected to be noise. */
    int noiseCount;
    /** Number of template titles. */
    int titleFatherCount;
    /** Number of automatically marked records. */
    int automaticmarkCount;
    /** Whether the already-marked part has been stored. */
    Boolean inserted;

    /** Required for JSON deserialization; do not remove. */
    public AggreInfo() {
    }

    /**
     * @param aggreFinshed whether the aggregation has finished
     * @param isInserted   whether the already-marked part has been stored
     */
    public AggreInfo(Boolean aggreFinshed, Boolean isInserted) {
        this.aggreFinshed = aggreFinshed;
        this.inserted = isInserted;
    }

    public Boolean isAggreFinshed() {
        return aggreFinshed;
    }

    public void setAggreFinshed(Boolean aggreFinshed) {
        this.aggreFinshed = aggreFinshed;
    }

    public void setTotalCount(int totalCount) {
        this.totalCount = totalCount;
    }

    public void setNoiseCount(int noiseCount) {
        this.noiseCount = noiseCount;
    }

    public void setTitleFatherCount(int titleFatherCount) {
        this.titleFatherCount = titleFatherCount;
    }

    public void setAutomaticmarkCount(int automaticmarkCount) {
        this.automaticmarkCount = automaticmarkCount;
    }

    /**
     * Sets the finished flag and all four counters at once; {@code inserted} is left untouched.
     */
    public void setAll(boolean aggreFinshed, int totalCount, int noiseCount, int titleFatherCount,
            int automaticmarkCount) {
        this.aggreFinshed = aggreFinshed;
        this.totalCount = totalCount;
        this.noiseCount = noiseCount;
        this.titleFatherCount = titleFatherCount;
        this.automaticmarkCount = automaticmarkCount;
    }

    /**
     * Same as the five-argument overload, additionally setting the {@code inserted} flag.
     */
    public void setAll(boolean aggreFinshed, boolean isInserted, int totalCount, int noiseCount, int titleFatherCount,
            int automaticmarkCount) {
        this.inserted = isInserted;
        setAll(aggreFinshed, totalCount, noiseCount, titleFatherCount, automaticmarkCount);
    }

    public Boolean isInserted() {
        return inserted;
    }

    public void setInserted(Boolean inserted) {
        this.inserted = inserted;
    }

    public int getTotalCount() {
        return totalCount;
    }

    public int getNoiseCount() {
        return noiseCount;
    }

    public int getTitleFatherCount() {
        return titleFatherCount;
    }

    public int getAutomaticmarkCount() {
        return automaticmarkCount;
    }

    /**
     * Renders the counters as a one-line summary. The valid/noise breakdown is only included
     * when {@code noiseCount} is non-zero.
     *
     * @return the summary text
     */
    public String getPrintString() {
        // The builder never leaves this method, so the unsynchronized StringBuilder is sufficient.
        StringBuilder sb = new StringBuilder();
        sb.append("本次数据采集共计").append(totalCount).append("条");
        if (0 != noiseCount) {
            sb.append("(有效数据").append(totalCount - noiseCount)
                    .append("条,疑似噪音").append(noiseCount).append("条)");
        }
        sb.append(",聚合模板共计").append(titleFatherCount)
                .append("条,自动标注").append(automaticmarkCount).append("条");
        return sb.toString();
    }
}
package com.zhiwei.middleware.automatic.server.pojo;
import com.zhiwei.middleware.automatic.server.pojo.dto.AggreeDTO;
import com.zhiwei.middleware.automatic.server.pojo.CommonAggreeResult.ResultInfo;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Holder for the source data and cached results of one aggregation order.
 */
public class CommonAggreeCache {
    /** Order id. */
    String id;
    /** Last update time; the constructor stamps it with the current epoch milliseconds. */
    Long updateTime;
    /** Source data of the order. */
    Map<String, AggreeDTO> data;
    /** Cached aggregation results. */
    List<ResultInfo> results;

    /**
     * Creates an empty cache entry for the given order, stamped with the current time.
     *
     * @param id order id
     */
    public CommonAggreeCache(String id) {
        this.results = new ArrayList<>();
        this.data = new HashMap<>();
        this.updateTime = System.currentTimeMillis();
        this.id = id;
    }

    public String getId() {
        return this.id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Long getUpdateTime() {
        return this.updateTime;
    }

    public void setUpdateTime(Long updateTime) {
        this.updateTime = updateTime;
    }

    public Map<String, AggreeDTO> getData() {
        return this.data;
    }

    public void setData(Map<String, AggreeDTO> data) {
        this.data = data;
    }

    public List<ResultInfo> getResults() {
        return this.results;
    }

    public void setResults(List<ResultInfo> results) {
        this.results = results;
    }
}
package com.zhiwei.middleware.automatic.server.pojo;
import com.zhiwei.base.category.ClassB;
import com.zhiwei.base.entity.CommonDO;
import com.zhiwei.base.entity.subclass.mark.MarkInfo;
import com.zhiwei.middleware.automatic.server.config.GenericAttribute;
import lombok.Data;
import java.util.Objects;
/**
 * Outcome of converting one uploaded mark record.
 */
@Data
public class MarkUploadResult {
    /** Whether the conversion succeeded. */
    private boolean success;
    private boolean search;
    private String key;
    /** Type of the info description. */
    private String infoType;
    /** Upload message. */
    private String message;
    /** Data info. */
    private MarkInfo markInfo;
    /** Original uploaded data. */
    private MarkUploadInfo originData;
    /** Main-store data. */
    private CommonDO dw;
    /** Mark data. */
    private CommonDO mark;
    private ClassB.TypeB typeB;
    private UploadInfo.DataType dataType;

    /**
     * Starts a result for the given upload: not yet successful, info type preset to
     * {@code GenericAttribute.SUCCESS_SUFFIX}.
     *
     * @param markUploadInfo the original uploaded record
     */
    public MarkUploadResult(MarkUploadInfo markUploadInfo) {
        this.success = false;
        this.infoType = GenericAttribute.SUCCESS_SUFFIX;
        this.originData = markUploadInfo;
    }

    /**
     * Stores the mark info; the result counts as successful exactly when it is non-null.
     */
    public void setMarkInfo(MarkInfo markInfo) {
        this.markInfo = markInfo;
        this.success = markInfo != null;
    }

    /**
     * Records an info type and message and marks the result as not successful.
     */
    public void setInfo(String infoType, String message) {
        this.success = false;
        this.message = message;
        this.infoType = infoType;
    }

    /**
     * Stores both data objects and marks the result successful. Does nothing when {@code dw} is null.
     */
    public void setCommonDO(CommonDO dw, CommonDO mark) {
        if (dw == null) {
            return;
        }
        this.dw = dw;
        this.mark = mark;
        this.success = true;
        clearInfo();
    }

    /** Resets the info type to the success value and drops any message. */
    private void clearInfo() {
        this.message = null;
        this.infoType = GenericAttribute.SUCCESS_SUFFIX;
    }
}
package com.zhiwei.middleware.automatic.server.pojo;
import lombok.Data;
/**
 * One counting event for a template title inside a project group.
 */
@Data
public class TemplateNum {
    /** Template title. */
    private String title;
    /** Project group. */
    private String group;
    /** Count carried by this event; the two-argument constructor sets it to 1. */
    private Integer number;

    public TemplateNum() {
    }

    /**
     * @param title template title
     * @param group project group
     */
    public TemplateNum(String title, String group) {
        this.number = 1;
        this.group = group;
        this.title = title;
    }
}
package com.zhiwei.middleware.automatic.server.pojo;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.base.category.ClassB;
import com.zhiwei.middleware.automatic.server.config.GenericAttribute;
import com.zhiwei.middleware.automatic.server.pojo.vo.TemplateFatherVo;
import com.zhiwei.middleware.automatic.server.redis.RedissonUtil;
import lombok.Data;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Working state of one title aggregation: the templates found so far and the data attached to each.
 */
@Data
public class TitleAggreeResult {
    /** Data records keyed by the template they were attached to. */
    private Map<TemplateFatherVo, List<JSONObject>> templateFatherVoListMap;
    /** Counter initialised to 1; presumably the source of the next template id (confirm at the call sites). */
    private AtomicInteger fatherId;
    private String keyword;
    private ClassB.TypeB typeB;
    private String group;
    private String id;

    /**
     * Creates an empty result with the id counter at 1.
     */
    public TitleAggreeResult(String id, String group, ClassB.TypeB typeB, String keyword) {
        this.id = id;
        this.group = group;
        this.typeB = typeB;
        this.keyword = keyword;
        this.fatherId = new AtomicInteger(1);
        this.templateFatherVoListMap = new HashMap<>();
    }
}
package com.zhiwei.middleware.automatic.server.pojo;
import lombok.Data;
/**
 * Pairs a title with a numeric {@code cosFreq} score.
 * NOTE(review): the name suggests a cosine-similarity value; confirm against the producer.
 */
@Data
public class TitleCosFreq {
    private String title;
    private double cosFreq;

    public TitleCosFreq() {
    }

    /**
     * @param title   the title
     * @param cosFreq the score attached to the title
     */
    public TitleCosFreq(String title, double cosFreq) {
        this.cosFreq = cosFreq;
        this.title = title;
    }
}
package com.zhiwei.middleware.automatic.server.pojo.enums;
/**
 * Kinds of aggregation task, each with a display label and the key prefix used for its cache keys.
 */
public enum AggreeTaskType {
    DATA("普通任务", "DATA-COLLECTION:"),
    EVENT("事件任务", "event:"),
    COMMON("普通任务", "common:");

    /** Display label of the task kind. */
    final String type;
    /** Prefix prepended to keys belonging to this task kind. */
    final String keyPrefix;

    AggreeTaskType(String type, String keyPrefix) {
        this.keyPrefix = keyPrefix;
        this.type = type;
    }

    /** @return the key prefix of this task kind */
    public String getKeyPrefix() {
        return this.keyPrefix;
    }

    /** @return the display label of this task kind */
    public String getType() {
        return this.type;
    }
}
package com.zhiwei.middleware.automatic.server.pojo.enums;
import com.zhiwei.base.category.ClassB;
/**
 * Field names used per content type. {@code QA} overrides the title and content field names;
 * every other constant uses the defaults.
 *
 * <p>The fields are {@code final}: enum constants are process-wide singletons, so a writable
 * public field would be shared mutable state visible to every user of the constant.
 */
public enum Fields {
    QA("question_title", "question_content"),
    VIDEO(),
    COMPLETE(),
    INCOMPLETE();

    public final String title;
    public final String content;
    public final String mtag = "mtag";
    public final String mtime = "mtime";
    public final String mperson = "mperson";
    public final String mgroup = "mgroup";

    /** Uses the default title/content field names. */
    Fields() {
        this("title", "content");
    }

    /**
     * @param title   name of the title field
     * @param content name of the content field
     */
    Fields(String title, String content) {
        this.title = title;
        this.content = content;
    }

    /**
     * Looks up the constant with the same name as the given type.
     *
     * @param typeB content type
     * @return the matching constant
     * @throws IllegalArgumentException if no constant carries that name
     */
    public static Fields getFields(ClassB.TypeB typeB) {
        return Fields.valueOf(typeB.name());
    }
}
package com.zhiwei.middleware.automatic.server.pojo.vo;
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
/**
 * A template ("father") that aggregated records are attached to.
 *
 * <p>Equality is based on {@code title} and {@code content} with {@code null} treated as the empty
 * string. A forwarded template never equals anything, so every forwarded record stays a separate
 * map key.
 */
@Data
public class TemplateFatherVo implements Serializable {
    private static final long serialVersionUID = 4142532604627291041L;
    /** Field used for comparison. */
    private String title = "";
    /** Field used for searching (title / text). */
    private String content = "";
    /** Template id. */
    private String fatherId = "1";
    /** First record that became this template. */
    private JSONObject example;
    /** Number of child records. */
    private Integer totalSon = 0;
    private boolean isForward;
    /** Highlighted hit keywords with their frequencies. */
    private List<Map<String, Integer>> hitWordAndRate;

    public TemplateFatherVo() {
    }

    /**
     * Uses the title for both the comparison field and the search field; null becomes "".
     */
    public TemplateFatherVo(String title) {
        this(title, title);
    }

    /**
     * @param title   comparison field; null becomes ""
     * @param content search field; null becomes ""
     */
    public TemplateFatherVo(String title, String content) {
        this.title = nullToEmpty(title);
        this.content = nullToEmpty(content);
    }

    /** Increments the child counter by one. */
    public void reFreshTotalSon() {
        totalSon++;
    }

    /**
     * Compares title and content without modifying either object. Null fields on either side are
     * treated as "", which keeps the result symmetric and consistent with {@link #hashCode()}.
     */
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TemplateFatherVo)) {
            return false;
        }
        TemplateFatherVo vo = (TemplateFatherVo) o;
        // Forwarded templates are deliberately never merged with another template.
        if (this.isForward || vo.isForward) {
            return false;
        }
        return nullToEmpty(this.content).equals(nullToEmpty(vo.content))
                && nullToEmpty(this.title).equals(nullToEmpty(vo.title));
    }

    /**
     * Sum of the content and title hashes, with null treated as "" (whose hash is 0).
     */
    @Override
    public int hashCode() {
        return nullToEmpty(content).hashCode() + nullToEmpty(title).hashCode();
    }

    /** Maps null to the empty string so comparisons never dereference null. */
    private static String nullToEmpty(String value) {
        return null == value ? "" : value;
    }
}
package com.zhiwei.middleware.automatic.server.queue;
import com.zhiwei.middleware.automatic.server.config.GenericAttribute;
import com.zhiwei.middleware.automatic.server.pojo.TemplateNum;
import com.zhiwei.middleware.automatic.server.redis.RedissonUtil;
import com.zhiwei.middleware.automatic.server.service.TemplateTitleService;
import com.zhiwei.middleware.automatic.server.util.Tools;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
 * In-memory buffer of template counting events. A background loop periodically drains the buffer,
 * sums the events per project group and title, and forwards each sum to
 * {@link TemplateTitleService#modifyTemplateNum}.
 */
@Component
public class TemplateNumQueue implements Runnable {
    private static final Logger log = LogManager.getLogger(TemplateNumQueue.class);
    private final BlockingQueue<TemplateNum> queue;
    /** Maximum number of events processed in one pass. */
    private static final int PULL_LIMIT = 1000;
    private final RedissonUtil redissonUtil;
    private final TemplateTitleService templateTitleService;

    /**
     * Creates the queue and immediately submits the draining loop to the given executor.
     */
    public TemplateNumQueue(RedissonUtil redissonUtil, TemplateTitleService templateTitleService,
            @Qualifier("asyncExecutor") ThreadPoolTaskExecutor executor) {
        this.queue = new LinkedBlockingQueue<>();
        this.redissonUtil = redissonUtil;
        this.templateTitleService = templateTitleService;
        executor.execute(this);
    }

    /** Enqueues a batch of counting events. */
    public void put(List<TemplateNum> templateNum) {
        queue.addAll(templateNum);
    }

    /** Enqueues one counting event. */
    public void put(TemplateNum templateNum) {
        queue.add(templateNum);
    }

    /**
     * Loops until the thread is interrupted: when events are pending and the lock can be taken,
     * flushes one batch, then sleeps 300 ms.
     */
    @Override
    public void run() {
        while (!Thread.interrupted()) {
            try {
                if (!queue.isEmpty()
                        && redissonUtil.tryLock(GenericAttribute.LOCK_TEMPLATE_NUMBER, 0, 1, TimeUnit.MINUTES)) {
                    try {
                        flushBatch();
                    } finally {
                        // Release even when the update fails; otherwise the lock stays held
                        // until its lease runs out.
                        redissonUtil.unlock(GenericAttribute.LOCK_TEMPLATE_NUMBER);
                    }
                }
                Tools.sleep(300L);
            } catch (Exception e) {
                log.error("模板数值更新失败:", e);
                if (e instanceof InterruptedException) {
                    // Restore the flag so the loop condition sees the interrupt and exits.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /** Drains up to {@link #PULL_LIMIT} events and pushes the per-group, per-title sums to the service. */
    private void flushBatch() {
        List<TemplateNum> batch = new ArrayList<>(Math.min(queue.size(), PULL_LIMIT));
        // drainTo never blocks, unlike take(), so a racing consumer cannot stall this loop.
        queue.drainTo(batch, PULL_LIMIT);
        Map<String, Map<String, Long>> counts = batch.stream().collect(
                Collectors.groupingBy(TemplateNum::getGroup,
                        Collectors.groupingBy(TemplateNum::getTitle, Collectors.counting())));
        counts.forEach((group, byTitle) ->
                byTitle.forEach((title, num) -> templateTitleService.modifyTemplateNum(group, title, num)));
    }
}
package com.zhiwei.middleware.automatic.server.service;
import com.zhiwei.base.entity.subclass.mark.MarkInfo;
import com.zhiwei.middleware.automatic.server.pojo.vo.TemplateFatherVo;
import java.util.List;
import java.util.Set;
/**
 * Entry points for automatic marking.
 */
public interface AutoService {
/**
 * Runs automatic marking.
 * NOTE(review): the name suggests asynchronous execution; the implementation is not visible here.
 */
void asyncAutoMark();
/**
 * Asynchronous automatic marking across multiple projects.
 */
void autoMarkMulti();
/**
 * Automatic marking based on noise aggregation.
 * @param templateFatherVos noise templates
 * @param group project
 * @param field field
 * @return number of records marked
 */
int noiseAutoMark(Set<TemplateFatherVo> templateFatherVos, String group, String field);
/**
 * Automatic marking of event data.
 * @param group project
 * @param data data set
 */
void autMarkByEvent(String group, List<MarkInfo> data);
}
package com.zhiwei.middleware.automatic.server.service;
import com.zhiwei.middleware.automatic.server.pojo.TemplateRecord;
import com.zhiwei.middleware.automatic.server.pojo.vo.TemplateTitleVo;
import com.zhiwei.middleware.automatic.server.pojo.TemplateTitleVo;
import java.util.List;
import java.util.Map;
......@@ -9,12 +8,6 @@ import java.util.Map;
public interface TemplateTitleService {
/**
* 自动聚合模板
* @param groups 项目集
*/
void schedulerHourAggregation(List<String> groups, Long startTime, Long endTime);
/**
* 获取项目文本模板
* @param project 项目
* @return 模板集
......@@ -22,29 +15,6 @@ public interface TemplateTitleService {
Map<String, TemplateTitleVo> getTemplateTitleByProject(String project);
/**
* 添加项目文本模板
* @param project 项目
* @param vos 模板集
* @return 模板集数量
*/
void setTemplateTitleByProject(String project, Map<String, TemplateTitleVo> vos);
/**
* 修正模板标题的markTag 如果不存在就会增加
*
* @param group 项目组
* @param templateTitle 模板标题
* @param fixTag 正确的标签
*/
boolean modifyTemplateTitle(String group, String templateTitle, String fixTag);
/**
* 修改模板计数
* @param group 项目
*/
void modifyTemplateNum(String group, String title, Long num);
/**
* 根据模板标题获取数据(仅最新100条)
*
* @param group 项目
......@@ -54,12 +24,6 @@ public interface TemplateTitleService {
List<String> getMupdateByTemplateTitle(String group, String templateTitle);
/**
* 新增模板记录
* @param templateRecord 模板记录
*/
void insertTemplateRecord (TemplateRecord templateRecord);
/**
* 根据标题和特征值尝试搜索模板标题
*
* @param group 项目
......@@ -77,13 +41,4 @@ public interface TemplateTitleService {
* @return 返回值
*/
Map<String, Object> compareWithTemplateTileOL(String project, String title);
/**
* 重置自动标注模板
* @param group 项目
* @param templateTitle 模板标题
* @return 是否成功
*/
boolean resetTemplate (String group, String templateTitle);
}
package com.zhiwei.middleware.automatic.server.service;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.base.category.ClassB;
import com.zhiwei.middleware.automatic.server.pojo.MarkUploadRule;
import com.zhiwei.middleware.automatic.server.pojo.UploadInfo;
import com.zhiwei.middleware.automatic.server.pojo.enums.InsertType;
import java.util.Map;
/**
 * Operations of the mark-data upload workflow: stage source data, start the upload,
 * query progress and results, and clean up.
 */
public interface UploadService {
/**
 * Adds a source data set.
 *
 * @param group project
 * @param id task id
 * @param sourceStr source data as a string (format not visible here)
 */
void addUploadList(String group, String id, String sourceStr);
/**
 * Starts the upload.
 *
 * @param markUploadRule rule describing the upload
 */
void startUpload(MarkUploadRule markUploadRule);
/**
 * Returns the upload status (progress).
 *
 * @param group project
 * @param id task id
 *
 * @return status as a map
 */
Map<String, Object> getUploadStatus(String group, String id);
/**
 * Returns the data set for the given upload type.
 *
 * @param group project
 * @param id task id
 * @param page page
 * @param size size
 * @param isAsc sort order
 * @param searchField field to search in
 * @param keyword keyword
 * @param uploadType upload type
 *
 * @return result as a map
 */
Map<String, Object> getUploadInfoList(String group, String id, int page, int size, boolean isAsc,
String searchField, String keyword, UploadInfo.UploadType uploadType);
/**
 * Determines the data type of a record.
 *
 * @param json the record
 * @param typeB content type
 *
 * @return the data type
 */
UploadInfo.DataType getDataType(JSONObject json, ClassB.TypeB typeB);
/**
 * Cleans up the data set of a task.
 *
 * @param group project
 * @param id task id
 */
void cleanUploadResult(String group, String id);
}
package com.zhiwei.middleware.automatic.server.service.handler;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.middleware.automatic.server.pojo.AggreInfo;
import com.zhiwei.middleware.automatic.server.pojo.enums.AggreeTaskType;
import com.zhiwei.middleware.automatic.server.redis.RedissonUtil;
import com.zhiwei.middleware.automatic.server.util.Tools;
import java.util.List;
/**
 * Shared storage helpers for aggregation tasks. Every key is assembled from the task type's
 * prefix, a fixed segment ("source" or "task"), the project group and the task id.
 */
public class BaseTaskHandler {
    private final RedissonUtil redissonUtil;
    private final AggreeTaskType aggreeTaskType;
    private static final String SOURCE_KEY = "source";
    private static final String TASK_KEY = "task";

    public BaseTaskHandler(RedissonUtil redissonUtil, AggreeTaskType aggreeTaskType) {
        this.redissonUtil = redissonUtil;
        this.aggreeTaskType = aggreeTaskType;
    }

    /** @return the key prefix of the task type this handler serves */
    public String getKeyPrefix() {
        return aggreeTaskType.getKeyPrefix();
    }

    /**
     * Loads the aggregation task.
     *
     * @param group project
     * @param id    task id
     * @return the stored task, or {@code null} when nothing is stored under the key
     */
    public AggreInfo getAggreeTask(String group, String id) {
        String json = redissonUtil.getBucket(getTaskKey(group, id));
        // parseObject(String, Class) yields null for a null text, so a missing bucket no longer
        // ends in a NullPointerException on the intermediate JSONObject.
        return JSONObject.parseObject(json, AggreInfo.class);
    }

    /**
     * Stores the aggregation task as JSON.
     *
     * @param group     project
     * @param id        task id
     * @param aggreInfo task to store
     */
    public void addAggreeTask(String group, String id, AggreInfo aggreInfo) {
        redissonUtil.setBucket(getTaskKey(group, id), JSONObject.toJSONString(aggreInfo));
    }

    /**
     * Adds source data to the task.
     *
     * @param group      project
     * @param id         task id
     * @param dataSource data to add
     */
    public void addDataSource(String group, String id, List<String> dataSource) {
        redissonUtil.setList(getSourceKey(group, id), dataSource);
    }

    /**
     * Returns the source data of the task.
     *
     * @param group project
     * @param id    task id
     * @return the source data
     */
    public List<String> getDataSource(String group, String id) {
        return redissonUtil.getList(getSourceKey(group, id));
    }

    /** @return number of source entries stored for the task */
    public long getDataSourceSize(String group, String id) {
        return redissonUtil.getListSize(getSourceKey(group, id));
    }

    /**
     * Puts an expiry of 30 on the source list.
     * NOTE(review): the time unit is decided inside RedissonUtil.listExpirable; confirm.
     */
    public void dataSourceExpirable(String group, String id) {
        redissonUtil.listExpirable(getSourceKey(group, id), 30);
    }

    /**
     * Removes everything stored for the task: the source list and the task bucket.
     *
     * @param group project
     * @param id    task id
     */
    public void removerInfo(String group, String id) {
        redissonUtil.deleteList(getSourceKey(group, id));
        redissonUtil.deleteBucket(getTaskKey(group, id));
    }

    private String getSourceKey(String group, String id) {
        return Tools.assembleKey(aggreeTaskType.getKeyPrefix(), SOURCE_KEY, group, id);
    }

    private String getTaskKey(String group, String id) {
        return Tools.assembleKey(aggreeTaskType.getKeyPrefix(), TASK_KEY, group, id);
    }
}
package com.zhiwei.middleware.automatic.server.service.handler;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.base.entity.subclass.mark.MarkInfo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.List;
/**
 * Sends records to Kafka.
 *
 * @author shenjunjie
 * @date 2019-08-29
 */
@Component
@EnableKafka
public class KafkaSendHandler {
    private static final Logger logger = LogManager.getLogger(KafkaSendHandler.class);
    /** Upper bound on send attempts per record, so a broker outage cannot block the caller forever. */
    private static final int MAX_SEND_ATTEMPTS = 3;
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    @Value("${crawler.topic}")
    private String topic;

    /**
     * Sends the source object of every mark info, one message per entry.
     *
     * @param list mark infos to send
     */
    public void insertDataByMarkInfo(List<MarkInfo> list) {
        list.forEach(markInfo -> insertData(markInfo.getSourceObj()));
        logger.info("Kafka发送消息{}条", list.size());
    }

    /**
     * Sends one record, keyed by its {@code cname} value, retrying up to
     * {@link #MAX_SEND_ATTEMPTS} times. A record that still fails is logged and dropped.
     *
     * @param json record to send
     */
    public void insertData(JSONObject json) {
        String cname = json.getString("cname");
        for (int attempt = 1; attempt <= MAX_SEND_ATTEMPTS; attempt++) {
            if (syncSendKafkaMsg(topic, cname, json)) {
                return;
            }
            if (Thread.currentThread().isInterrupted()) {
                // Retrying on an interrupted thread would fail again immediately.
                break;
            }
            if (attempt < MAX_SEND_ATTEMPTS) {
                logger.error("Kafka消息发送{}失败,立即重试...", cname);
            }
        }
        logger.error("Kafka消息发送{}失败,已尝试{}次,放弃发送", cname, MAX_SEND_ATTEMPTS);
    }

    /**
     * Sends one message and waits for the broker's answer.
     *
     * @param topic   Kafka topic
     * @param msgType message type, used as the record key
     * @param json    payload, sent as its JSON string
     * @return {@code true} when the send was acknowledged, {@code false} on any failure
     */
    private boolean syncSendKafkaMsg(String topic, String msgType, JSONObject json) {
        try {
            // 2019/7/11: the value was changed from a list to a string.
            // Blocking on the returned future is what makes the send synchronous; a callback
            // would still be pending when this method returns.
            kafkaTemplate.send(topic, msgType, json.toJSONString()).get();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("KafkaSendFailure", e);
            return false;
        } catch (Exception e) {
            // Covers the ExecutionException from the future as well as exceptions thrown
            // directly by send().
            logger.error("KafkaSendFailure", e);
            return false;
        }
    }
}
package com.zhiwei.middleware.automatic.server.service.handler;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.middleware.automatic.server.config.GenericAttribute;
import com.zhiwei.middleware.automatic.server.pojo.CommonAggreeCache;
import com.zhiwei.middleware.automatic.server.pojo.dto.AggreeDTO;
import com.zhiwei.middleware.automatic.server.pojo.enums.AggreeTaskType;
import com.zhiwei.middleware.automatic.server.redis.RedissonUtil;
import com.zhiwei.middleware.automatic.server.util.Tools;
import com.zhiwei.nlp.AggreeBootStarter;
import com.zhiwei.nlp.utils.BasicUtil;
import com.zhiwei.nlp.vo.KResult;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import com.zhiwei.middleware.automatic.server.pojo.CommonAggreeResult.ResultInfo;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * Text aggregation for COMMON tasks: collects source texts per order id, runs the aggregation on a
 * dedicated executor, and caches the result lists.
 */
@Service
public class TextHandlerService extends BaseTaskHandler {
    private static final Logger log = LogManager.getLogger(TextHandlerService.class);
    /** Default limit (0.1) handed to the aggregation when the caller gives none. */
    private static final double DEFAULT_LIMIT = 0.1;
    /** A single aggregation task must not exceed 150,000 records. */
    private static final int AGGREE_COUNT_LIMIT = 150000;
    private static final String TEXT_RES = "textRes";
    /** New aggregations are refused once this many are running. */
    private static final int MAX_ACTIVE_AGGREE_TASKS = 10;
    /** Executor that runs the aggregation jobs. */
    private final ThreadPoolTaskExecutor aggreeExecutor;
    private final RedissonUtil redissonUtil;

    public TextHandlerService(RedissonUtil redissonUtil,
            @Qualifier("aggreeExecutor") ThreadPoolTaskExecutor aggreeExecutor) {
        super(redissonUtil, AggreeTaskType.COMMON);
        this.redissonUtil = redissonUtil;
        this.aggreeExecutor = aggreeExecutor;
    }

    /**
     * Generates the id of a new aggregation order.
     *
     * @return the order id
     */
    public String generateAggreeOrder() {
        return redissonUtil.nextId(GenericAttribute.KEY_INCREMENT);
    }

    /**
     * Appends records to an order unless the order would then exceed {@link #AGGREE_COUNT_LIMIT}.
     *
     * @param id   order id
     * @param list records to append
     * @return {@code false} when the append was refused because of the limit
     */
    public boolean appendAggreeOrderNew(String id, List<AggreeDTO> list) {
        long existing = getDataSourceSize(null, id);
        // Check the size the order would have after the append (the figure the log reports),
        // not just what is already stored.
        long expected = existing + list.size();
        if (!checkLimit(expected)) {
            log.info("id:{},聚合任务超出上限:{},预期值:{}", id, AGGREE_COUNT_LIMIT, expected);
            return false;
        }
        addDataSource(null, id, list.stream().map(JSONObject::toJSONString).collect(Collectors.toList()));
        log.info("id:{},聚合任务添加{}条", id, list.size());
        return true;
    }

    /** Starts the aggregation with {@link #DEFAULT_LIMIT}. */
    public boolean startAggree(String id) {
        return startAggree(id, DEFAULT_LIMIT);
    }

    /**
     * Starts the aggregation of an order on the executor.
     *
     * @param id    order id
     * @param limit limit handed to the aggregation
     * @return {@code false} when the order has no source data or the executor is at capacity
     */
    public boolean startAggree(String id, double limit) {
        List<String> source = getDataSource(null, id);
        if (null == source) {
            return false;
        }
        if (aggreeExecutor.getActiveCount() >= MAX_ACTIVE_AGGREE_TASKS) {
            return false;
        }
        aggreeExecutor.execute(() -> {
            try {
                aggregate(id, limit, source);
            } catch (Exception e) {
                // Without this the failure would vanish inside the executor thread.
                log.error("id:{},聚合任务失败", id, e);
            }
        });
        return true;
    }

    /**
     * Returns the cached result of an order together with its source data.
     *
     * @param id order id
     * @return the cache object; its collections stay empty when nothing is stored
     */
    public CommonAggreeCache getAggreeResult(String id) {
        CommonAggreeCache cache = new CommonAggreeCache(id);
        List<String> cachedResults = redissonUtil.getList(Tools.assembleKey(TEXT_RES, id));
        if (null != cachedResults) {
            cache.setResults(cachedResults.stream()
                    .map(e -> JSONObject.parseObject(e).toJavaObject(ResultInfo.class))
                    .collect(Collectors.toList()));
        }
        List<String> source = getDataSource(null, id);
        if (null != source) {
            cache.setData(source.stream()
                    .map(e -> JSONObject.parseObject(e).toJavaObject(AggreeDTO.class))
                    .collect(Collectors.toMap(AggreeDTO::getId, dto -> dto)));
        }
        return cache;
    }

    /** Runs one aggregation and caches its result list, largest cluster first. */
    private void aggregate(String id, double limit, List<String> source) {
        log.info("id:{},开始聚合任务", id);
        Map<String, AggreeDTO> dataGroup = source.stream()
                .map(e -> JSONObject.parseObject(e).toJavaObject(AggreeDTO.class))
                .collect(Collectors.toMap(AggreeDTO::getId, dto -> dto));
        List<KResult<String>> kResultList = AggreeBootStarter.getKResult(
                dataGroup.values().stream().collect(Collectors.toMap(AggreeDTO::getId, AggreeDTO::getText)), limit);
        List<ResultInfo> res = new ArrayList<>();
        kResultList.forEach(result -> res.add(packageResultInfo(result, dataGroup)));
        // Descending by cluster size; Integer.compare cannot overflow like subtraction can.
        res.sort((a, b) -> Integer.compare(b.getSize(), a.getSize()));
        String resKey = Tools.assembleKey(TEXT_RES, id);
        redissonUtil.setList(resKey, res.stream().map(JSONObject::toJSONString).collect(Collectors.toList()));
        dataSourceExpirable(null, id);
        redissonUtil.listExpirable(resKey, 30);
        log.info("id:{},聚合任务结束,缓存已生成", id);
    }

    /**
     * Builds the result entry of one cluster. The template record is the first member whose text
     * passes {@code BasicUtil.textComparisonByAggreeConfig} against the cluster name, or the first
     * member when none passes.
     */
    private ResultInfo packageResultInfo(KResult<String> result, Map<String, AggreeDTO> sourceMap) {
        List<String> indexes = result.getDataPoints();
        List<String> hitList = indexes.stream().filter(index -> BasicUtil
                .textComparisonByAggreeConfig(result.getClusterName(), sourceMap.get(index).getText()))
                .collect(Collectors.toList());
        AggreeDTO templateData;
        if (hitList.isEmpty()) {
            log.info("未命中重要渠道选择第一条数据");
            templateData = sourceMap.get(indexes.get(0));
        } else {
            templateData = sourceMap.get(hitList.get(0));
        }
        return new ResultInfo(result.getClusterName(), indexes.size(),
                sourceMap.values().stream().filter(dto -> indexes.contains(dto.getId())).collect(Collectors.toList()),
                templateData);
    }

    /**
     * Checks a task size against the per-task limit.
     *
     * @param size size the task would have
     * @return {@code true} when the size does not exceed {@link #AGGREE_COUNT_LIMIT}
     */
    private boolean checkLimit(long size) {
        return size <= AGGREE_COUNT_LIMIT;
    }
}
package com.zhiwei.middleware.automatic.server.util;
import com.zhiwei.middleware.automatic.server.config.GlobalPojo;
import com.alibaba.fastjson.JSONObject;
import java.io.UnsupportedEncodingException;
import java.io.*;
import java.util.*;
public class CosineSimilarity {
private static final List<String> BRAND_WORDS = new ArrayList<>();
private static final String path = "classpath:static/brandWords.json";
static {
try {
InputStream inputStream = new FileInputStream(path);
String jsonStr = readJsonFile(inputStream);
if (null != jsonStr) {
List<List> array = JSONObject.parseArray(jsonStr, List.class);
for (List str : array) {
BRAND_WORDS.addAll(str);
}
}
} catch (FileNotFoundException e) {
e.printStackTrace();
}
}
public static double calculateTextSimWithBrand(String doc1, String doc2) {
if (handleByLength(doc1, doc2) && compareWithBrand(doc1, doc2)) {
return calculateSimilar(doc1, doc2);
......@@ -121,12 +140,12 @@ public class CosineSimilarity {
* @return 关键字数量是否一致
*/
private static boolean compareWithBrand(String doc1, String doc2) {
if (null == GlobalPojo.BRAND_WORDS || GlobalPojo.BRAND_WORDS.isEmpty()) {
if (null == BRAND_WORDS || BRAND_WORDS.isEmpty()) {
return true;
}
Set<String> set1 = new HashSet<>();
Set<String> set2 = new HashSet<>();
for (String brand : GlobalPojo.BRAND_WORDS) {
for (String brand : BRAND_WORDS) {
if (doc1.contains(brand)) {
set1.add(brand);
}
......@@ -149,4 +168,25 @@ public class CosineSimilarity {
/**
 * Tells whether a character lies in the range U+4E00..U+9FA5 (both ends included).
 *
 * @param ch character to test
 * @return {@code true} when the character is inside that range
 */
public static boolean isHanZi(char ch) {
    if (ch < 0x4E00) {
        return false;
    }
    return ch <= 0x9FA5;
}
/**
 * Reads the whole stream as UTF-8 text. The stream is always closed, including when reading fails.
 *
 * @param fileInputStream stream to read; may be null
 * @return the text, or {@code null} when the stream is null or cannot be read
 */
public static String readJsonFile(InputStream fileInputStream) {
    if (null == fileInputStream) {
        return null;
    }
    try (Reader reader = new InputStreamReader(fileInputStream, "utf-8")) {
        StringBuilder sb = new StringBuilder();
        char[] buffer = new char[4096];
        int read;
        while ((read = reader.read(buffer)) != -1) {
            sb.append(buffer, 0, read);
        }
        return sb.toString();
    } catch (IOException e) {
        e.printStackTrace();
        return null;
    }
}
}
package com.zhiwei.middleware.automatic.server.util;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.base.category.ClassB;
import com.zhiwei.base.entity.CommonDO;
import com.zhiwei.base.entity.subclass.mark.CompleteTextMark;
import com.zhiwei.base.entity.subclass.mark.IncompleteTextMark;
import com.zhiwei.base.entity.subclass.mark.QATextMark;
import com.zhiwei.base.entity.subclass.mark.VideoMark;
import com.zhiwei.base.filter.FilterInfo;
import com.zhiwei.middleware.automatic.server.config.GenericAttribute;
import com.zhiwei.middleware.automatic.server.pojo.enums.Fields;
import com.zhiwei.middleware.automatic.server.pojo.vo.TemplateFatherVo;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import static com.zhiwei.middleware.automatic.server.config.GenericAttribute.SON_ID;
/**
 * Static helpers for data collection: keyword frequency, JSON-to-entity conversion, field
 * completion before insert, and keyword filtering of templates.
 */
public class DataCollectionUtil {

    /**
     * Counts how often a word occurs in a text. Overlapping occurrences are counted.
     *
     * @param word word to look for
     * @param text text to search
     * @return number of occurrences; 0 when either argument is null or empty
     */
    public static int calculateRate(String word, String text) {
        if (StringUtils.isEmpty(word) || StringUtils.isEmpty(text)) {
            return 0;
        }
        int rate = 0;
        for (int at = text.indexOf(word); at >= 0; at = text.indexOf(word, at + 1)) {
            rate++;
        }
        return rate;
    }

    /**
     * Converts JSON records to {@link FilterInfo} through the mark entity class of the content type.
     * Each record's "mgroup" entry is overwritten with the given group.
     *
     * @param list  records to convert (modified in place)
     * @param group project group written into every record
     * @param typeB content type selecting the entity class
     * @return the converted list
     * @throws IllegalArgumentException when the content type has no mark entity class
     */
    public static List<FilterInfo> changeJSONList2FilterInfoList(List<JSONObject> list, String group, ClassB.TypeB typeB) {
        Class<? extends CommonDO> clazz = resolveMarkClass(typeB);
        return list.stream().map(json -> {
            json.put("mgroup", group);
            return JSONObject.parseObject(json.toJSONString(), clazz).filterInfo();
        }).collect(Collectors.toList());
    }

    /** Maps a content type to its mark entity class. */
    private static Class<? extends CommonDO> resolveMarkClass(ClassB.TypeB typeB) {
        switch (typeB) {
            case INCOMPLETE:
                return IncompleteTextMark.class;
            case COMPLETE:
                return CompleteTextMark.class;
            case QA:
                return QATextMark.class;
            case VIDEO:
                return VideoMark.class;
            default:
                throw new IllegalArgumentException("未能解析到的typeB类型:" + typeB);
        }
    }

    /**
     * Prepares records for insertion: removes the son id, sets group, tag and person, and writes
     * the fixed cid/cname values. An empty person falls back to {@code GenericAttribute.AUTO_PERSON}.
     *
     * @param list    records to complete (modified in place)
     * @param group   project group
     * @param mtag    tag to set
     * @param mperson person to set; may be empty
     */
    public static void supplementForInsert(List<JSONObject> list, String group, String mtag, String mperson) {
        String person = StringUtils.isEmpty(mperson) ? GenericAttribute.AUTO_PERSON : mperson;
        for (JSONObject obj : list) {
            obj.remove(SON_ID);
            obj.put("mgroup", group);
            obj.put("mtag", mtag);
            obj.put("mperson", person);
            // Fixed fields.
            obj.put("cid", GenericAttribute.AUTO_CID);
            obj.put("cname", GenericAttribute.AUTO_CNAME);
        }
    }

    /**
     * Tells whether the template's example record carries a tag.
     *
     * @param fatherVo template to inspect; may be null
     * @param fields   field names to use
     * @return {@code true} when the example record has a non-null tag value
     */
    public static boolean hasTag(TemplateFatherVo fatherVo, Fields fields) {
        if (null == fatherVo || null == fatherVo.getExample()) {
            return false;
        }
        return null != fatherVo.getExample().getString(fields.mtag);
    }

    /**
     * Filters templates by marked state and by keyword expression.
     *
     * @param list      templates to filter; null yields an empty list
     * @param character keyword expression, see {@link #cutKeyword(String)}; empty means no keyword filter
     * @param isTitle   match against the title only, otherwise against title plus content
     * @param markFlag  0 = any, 1 = marked only, any other value = unmarked only
     * @return the matching templates in their original order
     */
    public static List<TemplateFatherVo> fuzzyMatch(List<TemplateFatherVo> list, String character, boolean isTitle,
            int markFlag) {
        if (null == list) {
            return Collections.emptyList();
        }
        Boolean isMarked = markFlag == 0 ? null : Boolean.valueOf(markFlag == 1);
        List<List<String>> fuzzyList = StringUtils.isEmpty(character) ? null : cutKeyword(character);
        List<TemplateFatherVo> res = new ArrayList<>();
        for (TemplateFatherVo fatherVo : list) {
            JSONObject example = fatherVo.getExample();
            // A template without an example record counts as unmarked.
            boolean marked = null != example && !StringUtils.isEmpty(example.getString(Fields.COMPLETE.mtag));
            String title = isTitle ? fatherVo.getTitle() : fatherVo.getTitle() + fatherVo.getContent();
            if ((null == isMarked || isMarked == marked) && isHit(fuzzyList, title)) {
                res.add(fatherVo);
            }
        }
        return res;
    }

    /**
     * Splits a keyword expression: "|" separates alternatives (OR), a space separates terms that
     * must all be present (AND). Empty terms and empty alternatives are dropped, so stray
     * separators such as a leading "|" do not create a group that matches everything.
     *
     * @param keyword keyword expression
     * @return list of AND-groups; empty when the keyword is blank
     */
    public static List<List<String>> cutKeyword(String keyword) {
        List<List<String>> fuzzyList = new ArrayList<>();
        if (StringUtils.isBlank(keyword)) {
            return fuzzyList;
        }
        for (String any : keyword.split("\\|")) {
            List<String> andGroup = new ArrayList<>();
            for (String and : any.split(" ")) {
                if (!and.isEmpty()) {
                    andGroup.add(and);
                }
            }
            if (!andGroup.isEmpty()) {
                fuzzyList.add(andGroup);
            }
        }
        return fuzzyList;
    }

    /**
     * Tells whether a title satisfies at least one AND-group.
     *
     * @param fuzzyList AND-groups; null means "no filter" and always hits
     * @param title     text to test; null never hits when a filter is given
     * @return {@code true} when some group has all its terms contained in the title
     */
    public static boolean isHit(List<List<String>> fuzzyList, String title) {
        if (null == fuzzyList) {
            return true;
        }
        if (null == title) {
            return false;
        }
        for (List<String> ands : fuzzyList) {
            boolean allPresent = true;
            for (String and : ands) {
                // Every AND term has to be present.
                if (!title.contains(and)) {
                    allPresent = false;
                    break;
                }
            }
            if (allPresent) {
                return true;
            }
        }
        return false;
    }

    /**
     * Sorts the templates by numeric father id, highest first, and returns one page.
     * The given list is sorted in place.
     *
     * @param list templates to page
     * @param page page index as understood by {@code Tools.listPagedQuery}
     * @param size page size
     * @return the requested page
     */
    public static List<TemplateFatherVo> getList(List<TemplateFatherVo> list, int page, int size) {
        list.sort((x, y) -> Double.compare(Double.parseDouble(y.getFatherId()), Double.parseDouble(x.getFatherId())));
        return Tools.listPagedQuery(list, page, size);
    }
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment