Commit d0d002aa by shenjunjie

Merge branch 'feature' into 'release'

舆情分析-舆情驾驶舱

See merge request !455
parents a8cc0dca a03528a4
......@@ -47,6 +47,7 @@ public class GenericAttribute {
* es gid
**/
public static final String ES_GID = "gid";
public static final String ES_MGID = "mgid";
public static final String ES_PROJECT_ID = "project_id";
public static final String ES_CONTEND_ID = "contend_id";
public static final String ES_CHANNEL_FID = "fid";
......@@ -88,6 +89,7 @@ public class GenericAttribute {
* es mtime
**/
public static final String ES_MTIME = "mtime";
public static final String ES_STIME = "stime";
/**
* es mtag
**/
......
......@@ -102,6 +102,17 @@ public class RedisKeyPrefix {
public static final String NON_MANUAL_PROJECT_MARK_MAX_GID = "BRANDKBS:NON_MANUAL:PROJECT:MARK:MAX_GID:";
/**
* 新舆情分析页面相关缓存
*/
public static final String YUQING_ANALYZE_PROJECT_AVG_COUNT = "BRANDKBS:YUQING:ANALYZE:AMOUNT:AVG:";
public static final String YUQING_ANALYZE_EMOTION_DISTRIBUTION_AVG = "BRANDKBS:YUQING:ANALYZE:EMOTION:AVG:";
public static final String YUQING_ANALYZE_PLATFORM_AVG_COUNT = "BRANDKBS:YUQING:ANALYZE:PLATFORM:AVG:";
public static final String YUQING_ANALYZE_HIGH_WORD = "BRANDKBS:YUQING:ANALYZE:HIGH:WORD:";
public static String projectWarnHotTopKeyAll(String projectId, String type) {
return RedisKeyPrefix.generateRedisKey(RedisKeyPrefix.PROJECT_WARN_HOT_TOP, projectId, Tools.concat(type, "*"));
}
......
......@@ -54,6 +54,16 @@ public class CommonController extends BaseController {
}
}
@ApiOperation("获取平台类型及id")
@GetMapping("/get/platform-id")
public ResponseResult getPlatformWithId() {
try {
return ResponseResult.success(commonService.getQbjcPlatform("id", "name"));
} catch (Exception e) {
return ResponseResult.failure(e.getMessage());
}
}
@ApiOperation("获取当前用户拥有的所有项目及品牌列表")
@GetMapping("/user/getUserAllProjects")
@Auth(role = RoleEnum.CUSTOMER)
......
......@@ -440,6 +440,91 @@ public class AppArticleController extends BaseController {
return ResponseResult.success(markDataService.getNonManualMarkAggreeList(markSearchDTO));
}
@ApiOperation("获取方案列表")
@GetMapping("/analyze/plan/list")
public ResponseResult getNonManualPlanList(){
return ResponseResult.success(markDataService.getNonManualPlanList());
}
@ApiOperation("新-舆情分析-舆情总量")
@GetMapping("/analyze/amount")
public ResponseResult getYuqingAmount(@RequestParam(value = "startTime") Long startTime,
@RequestParam(value = "endTime") Long endTime,
@RequestParam(value = "planId", required = false) String planId) {
return ResponseResult.success(markDataService.getYuqingAmount(startTime, endTime, planId));
}
@ApiOperation("新-舆情分析-情感分布")
@GetMapping("/analyze/emotion")
public ResponseResult getYuqingEmotionDistribution(@RequestParam(value = "startTime") Long startTime,
@RequestParam(value = "endTime") Long endTime,
@RequestParam(value = "planId", required = false) String planId) {
return ResponseResult.success(markDataService.getYuqingEmotionDistribution(startTime, endTime, planId));
}
@ApiOperation("新-舆情分析-重点平台")
@GetMapping("/analyze/important-platform")
public ResponseResult getImportantPlatformPercentage(@RequestParam(value = "startTime") Long startTime,
@RequestParam(value = "endTime") Long endTime,
@RequestParam(value = "planId", required = false) String planId) {
return ResponseResult.success(markDataService.getImportantPlatformPercentage(startTime, endTime, planId));
}
@ApiOperation("新-舆情分析-平台占比")
@GetMapping("/analyze/platform-percent")
public ResponseResult getPlatformPercentage(@RequestParam(value = "startTime") Long startTime,
@RequestParam(value = "endTime") Long endTime,
@RequestParam(value = "planId", required = false) String planId) {
return ResponseResult.success(markDataService.getPlatformPercentage(startTime, endTime, planId));
}
@ApiOperation("新-舆情分析-舆情走势图")
@GetMapping("/analyze/tendency")
public ResponseResult getSpreadTendency(@RequestParam(value = "startTime") Long startTime,
@RequestParam(value = "endTime") Long endTime,
@RequestParam(value = "planId", required = false) String planId) {
return ResponseResult.success(markDataService.getSpreadTendency(startTime, endTime, planId));
}
@ApiOperation("新-舆情分析-活跃渠道")
@GetMapping("/analyze/active-channel")
public ResponseResult getActiveChannels(@RequestParam(value = "startTime") Long startTime,
@RequestParam(value = "endTime") Long endTime,
@RequestParam(value = "planId", required = false) String planId) {
return ResponseResult.success(markDataService.getActiveChannels(startTime, endTime, planId));
}
@ApiOperation("新-舆情分析-ip分布")
@GetMapping("/analyze/ip-located")
public ResponseResult getArticleIpLocated(@RequestParam(value = "startTime") Long startTime,
@RequestParam(value = "endTime") Long endTime,
@RequestParam(value = "planId", required = false) String planId,
@RequestParam(value = "size") int size) {
return ResponseResult.success(markDataService.getArticleIpLocated(startTime, endTime, planId, size));
}
@ApiOperation("新-舆情分析-词云")
@GetMapping("/analyze/high-word")
public ResponseResult getHighWord(@RequestParam(value = "startTime") Long startTime,
@RequestParam(value = "endTime") Long endTime,
@RequestParam(value = "planId", required = false) String planId) {
return ResponseResult.success(markDataService.getHighWord(startTime, endTime, planId, true));
}
@ApiOperation("新-舆情分析-高频标题")
@GetMapping("/analyze/frequent-title")
public ResponseResult getLastNews(@RequestParam(value = "startTime") Long startTime,
@RequestParam(value = "endTime") Long endTime,
@RequestParam(value = "planId", required = false) String planId) {
return ResponseResult.success(markDataService.getLastNews(startTime, endTime, planId, 5, true));
}
@ApiOperation("新-舆情分析-活跃渠道、ip分布、词云详情页面,社媒平台发文")
@PostMapping("/analyze/mark-data")
public ResponseResult getYuqingAnalyzeDetail(@RequestBody MarkSearchDTO markSearchDTO) {
return ResponseResult.success(markDataService.getYuqingAnalyzeDetail(markSearchDTO));
}
private boolean checkMTagIllegal(StringBuilder mtag) {
List<MarkerTag> hitTags = projectService.getProjectById(UserThreadLocal.getProjectId()).getHitTags();
if (!Tools.isEmpty(hitTags)) {
......
......@@ -311,6 +311,18 @@ public class EsQueryTools {
}
/**
* ip查询
*
* @param ip ip地址
* @return
*/
public static BoolQueryBuilder assembleIpQuery(String ip) {
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
queryBuilder.should(QueryBuilders.termQuery("ip_location.keyword", ip));
return queryBuilder;
}
/**
* 字段不拆封,多字段 同关键词
*
* @param @param boolQueryBuilder
......
......@@ -181,7 +181,18 @@ public class MarkSearchDTO {
@ApiModelProperty(value = "数据类型(1:长文本, 2:短文本, 3:问答, 5:视频)")
private Integer dataType;
/**
* gid用于未读已读筛选,仅非人工项目
* ip地址
*/
@ApiModelProperty(value = "ip地址")
private String ip;
/**
* gid用于未读已读筛选,非人工项目舆情数据用
*/
@ApiModelProperty(value = "gid")
private Long gid;
/**
* gid限制 舆情分析页面滚动翻页用
*/
@ApiModelProperty(value = "gid限制")
private Long pageGid;
}
......@@ -163,7 +163,7 @@ public class ProjectVO {
project.setPositiveChannelParams((this.getPositiveChannelParams()));
project.setBlackChannelGroup(this.getBlackChannelGroup());
project.setShow(true);
project.setStart(false);
project.setStart(true);
project.setManual(true);
project.setCTime(time.getTime());
project.setUTime(time.getTime());
......@@ -189,7 +189,7 @@ public class ProjectVO {
// 非人工项目
project.setManual(false);
project.setShow(true);
project.setStart(false);
project.setStart(true);
project.setCTime(System.currentTimeMillis());
project.setUTime(System.currentTimeMillis());
return project;
......
......@@ -436,6 +436,12 @@ public interface MarkDataService {
PageVO<JSONObject> getNonManualProjectPlanList();
/**
* 获取方案列表
* @return
*/
List<JSONObject> getNonManualPlanList();
/**
* 获取方案设置-方案列表-昨日,今日数据消耗量
* @return
*/
......@@ -511,4 +517,121 @@ public interface MarkDataService {
* @return
*/
List<JSONObject> getNonManualMarkCountList();
/**
* 计算近一年舆情总量项目日均
* @param projectId
* @param planId
* @return
* @throws IOException
*/
void countYuqingAmountAvg(Long startTime, Long endTime, String projectId, String planId) throws IOException;
/**
* 计算近一年情感分布均值
* @param projectId
* @param planId
* @throws IOException
*/
void countEmotionDistributionAvg(Long startTime, Long endTime, String projectId, String planId) throws IOException;
/**
* 计算项目近一年重点平台均值
* @param projectId
* @param planId
* @throws IOException
*/
void countImportantPlatformPercentageAvg(Long startTime, Long endTime, String projectId, String planId) throws IOException;
/**
* 新-舆情分析-舆情总量
* @param startTime
* @param endTime
* @param planId
* @return
* @throws IOException
*/
JSONObject getYuqingAmount(Long startTime, Long endTime, String planId);
/**
* 新-舆情分析-情感分布
* @param startTime
* @param endTime
* @param planId
* @return
* @throws IOException
*/
JSONObject getYuqingEmotionDistribution(Long startTime, Long endTime, String planId);
/**
* 新-舆情分析-重点平台
* @param startTime
* @param endTime
* @param planId
* @return
*/
JSONObject getImportantPlatformPercentage(Long startTime, Long endTime, String planId);
/**
* 新-舆情分析-平台占比
* @param startTime
* @param endTime
* @param planId
* @return
*/
List<JSONObject> getPlatformPercentage(Long startTime, Long endTime, String planId);
/**
* 新-舆情分析-舆情走势图
* @param startTime
* @param endTime
* @param planId
* @return
*/
JSONObject getSpreadTendency(Long startTime, Long endTime, String planId);
/**
* 新-舆情分析-活跃渠道
* @param startTime
* @param endTime
* @param planId
* @return
*/
List<JSONObject> getActiveChannels(Long startTime, Long endTime, String planId);
/**
* 新-舆情分析-ip分布
* @param startTime
* @param endTime
* @param planId
* @return
*/
List<JSONObject> getArticleIpLocated(Long startTime, Long endTime, String planId, int size);
/**
* 新-舆情分析-活跃渠道、ip分布、词云详情页面
* @param dto
* @return
*/
PageVO<MarkFlowEntity> getYuqingAnalyzeDetail(MarkSearchDTO dto);
/**
* 新-舆情分析-词云
* @param startTime
* @param endTime
* @param planId
* @return
*/
List<JSONObject> getHighWord(Long startTime, Long endTime, String planId, boolean cache);
/**
* 新-舆情分析-高频标题
* @param startTime
* @param endTime
* @param planId
* @param size
* @param include
* @return
*/
List<JSONObject> getLastNews(Long startTime, Long endTime, String planId, int size, boolean include);
}
......@@ -42,4 +42,14 @@ public interface TaskService{
* 事件相关更新
*/
void eventUpdate();
/**
* 计算项目舆情总量、情感分布、重点平台均值
*/
void calculateProjectAvg();
/**
* 生成舆情分析词云缓存
*/
void yuqingAnalyzeHighWordCache();
}
......@@ -131,6 +131,14 @@ public class EsSearchServiceImpl implements EsSearchService {
if (StringUtils.isNotEmpty(dto.getHostKeyword())) {
postFilter.must(EsQueryTools.assembleFiledKeywordQuery("host", dto.getHostKeyword()));
}
// ip
if (StringUtils.isNotEmpty(dto.getIp())){
postFilter.must(EsQueryTools.assembleIpQuery(dto.getIp()));
}
// mgid限制 舆情分析页面滚动翻页用
if (Objects.nonNull(dto.getPageGid())){
postFilter.must(QueryBuilders.rangeQuery(GenericAttribute.ES_MGID).lt(dto.getPageGid()));
}
helper.setPostFilter(postFilter);
// helper.setQuery(query);
// sort
......@@ -315,6 +323,14 @@ public class EsSearchServiceImpl implements EsSearchService {
if (Objects.nonNull(dto.getDataType())){
postFilter.must(EsQueryTools.assembleC2Query(dto.getDataType()));
}
// ip
if (StringUtils.isNotEmpty(dto.getIp())){
postFilter.must(EsQueryTools.assembleIpQuery(dto.getIp()));
}
// mgid限制 舆情分析页面滚动翻页用
if (Objects.nonNull(dto.getPageGid())){
postFilter.must(QueryBuilders.rangeQuery(GenericAttribute.ES_MGID).lt(dto.getPageGid()));
}
helper.setPostFilter(postFilter);
// sort
FieldSortBuilder sort = null;
......
......@@ -55,6 +55,7 @@ import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
......@@ -74,6 +75,7 @@ import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
......@@ -81,9 +83,12 @@ import javax.annotation.Resource;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* @ClassName: MarkDataServiceImpl
......@@ -178,6 +183,9 @@ public class MarkDataServiceImpl implements MarkDataService {
@Value("${brandkbs.file.url}")
private String brandkbsFilePath;
@Resource(name = "esSearchExecutor")
ThreadPoolTaskExecutor executor;
@Override
public PageVO<MarkFlowEntity> getOriginList(MarkSearchDTO markSearchDTO) {
try {
......@@ -845,6 +853,52 @@ public class MarkDataServiceImpl implements MarkDataService {
return result.entrySet().stream().sorted(Map.Entry.comparingByValue(Comparator.reverseOrder())).limit(size).collect(Collectors.toList());
}
private List<JSONObject> getMarkTopTitle(Long startTime, Long endTime, String emotion, String projectId, String contendId, String planId, int size) throws IOException {
// 索引
String[] indexes = esClientDao.getIndexes();
TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("titles").field("agg_title.keyword").size(size + 1);
TermsAggregationBuilder sourceAggregationBuilder = AggregationBuilders.terms("source").field("source").size(10000);
// query
BoolQueryBuilder query;
if (Objects.isNull(planId)) {
query = projectContendIdQuery(projectId, contendId);
}else {
query = EsQueryTools.assembleCacheMapsPlanQuery(projectId, planId);
}
query.must(QueryBuilders.rangeQuery("time").gte(startTime).lt(endTime))
// 过滤微博
.mustNot(QueryBuilders.termQuery("platform_id", "5d02236e6395002a7c380b79"));
if (null != emotion && !Objects.equals(emotion, EmotionEnum.ALL.getName())) {
query.must(QueryBuilders.termQuery("brandkbs_mark_cache_maps.name.keyword", emotion));
}
SearchResponse searchResponse = esClientDao.searchResponse(indexes, null, query,
aggregationBuilder.subAggregation(sourceAggregationBuilder), null, null, 0, 0, null);
List<JSONObject> res = new ArrayList<>();
Map<String, Aggregation> aggMap = searchResponse.getAggregations().asMap();
ParsedStringTerms teamAgg = (ParsedStringTerms) aggMap.get("titles");
for (Terms.Bucket bucket : teamAgg.getBuckets()) {
JSONObject jsonObject = new JSONObject();
String title = bucket.getKeyAsString();
// 过滤 “分享一篇文章” 的标题
if ("分享一篇文章".equals(title)) {
continue;
}
// result.merge(title, num, Integer::sum);
jsonObject.put("title", title);
jsonObject.put("num", bucket.getDocCount());
Map<String, Aggregation> aggregationMap = bucket.getAggregations().asMap();
ParsedStringTerms sourceAgg = (ParsedStringTerms) aggregationMap.get("source");
jsonObject.put("sourceCount", sourceAgg.getBuckets().size());
List<String> sources = new ArrayList<>();
for (Terms.Bucket sourceBucket : sourceAgg.getBuckets()) {
sources.add(sourceBucket.getKeyAsString());
}
jsonObject.put("sources", sources);
res.add(jsonObject);
}
return res;
}
@Override
public BaseMap getFirstArticle(Long startTime, Long endTime, String aggTitle, String projectId, String contendId) throws IOException {
return getFirstArticle(startTime, endTime, aggTitle, projectId, contendId, true);
......@@ -852,16 +906,28 @@ public class MarkDataServiceImpl implements MarkDataService {
@Override
public BaseMap getFirstArticle(Long startTime, Long endTime, String aggTitle, String projectId, String contendId, boolean include) throws IOException {
return getFirstArticle(startTime, endTime, aggTitle, projectId, contendId, null, include);
}
private BaseMap getFirstArticle(Long startTime, Long endTime, String aggTitle, String projectId, String contendId, String planId, boolean include) throws IOException{
if (StringUtils.isBlank(aggTitle)){
return null;
}
// 索引
String[] indexes = esClientDao.getIndexes();
// postFilter
BoolQueryBuilder postFilter;
if (Objects.isNull(planId)) {
if (include) {
postFilter = projectContendIdQuery(projectId, contendId);
} else {
postFilter = EsQueryTools.assembleCacheMapsQueryExcludePrimaryId(projectId);
}
postFilter.must(QueryBuilders.rangeQuery("time").gte(startTime).lt(endTime)).must(QueryBuilders.termQuery("agg_title.keyword", aggTitle));
}else {
postFilter = EsQueryTools.assembleCacheMapsPlanQuery(projectId, planId);
}
postFilter.must(QueryBuilders.rangeQuery("time").gte(startTime).lt(endTime));
postFilter.must(QueryBuilders.termQuery("agg_title.keyword", aggTitle));
//sort
FieldSortBuilder sort = new FieldSortBuilder("time").order(SortOrder.ASC);
//hits
......@@ -872,7 +938,6 @@ public class MarkDataServiceImpl implements MarkDataService {
return Tools.getBaseFromEsMap(hits.getAt(0).getSourceAsMap());
}
@Override
public List<JSONObject> searchMarkDataByEvent(Event event) {
Long endTime = null;
......@@ -1874,6 +1939,18 @@ public class MarkDataServiceImpl implements MarkDataService {
return PageVO.createPageVo(total, 1, 5, collect);
}
@Override
public List<JSONObject> getNonManualPlanList() {
Query query = new Query(Criteria.where("projectId").is(UserThreadLocal.getProjectId()));
List<NonManualProjectPlan> planList = nonManualProjectPlanDao.findList(query);
return planList.stream().map(plan -> {
JSONObject jsonObject = new JSONObject();
jsonObject.put("id", plan.getId());
jsonObject.put("name", plan.getName());
return jsonObject;
}).collect(Collectors.toList());
}
/**
* 获取关键词/去噪词数量
* @param word
......@@ -2183,9 +2260,9 @@ public class MarkDataServiceImpl implements MarkDataService {
aggreeList.setInfo(jsonObject);
return aggreeList;
}catch (Exception e){
log.error("获取非人工项目舆情列表聚合结果出错", e);
return new PageVO<>();
ExceptionCast.cast(CommonCodeEnum.FAIL, "getNonManualMarkAggreeList异常-", e);
}
return new PageVO<>();
}
@Override
......@@ -2207,9 +2284,573 @@ public class MarkDataServiceImpl implements MarkDataService {
}
return res;
}catch (Exception e){
log.error("获取非人工项目舆情列表数据量统计出错", e);
ExceptionCast.cast(CommonCodeEnum.FAIL, "getNonManualMarkAggreeList异常-", e);
}
return Collections.emptyList();
}
@Override
public void countYuqingAmountAvg(Long startTime, Long endTime, String projectId, String planId) throws IOException {
Long total = getYuqingAnalyzeCount(startTime, endTime, projectId, planId, null, null);
int days = new Period(startTime, endTime, PeriodType.days()).getDays();
days = 0 == days ? 1 : days;
// avg
double avg = Objects.isNull(total) || 0 == total ? 0d : total / (double) days;
String projectYuqingCountAvgKey = RedisUtil.getYuqingAnalyzeProjectAvgCountKey(projectId, Constant.PRIMARY_CONTEND_ID, planId);
redisUtil.set(projectYuqingCountAvgKey, String.valueOf(avg));
}
@Override
public void countEmotionDistributionAvg(Long startTime, Long endTime, String projectId, String planId) throws IOException {
// 舆情总量
Long total = getYuqingAnalyzeCount(startTime, endTime, projectId, planId, null, null);
// 正面舆情
long positiveCount = getYuqingAnalyzeCount(startTime, endTime, projectId, planId, EmotionEnum.POSITIVE.getName(), null);
// 中性舆情
long neutralCount = getYuqingAnalyzeCount(startTime, endTime, projectId, planId, EmotionEnum.NEUTRAL.getName(), null);
// 负面舆情
long negativeCount = getYuqingAnalyzeCount(startTime, endTime, projectId, planId, EmotionEnum.NEGATIVE.getName(), null);
// key
String positiveKey = RedisUtil.getYuqingAnalyzeEmotionDistributionAvgKey(projectId, Constant.PRIMARY_CONTEND_ID, EmotionEnum.POSITIVE.getName(), planId);
String neutralKey = RedisUtil.getYuqingAnalyzeEmotionDistributionAvgKey(projectId, Constant.PRIMARY_CONTEND_ID, EmotionEnum.NEUTRAL.getName(), planId);
String negativeKey = RedisUtil.getYuqingAnalyzeEmotionDistributionAvgKey(projectId, Constant.PRIMARY_CONTEND_ID, EmotionEnum.NEGATIVE.getName(), planId);
// avg
double positiveAvg = Objects.isNull(total) || 0 == total ? 0d : positiveCount / (double) total;
double neutralAvg = Objects.isNull(total) || 0 == total ? 0d : neutralCount / (double) total;
double negativeAvg = Objects.isNull(total) || 0 == total ? 0d : negativeCount / (double) total;
redisUtil.set(positiveKey, String.valueOf(positiveAvg));
redisUtil.set(neutralKey, String.valueOf(neutralAvg));
redisUtil.set(negativeKey, String.valueOf(negativeAvg));
}
@Override
public void countImportantPlatformPercentageAvg(Long startTime, Long endTime, String projectId, String planId) throws IOException {
Long total = getYuqingAnalyzeCount(startTime, endTime, projectId, planId, null, Arrays.asList("微博", "微信", "抖音", "小红书", "今日头条", "网媒"));
String key = RedisUtil.getYuqingAnalyzePlatformAvgCountKey(projectId, Constant.PRIMARY_CONTEND_ID, planId);
int days = new Period(startTime, endTime, PeriodType.days()).getDays();
days = 0 == days ? 1 : days;
double avg = Objects.isNull(total) || 0 == total ? 0d : total / (double) days;
redisUtil.set(key, String.valueOf(avg));
}
@Override
public JSONObject getYuqingAmount(Long startTime, Long endTime, String planId) {
JSONObject jsonObject = new JSONObject();
try {
String projectId = UserThreadLocal.getProjectId();
// 舆情总量
long total = getYuqingAnalyzeCount(startTime, endTime, planId, EmotionEnum.ALL.getName());
jsonObject.put("total", total);
// 项目日均
String projectYuqingCountAvgKey = RedisUtil.getYuqingAnalyzeProjectAvgCountKey(projectId, Constant.PRIMARY_CONTEND_ID, planId);
double projectYuqingAvgCount = Objects.isNull(redisUtil.get(projectYuqingCountAvgKey)) ? 0d : Double.parseDouble(redisUtil.get(projectYuqingCountAvgKey));
jsonObject.put("projectAvg", projectYuqingAvgCount);
// 本次日均
int days = new Period(startTime, endTime, PeriodType.days()).getDays();
days = 0 == days ? 1 : days;
double yuqingAvgCount = total == 0 ? 0d : total / (double) days;
jsonObject.put("compare", compare(yuqingAvgCount, projectYuqingAvgCount));
}catch (Exception e){
ExceptionCast.cast(CommonCodeEnum.FAIL, "getYuqingAmount异常-", e);
}
return jsonObject;
}
@Override
public JSONObject getYuqingEmotionDistribution(Long startTime, Long endTime, String planId) {
JSONObject jsonObject = new JSONObject();
try {
String projectId = UserThreadLocal.getProjectId();
// 舆情总量
long total = getYuqingAnalyzeCount(startTime, endTime, planId, EmotionEnum.ALL.getName());
// 正面舆情
long positiveCount = getYuqingAnalyzeCount(startTime, endTime, planId, EmotionEnum.POSITIVE.getName());
// 中性舆情
long neutralCount = getYuqingAnalyzeCount(startTime, endTime, planId, EmotionEnum.NEUTRAL.getName());
// 负面舆情
long negativeCount = getYuqingAnalyzeCount(startTime, endTime, planId, EmotionEnum.NEGATIVE.getName());
// 各情感倾向占比
double positivePercentage = total == 0 ? 0d : positiveCount / (double) total;
double neutralPercentage = total == 0 ? 0d : neutralCount / (double) total;
double negativePercentage = total == 0 ? 0d : negativeCount / (double) total;
// 项目情感倾向占比均值
String positiveKey = RedisUtil.getYuqingAnalyzeEmotionDistributionAvgKey(projectId, Constant.PRIMARY_CONTEND_ID, EmotionEnum.POSITIVE.getName(), planId);
String neutralKey = RedisUtil.getYuqingAnalyzeEmotionDistributionAvgKey(projectId, Constant.PRIMARY_CONTEND_ID, EmotionEnum.NEUTRAL.getName(), planId);
String negativeKey = RedisUtil.getYuqingAnalyzeEmotionDistributionAvgKey(projectId, Constant.PRIMARY_CONTEND_ID, EmotionEnum.NEGATIVE.getName(), planId);
double positiveAvg = Objects.isNull(redisUtil.get(positiveKey)) ? 0d : Double.parseDouble(redisUtil.get(positiveKey));
double neutralAvg = Objects.isNull(redisUtil.get(neutralKey)) ? 0d : Double.parseDouble(redisUtil.get(neutralKey));
double negativeAvg = Objects.isNull(redisUtil.get(negativeKey)) ? 0d : Double.parseDouble(redisUtil.get(negativeKey));
JSONObject positive = new JSONObject();
JSONObject neutral = new JSONObject();
JSONObject negative = new JSONObject();
positive.put("positiveCount", positiveCount);
neutral.put("neutralCount", neutralCount);
negative.put("negativeCount", negativeCount);
positive.put("positivePercent", positivePercentage);
neutral.put("neutralPercent", neutralPercentage);
negative.put("negativePercent", negativePercentage);
positive.put("positiveAvg", positiveAvg);
neutral.put("neutralAvg", neutralAvg);
negative.put("negativeAvg", negativeAvg);
jsonObject.put("positive", positive);
jsonObject.put("neutral", neutral);
jsonObject.put("negative", negative);
}catch (Exception e){
ExceptionCast.cast(CommonCodeEnum.FAIL, "getYuqingEmotionDistribution异常-", e);
}
return jsonObject;
}
@Override
public JSONObject getImportantPlatformPercentage(Long startTime, Long endTime, String planId) {
JSONObject res = new JSONObject();
try {
List<JSONObject> list = new ArrayList<>();
List<String> importantPlatforms = Arrays.asList("微博", "微信", "抖音", "小红书", "今日头条", "网媒");
String projectId = UserThreadLocal.getProjectId();
Long total = getYuqingAnalyzeCount(startTime, endTime, planId, importantPlatforms);
// 平台聚合
SearchResponse searchResponse = platformAggSearchResponse(startTime, endTime, planId, importantPlatforms);
Map<String, Aggregation> aggMap = searchResponse.getAggregations().asMap();
ParsedStringTerms teamAgg = (ParsedStringTerms) aggMap.get("platform_count");
List<? extends Terms.Bucket> buckets = teamAgg.getBuckets();
buckets.forEach(bucket -> {
JSONObject platformResult = new JSONObject();
platformResult.put("platform", GlobalPojo.getPlatformNameById(bucket.getKeyAsString()));
platformResult.put("count", bucket.getDocCount());
list.add(platformResult);
});
res.put("platformCount", list.stream().sorted(Comparator.comparingLong((JSONObject json) -> json.getLongValue("count")).reversed()));
JSONObject overview = new JSONObject();
// 总量
overview.put("total", total);
// 日均
int days = new Period(startTime, endTime, PeriodType.days()).getDays();
days = 0 == days ? 1 : days;
double avg = total == 0 ? 0d : total / (double) days;
overview.put("avg", avg);
// 项目均值
String key = RedisUtil.getYuqingAnalyzePlatformAvgCountKey(projectId, Constant.PRIMARY_CONTEND_ID, planId);
double projectAvg = Objects.isNull(redisUtil.get(key)) ? 0d : Double.parseDouble(redisUtil.get(key));
overview.put("compare", compare(avg, projectAvg));
res.put("overview", overview);
return res;
}catch (Exception e){
ExceptionCast.cast(CommonCodeEnum.FAIL, "新舆情分析getImportantPlatformPercentage异常-", e);
}
return res;
}
private String compare(double avg, double projectAvg){
// 本次查询日均数据 > 项目日均1.2倍为【本次偏高】,项目日均0.8倍<本次查询日均数据<项目日均1.2倍为【数据量正常】,本次查询日均数据 < 项目日均0.8倍为【本次偏低】
String compare = "normal";
double highSide = 1.2 * projectAvg;
double lowSize = 0.8 * projectAvg;
if (avg > highSide){
compare = "higher";
}
if (lowSize > avg){
compare = "lower";
}
return compare;
}
@Override
public List<JSONObject> getPlatformPercentage(Long startTime, Long endTime, String planId) {
try {
List<String> platforms = commonService.getQbjcPlatformNames();
List<JSONObject> list = new ArrayList<>();
// query
BoolQueryBuilder query = yuqingAnalyzeQuery(startTime, endTime, planId, null, platforms);
// total
Long total = esClientDao.count(query);
// 平台聚合
SearchResponse searchResponse = platformAggSearchResponse(startTime, endTime, planId, platforms);
Map<String, Aggregation> aggMap = searchResponse.getAggregations().asMap();
ParsedStringTerms teamAgg = (ParsedStringTerms) aggMap.get("platform_count");
List<? extends Terms.Bucket> buckets = teamAgg.getBuckets();
buckets.forEach(bucket -> {
JSONObject platformResult = new JSONObject();
platformResult.put("platform", GlobalPojo.getPlatformNameById(bucket.getKeyAsString()));
platformResult.put("percent", 0 == total ? 0d : bucket.getDocCount() / (double) total);
list.add(platformResult);
});
// 取占比最高的前9
List<JSONObject> res = list.stream()
.sorted(Comparator.comparingDouble((JSONObject jsonObject) -> jsonObject.getDoubleValue("percent"))
.reversed()).limit(9).collect(Collectors.toList());
// 第10个算做其他平台
JSONObject other = new JSONObject();
other.put("platform", "其他");
other.put("percent", 1 - res.stream().mapToDouble(count -> count.getDoubleValue("percent")).sum());
res.add(other);
return res;
}catch (Exception e){
ExceptionCast.cast(CommonCodeEnum.FAIL, "新舆情分析getPlatformPercentage异常-", e);
}
return Collections.emptyList();
}
/**
* 平台聚合查询
* @param startTime
* @param endTime
* @param planId
* @param platforms
* @return
* @throws IOException
*/
private SearchResponse platformAggSearchResponse(Long startTime, Long endTime, String planId, List<String> platforms) throws IOException {
// 索引
String[] indexes = esClientDao.getIndexes();
// 聚合请求
TermsAggregationBuilder platformAggregationBuilder = AggregationBuilders.terms("platform_count").field("platform_id").order(BucketOrder.count(false));
// query
BoolQueryBuilder query = yuqingAnalyzeQuery(startTime, endTime, planId, null, platforms);
return esClientDao.searchResponse(indexes, null, query, platformAggregationBuilder, null, null, 0, 0, null);
}
@Override
public JSONObject getSpreadTendency(Long startTime, Long endTime, String planId) {
JSONObject res = new JSONObject();
try {
String projectId = UserThreadLocal.getProjectId();
// 各平台趋势图
Map<String, List<LineVO>> platformSpreadTendency = getPlatformSpreadTendency(startTime, endTime, planId);
// 总趋势图
Pair<String, List<LineVO>> spreadTendency = getSpreadTendency(startTime, endTime, planId, null);
// 负面趋势图
Pair<String, List<LineVO>> negativeSpreadTendency = getSpreadTendency(startTime, endTime, planId, EmotionEnum.NEGATIVE.getName());
Map<String, BaseMap> baseMaps = new HashMap<>(2);
CompletableFuture.allOf(Stream.of(spreadTendency.getLeft(), negativeSpreadTendency.getLeft()).map(aggTitle -> CompletableFuture.runAsync(() -> {
try {
baseMaps.put(aggTitle, getFirstArticle(startTime, endTime, aggTitle, projectId, Constant.PRIMARY_CONTEND_ID, planId, true));
} catch (IOException ignored) {
}
}, executor)).toArray(CompletableFuture[]::new)).join();
// 最高点
BaseMap baseMap = baseMaps.get(spreadTendency.getLeft());
JSONObject highestJson = new JSONObject();
highestJson.put("title", Objects.isNull(baseMap) ? null : baseMap.getTitle());
highestJson.put("url", Objects.isNull(baseMap) ? null : baseMap.getUrl());
BaseMap negativeBaseMap = baseMaps.get(negativeSpreadTendency.getLeft());
JSONObject negativeHighestJson = new JSONObject();
negativeHighestJson.put("title", Objects.isNull(negativeBaseMap) ? null : negativeBaseMap.getTitle());
negativeHighestJson.put("url", Objects.isNull(negativeBaseMap) ? null : negativeBaseMap.getUrl());
res.putAll(platformSpreadTendency);
res.put("总量", spreadTendency.getRight());
res.put("负面", negativeSpreadTendency.getRight());
res.put("highest", highestJson);
res.put("negativeHighest", negativeHighestJson);
}catch (Exception e){
ExceptionCast.cast(CommonCodeEnum.FAIL, "新舆情分析getSpreadTendency异常-", e);
}
return res;
}
@Override
public List<JSONObject> getActiveChannels(Long startTime, Long endTime, String planId) {
List<JSONObject> list = new ArrayList<>();
try {
// 索引
String[] indexes = esClientDao.getIndexes();
// query
BoolQueryBuilder query = yuqingAnalyzeQuery(startTime, endTime, planId, null, null);
// 渠道聚合,取前20
TermsAggregationBuilder sourceAggregationBuilder = AggregationBuilders.terms("source_count").field("source").order(BucketOrder.count(false)).size(20);
// 情感倾向子聚合
TermsAggregationBuilder emotionAggregationBuilder = AggregationBuilders.terms("emotion_count").field("brandkbs_mark_cache_maps.name.keyword");
SearchResponse searchResponse = esClientDao.searchResponse(indexes, null, query,
sourceAggregationBuilder.subAggregation(emotionAggregationBuilder), null, null, 0, 0, null);
Map<String, Aggregation> aggMap = searchResponse.getAggregations().asMap();
ParsedStringTerms sourceCountTeam = (ParsedStringTerms) aggMap.get("source_count");
List<? extends Terms.Bucket> buckets = sourceCountTeam.getBuckets();
buckets.forEach(bucket -> {
JSONObject jsonObject = new JSONObject();
Map<String, Aggregation> map = bucket.getAggregations().asMap();
ParsedStringTerms countTeam = (ParsedStringTerms) map.get("emotion_count");
List<? extends Terms.Bucket> bucketList = countTeam.getBuckets();
jsonObject.put("source", bucket.getKeyAsString());
// 发文次数
jsonObject.put("count", bucket.getDocCount());
// 负面发文次数
AtomicLong negativeCount = new AtomicLong();
bucketList.forEach(data -> {
if (Objects.equals(data.getKeyAsString(), EmotionEnum.NEGATIVE.getName())) {
negativeCount.set(data.getDocCount());
}
});
jsonObject.put("negativeCount", negativeCount.get());
list.add(jsonObject);
});
}catch (Exception e){
ExceptionCast.cast(CommonCodeEnum.FAIL, "新舆情分析getActiveChannels异常-", e);
}
return list;
}
@Override
public List<JSONObject> getArticleIpLocated(Long startTime, Long endTime, String planId, int size) {
List<JSONObject> list = new ArrayList<>();
try {
// 索引
String[] indexes = esClientDao.getIndexes();
// query
BoolQueryBuilder query = yuqingAnalyzeQuery(startTime, endTime, planId, null, null);
// 聚合请求
TermsAggregationBuilder ipAggregationBuilder = AggregationBuilders.terms("ip_count").field("ip_location.keyword").order(BucketOrder.count(false)).size(size);
// 情感倾向子聚合
TermsAggregationBuilder emotionAggregationBuilder = AggregationBuilders.terms("emotion_count").field("brandkbs_mark_cache_maps.name.keyword");
SearchResponse searchResponse = esClientDao.searchResponse(indexes, null, query,
ipAggregationBuilder.subAggregation(emotionAggregationBuilder), null, null, 0, 0, null);
Map<String, Aggregation> aggMap = searchResponse.getAggregations().asMap();
ParsedStringTerms sourceCountTeam = (ParsedStringTerms) aggMap.get("ip_count");
List<? extends Terms.Bucket> buckets = sourceCountTeam.getBuckets();
buckets.forEach(bucket -> {
JSONObject jsonObject = new JSONObject();
jsonObject.put("ip", bucket.getKeyAsString());
jsonObject.put("count", bucket.getDocCount());
// 取负面
Map<String, Aggregation> emotionAgg = bucket.getAggregations().asMap();
ParsedStringTerms emotionCountTeam = (ParsedStringTerms) emotionAgg.get("emotion_count");
List<? extends Terms.Bucket> emotionBuckets = emotionCountTeam.getBuckets();
Optional<? extends Terms.Bucket> negative = emotionBuckets.stream().filter(b -> Objects.equals(EmotionEnum.NEGATIVE.getName(), b.getKeyAsString())).findFirst();
jsonObject.put("negativeCount", negative.map(MultiBucketsAggregation.Bucket::getDocCount).orElse(0L));
list.add(jsonObject);
});
}catch (Exception e){
ExceptionCast.cast(CommonCodeEnum.FAIL, "新舆情分析getArticleIpLocated异常-", e);
}
return list;
}
@Override
public PageVO<MarkFlowEntity> getYuqingAnalyzeDetail(MarkSearchDTO dto) {
//设置默认的时间、页码、排序方式等
defaultMarkSearch(dto);
PageVO<MarkFlowEntity> yuqingMarkList;
if (Objects.isNull(dto.getPlanId())){
yuqingMarkList = getYuqingMarkList(dto);
}else {
yuqingMarkList = getNonManualMarkList(dto);
}
// 列表数据最小mgid
MarkFlowEntity markFlowEntity = yuqingMarkList.getList().stream()
.min(Comparator.comparingLong(entity -> JSONObject.parseObject(JSONObject.toJSONString(entity.getData())).getLongValue("mgid"))).orElse(null);
// yuqingMarkList.getInfo().put("stime", Objects.isNull(markFlowEntity) ? Long.MAX_VALUE : JSONObject.parseObject(JSONObject.toJSONString(markFlowEntity.getData())).getLongValue("stime"));
yuqingMarkList.getInfo().put("pageGid", Objects.isNull(markFlowEntity) ? Long.MAX_VALUE : JSONObject.parseObject(JSONObject.toJSONString(markFlowEntity.getData())).getLongValue("mgid"));
return yuqingMarkList;
}
@Override
public List<JSONObject> getHighWord(Long startTime, Long endTime, String planId, boolean cache) {
List<JSONObject> res = new ArrayList<>();
try {
String projectId = UserThreadLocal.getProjectId();
String redisKey = RedisUtil.getYuqingAnalyzeHighWordKey(projectId, Constant.PRIMARY_CONTEND_ID, planId, startTime, endTime);
String resultStr;
// 返回缓存
if (cache && StringUtils.isNotEmpty(resultStr = redisUtil.get(redisKey))) {
return JSONArray.parseArray(resultStr, JSONObject.class);
}
EsClientDao.SearchHelper searchHelper = EsClientDao.createSearchHelper();
// sort
searchHelper.setSort(SortBuilders.fieldSort("time").order(SortOrder.DESC));
// fetchSource
searchHelper.setFetchSource(new String[]{"ind_title", "ind_full_text", "c5", "foreign", "brandkbs_mark_cache_maps"});
// postFilter
BoolQueryBuilder postFilter = yuqingAnalyzeQuery(startTime, endTime, planId, null, null);
searchHelper.setPostFilter(postFilter);
searchHelper.setSize(10000);
List<String> textList = new ArrayList<>();
List<SearchResponse> searchResponses = Collections.singletonList(esClientDao.searchResponse(searchHelper));
for (SearchResponse searchResponse : searchResponses) {
for (SearchHit searchHit : searchResponse.getHits().getHits()) {
Map<String, Object> source = searchHit.getSourceAsMap();
BaseMap baseMap = Tools.getBaseFromEsMap(source);
String title = baseMap.getTitle();
String content = baseMap.getContent();
textList.add(title + content);
}
}
res = textUtil.getHighWordsJsonDifferentFieldName(textList, 20);
redisUtil.setExpire(redisKey, JSONArray.toJSONString(res));
}catch (Exception e){
ExceptionCast.cast(CommonCodeEnum.FAIL, "新舆情分析getHighWord异常-", e);
}
return res;
}
@Override
public List<JSONObject> getLastNews(Long startTime, Long endTime, String planId, int size, boolean include) {
try {
String projectId = UserThreadLocal.getProjectId();
List<JSONObject> markTopTitleList = getMarkTopTitle(startTime, endTime, null, projectId, Constant.PRIMARY_CONTEND_ID, planId, size);
CompletableFuture.allOf(markTopTitleList.stream().map(json -> CompletableFuture.supplyAsync(() -> {
try {
BaseMap firstArticle = getFirstArticle(startTime, endTime, json.getString("title"), projectId, Constant.PRIMARY_CONTEND_ID, planId, include);
if (Objects.isNull(firstArticle)){
return null;
}
json.put("content", firstArticle.getContent());
json.put("url", firstArticle.getUrl());
json.put("realSource", firstArticle.getRealSource());
json.put("emotion", firstArticle.getEmotion());
json.put("time", firstArticle.getTime());
} catch (IOException ignored) {
}
return null;
}, executor)).toArray(CompletableFuture[]::new)).join();
return markTopTitleList.stream().filter(Objects::nonNull).limit(size).collect(Collectors.toList());
}catch (Exception e){
ExceptionCast.cast(CommonCodeEnum.FAIL, "新舆情分析getLastNews异常-", e);
}
return Collections.emptyList();
}
/**
* 微博、微信、抖音、小红书平台趋势图
* @param startTime
* @param endTime
* @param planId
* @return
* @throws IOException
*/
private Map<String, List<LineVO>> getPlatformSpreadTendency(Long startTime, Long endTime, String planId) throws IOException{
Map<String, List<LineVO>> res = new HashMap<>(4);
// 平台
List<String> platforms = Arrays.asList("微博", "微信", "抖音", "小红书");
String[] indexes = esClientDao.getIndexes();
// 聚合请求
TermsAggregationBuilder platformAggregationBuilder = AggregationBuilders.terms("platformAgg").field("platform_id").order(BucketOrder.count(false));
DateHistogramAggregationBuilder daysAggregationBuilder;
// 天级以小时为颗粒度,其他以天作为颗粒度
if (endTime - startTime <= Constant.ONE_DAY){
daysAggregationBuilder = AggregationBuilders.dateHistogram("timeAgg").field("time").calendarInterval(DateHistogramInterval.HOUR);
}else {
daysAggregationBuilder = AggregationBuilders.dateHistogram("timeAgg").field("time").calendarInterval(DateHistogramInterval.DAY);
}
// query
BoolQueryBuilder query = yuqingAnalyzeQuery(startTime, endTime, planId, null, platforms);
// response
SearchResponse searchResponse = esClientDao.searchResponse(indexes, null, query,
platformAggregationBuilder.subAggregation(daysAggregationBuilder), null, null, 0, 0, null);
Map<String, Aggregation> map = searchResponse.getAggregations().asMap();
ParsedStringTerms countTeam = (ParsedStringTerms) map.get("platformAgg");
List<? extends Terms.Bucket> platformBuckets = countTeam.getBuckets();
platformBuckets.forEach(bucket -> {
List<LineVO> line = new ArrayList<>();
String platformId = bucket.getKeyAsString();
Map<String, Aggregation> aggMap = bucket.getAggregations().asMap();
ParsedDateHistogram teamAgg = (ParsedDateHistogram) aggMap.get("timeAgg");
List<? extends Histogram.Bucket> buckets = teamAgg.getBuckets();
for (Histogram.Bucket timeBucket : buckets) {
long time = Long.parseLong(timeBucket.getKeyAsString());
long count = timeBucket.getDocCount();
line.add(new LineVO(count, time));
}
res.put(GlobalPojo.getPlatformNameById(platformId), line);
});
return res;
}
/**
* 新舆情分析-舆情趋势图
* @param startTime
* @param endTime
* @param planId
* @param emotion
* @return
* @throws IOException
*/
private Pair<String, List<LineVO>> getSpreadTendency(Long startTime, Long endTime, String planId, String emotion) throws IOException {
List<LineVO> res = new ArrayList<>();
String[] indexes = esClientDao.getIndexes();
DateHistogramAggregationBuilder daysAggregationBuilder;
// 天级以小时为颗粒度,其他以天作为颗粒度
if (endTime - startTime <= Constant.ONE_DAY){
daysAggregationBuilder = AggregationBuilders.dateHistogram("timeAgg").field("time").calendarInterval(DateHistogramInterval.HOUR);
}else {
daysAggregationBuilder = AggregationBuilders.dateHistogram("timeAgg").field("time").calendarInterval(DateHistogramInterval.DAY);
}
TermsAggregationBuilder titleAggregationBuilder = AggregationBuilders.terms("titleAgg").field("agg_title.keyword").order(BucketOrder.count(false));
// query
BoolQueryBuilder query = yuqingAnalyzeQuery(startTime, endTime, planId, emotion, null);
// response
SearchResponse searchResponse = esClientDao.searchResponse(indexes, null, query,
daysAggregationBuilder.subAggregation(titleAggregationBuilder), null, null, 0, 0, null);
Map<String, Aggregation> aggMap = searchResponse.getAggregations().asMap();
ParsedDateHistogram teamAgg = (ParsedDateHistogram) aggMap.get("timeAgg");
List<? extends Histogram.Bucket> buckets = teamAgg.getBuckets();
// 走势图
buckets.forEach(bucket -> {
long time = Long.parseLong(bucket.getKeyAsString());
long count = bucket.getDocCount();
res.add(new LineVO(count, time));
});
// 走势图最高点的聚合标题
String maxAggTitle = "";
Optional<? extends Histogram.Bucket> maxBucket = buckets.stream().max(Comparator.comparingLong(MultiBucketsAggregation.Bucket::getDocCount));
if (maxBucket.isPresent()){
Map<String, Aggregation> map = maxBucket.get().getAggregations().asMap();
ParsedStringTerms countTeam = (ParsedStringTerms) map.get("titleAgg");
List<? extends Terms.Bucket> titleBuckets = countTeam.getBuckets();
// 没有拿到聚合标题,说明此时发文的平台全为微博
maxAggTitle = CollectionUtils.isEmpty(titleBuckets) ? "" : titleBuckets.get(0).getKeyAsString();
}
return Pair.of(maxAggTitle, res);
}
private Long getYuqingAnalyzeCount(Long startTime, Long endTime, String planId, List<String> platforms) throws IOException {
BoolQueryBuilder query = yuqingAnalyzeQuery(startTime, endTime, planId, null, platforms);
return esClientDao.count(query);
}
private Long getYuqingAnalyzeCount(Long startTime, Long endTime, String planId, String emotion) throws IOException {
BoolQueryBuilder query = yuqingAnalyzeQuery(startTime, endTime, planId, emotion, null);
return esClientDao.count(query);
}
private Long getYuqingAnalyzeCount(Long startTime, Long endTime, String projectId, String planId, String emotion, List<String> platforms) throws IOException {
BoolQueryBuilder query = yuqingAnalyzeQuery(startTime, endTime, projectId, planId, emotion, platforms);
return esClientDao.count(query);
}
private BoolQueryBuilder yuqingAnalyzeQuery(Long startTime, Long endTime, String planId, String emotion, List<String> platforms){
return yuqingAnalyzeQuery(startTime, endTime, null, planId, emotion, platforms);
}
private BoolQueryBuilder yuqingAnalyzeQuery(Long startTime, Long endTime, String projectId, String planId, String emotion, List<String> platforms){
if (Objects.isNull(projectId)) {
projectId = UserThreadLocal.getProjectId();
}
// project plan query
BoolQueryBuilder query = EsQueryTools.assembleCacheMapsPlanQuery(projectId, planId);
if (Objects.isNull(planId)){
query = projectContendIdQuery(projectId, Constant.PRIMARY_CONTEND_ID);
}
// emotion
if (StringUtils.isNotEmpty(emotion) && !EmotionEnum.ALL.getName().equals(emotion)) {
query.must(QueryBuilders.termQuery("brandkbs_mark_cache_maps.name.keyword", emotion));
}
// platform
if (CollectionUtils.isNotEmpty(platforms) && Objects.nonNull(platforms.get(0))){
List<String> platformIds = new ArrayList<>(6);
for (String platform : platforms) {
String platformId = GlobalPojo.getPlatformIdByName(platform);
platformIds.add(platformId);
}
query.must(QueryBuilders.termsQuery("platform_id", platformIds));
}
// startTime
if (Objects.nonNull(startTime)) {
query.must(QueryBuilders.rangeQuery("time").gte(startTime));
}
// endTime
if (Objects.nonNull(endTime)) {
query.must(QueryBuilders.rangeQuery("time").lt(endTime));
}
return query;
}
/**
......
......@@ -3,14 +3,13 @@ package com.zhiwei.brandkbs2.service.impl;
import com.zhiwei.brandkbs2.auth.UserThreadLocal;
import com.zhiwei.brandkbs2.common.GlobalPojo;
import com.zhiwei.brandkbs2.config.Constant;
import com.zhiwei.brandkbs2.dao.AggreeResultDao;
import com.zhiwei.brandkbs2.dao.BrandkbsTaskDao;
import com.zhiwei.brandkbs2.dao.ChannelDao;
import com.zhiwei.brandkbs2.dao.ReportSettingsDao;
import com.zhiwei.brandkbs2.dao.*;
import com.zhiwei.brandkbs2.enmus.ReportTypeEnum;
import com.zhiwei.brandkbs2.es.ChannelEsDao;
import com.zhiwei.brandkbs2.es.EsClientDao;
import com.zhiwei.brandkbs2.exception.ExceptionCast;
import com.zhiwei.brandkbs2.listener.ApplicationProjectListener;
import com.zhiwei.brandkbs2.model.CommonCodeEnum;
import com.zhiwei.brandkbs2.pojo.*;
import com.zhiwei.brandkbs2.service.*;
import com.zhiwei.brandkbs2.util.Tools;
......@@ -19,6 +18,8 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
......@@ -28,6 +29,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
/**
......@@ -61,6 +63,9 @@ public class TaskServiceImpl implements TaskService {
@Resource(name = "aggreeResultDaoImpl")
AggreeResultDao aggreeResultDao;
@Resource(name = "nonManualProjectPlanDao")
NonManualProjectPlanDao nonManualProjectPlanDao;
@Resource(name = "brandkbsTaskServiceImpl")
BrandkbsTaskService brandkbsTaskService;
......@@ -82,6 +87,9 @@ public class TaskServiceImpl implements TaskService {
@Resource(name = "customEventServiceImpl")
CustomEventService customEventService;
@Resource(name = "markDataServiceImpl")
MarkDataService markDataService;
@Resource(name = "taskServiceExecutor")
ThreadPoolTaskExecutor taskServiceExecutor;
......@@ -324,6 +332,56 @@ public class TaskServiceImpl implements TaskService {
}
}
@Override
public void calculateProjectAvg() {
AtomicLong total = new AtomicLong();
GlobalPojo.PROJECT_MAP.forEach((projectId, project) ->{
try {
// 取近一年
long endTime = System.currentTimeMillis();
long startTime = endTime - Constant.ONE_YEAR;
// 项目创建时间在近一年内则取项目创建时间
startTime = project.getCTime() > startTime ? project.getCTime() : startTime;
// 同时计算非人工项目各方案均值
if (!project.isManual()) {
List<NonManualProjectPlan> plans = nonManualProjectPlanDao.findList(new Query(Criteria.where("projectId").is(projectId)));
for (NonManualProjectPlan plan : plans) {
markDataService.countYuqingAmountAvg(startTime, endTime, projectId, plan.getId());
markDataService.countEmotionDistributionAvg(startTime, endTime, projectId, plan.getId());
markDataService.countImportantPlatformPercentageAvg(startTime, endTime, projectId, plan.getId());
}
}
markDataService.countYuqingAmountAvg(startTime, endTime, projectId, null);
markDataService.countEmotionDistributionAvg(startTime, endTime, projectId, null);
markDataService.countImportantPlatformPercentageAvg(startTime, endTime, projectId, null);
log.info("项目:{}-均值计算已完成:{}个", project.getProjectName(), total.incrementAndGet());
}catch (Exception e){
ExceptionCast.cast(CommonCodeEnum.FAIL, "calculateProjectAvg异常-projectId:" + projectId, e);
}
});
}
@Override
public void yuqingAnalyzeHighWordCache() {
AtomicInteger total = new AtomicInteger();
Long[] time = commonService.getTimeRangeMonth();
List<CompletableFuture<Object>> projectFutures = GlobalPojo.PROJECT_MAP.values().stream().map(project -> CompletableFuture.supplyAsync(() -> {
UserThreadLocal.set(new UserInfo().setProjectId(project.getId()));
markDataService.getHighWord(time[0], time[1], null, false);
log.info("项目:{}-{}-词云缓存已完成:{}个", project.getProjectName(), project.getId(), total.incrementAndGet());
return null;
}, cacheServiceExecutor)).collect(Collectors.toList());
AtomicInteger total2 = new AtomicInteger();
List<CompletableFuture<Object>> planFutures = nonManualProjectPlanDao.findList(new Query()).stream().map(plan -> CompletableFuture.supplyAsync(() -> {
UserThreadLocal.set(new UserInfo().setProjectId(plan.getProjectId()));
markDataService.getHighWord(time[0], time[1], plan.getId(), false);
log.info("方案:{}-{}-词云缓存已完成:{}个", plan.getName(), plan.getId(), total2.incrementAndGet());
return null;
}, cacheServiceExecutor)).collect(Collectors.toList());
projectFutures.addAll(planFutures);
CompletableFuture.allOf(projectFutures.toArray(new CompletableFuture[0])).join();
}
private boolean reportSendByProject(Project project) {
boolean flag = false;
// 扫描setting信息并生成对应报告
......
......@@ -45,6 +45,7 @@ public class ControlCenter {
try {
taskService.messageFlowCache();
taskService.customEventCache();
taskService.yuqingAnalyzeHighWordCache();
} catch (Exception e) {
log.error("定时按天缓存数据-出错", e);
} finally {
......@@ -91,4 +92,16 @@ public class ControlCenter {
}
}
@Async("scheduledExecutor")
@Scheduled(cron = "0 0 3 ? * SUN")
public void calculateProjectAvg() {
log.info("定时每周计算项目均值-启动");
try {
taskService.calculateProjectAvg();
} catch (Exception e) {
log.error("定时按周计算项目均值-出错", e);
} finally {
log.info("定时按周计算项目均值-结束");
}
}
}
......@@ -7,6 +7,7 @@ import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
......@@ -82,6 +83,34 @@ public class RedisUtil {
return RedisKeyPrefix.NON_MANUAL_PROJECT_MARK_MAX_GID + Tools.concat(projectId, planId, userId);
}
public static String getYuqingAnalyzeProjectAvgCountKey(String projectId, String contendId, String planId){
if (Objects.isNull(planId)){
return RedisKeyPrefix.YUQING_ANALYZE_PROJECT_AVG_COUNT + Tools.concat(projectId, contendId);
}
return RedisKeyPrefix.YUQING_ANALYZE_PROJECT_AVG_COUNT + Tools.concat(projectId, contendId, planId);
}
public static String getYuqingAnalyzeEmotionDistributionAvgKey(String projectId, String contendId, String emotion, String planId){
if (Objects.isNull(planId)){
return RedisKeyPrefix.YUQING_ANALYZE_EMOTION_DISTRIBUTION_AVG + Tools.concat(projectId, contendId, emotion);
}
return RedisKeyPrefix.YUQING_ANALYZE_EMOTION_DISTRIBUTION_AVG + Tools.concat(projectId, contendId, emotion, planId);
}
public static String getYuqingAnalyzePlatformAvgCountKey(String projectId, String contendId, String planId){
if (Objects.isNull(planId)){
return RedisKeyPrefix.YUQING_ANALYZE_PLATFORM_AVG_COUNT + Tools.concat(projectId, contendId);
}
return RedisKeyPrefix.YUQING_ANALYZE_PLATFORM_AVG_COUNT + Tools.concat(projectId, contendId, planId);
}
public static String getYuqingAnalyzeHighWordKey(String projectId, String contendId, String planId, Long startTime, Long endTime){
if (Objects.isNull(planId)){
return RedisKeyPrefix.YUQING_ANALYZE_HIGH_WORD + Tools.concat(projectId, contendId, startTime, endTime);
}
return RedisKeyPrefix.YUQING_ANALYZE_HIGH_WORD + Tools.concat(projectId, contendId, planId, startTime, endTime);
}
public void setExpire(String key, String value, long timeout, TimeUnit unit) {
stringRedisTemplate.opsForValue().set(key, value, timeout, unit);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment