Commit 6cb85a2d by shenjunjie

Merge branch 'feature' into 'release'

2022/10/17 17:30

See merge request !55
parents 33a26549 6d2ac707
......@@ -27,6 +27,10 @@ public class RedisKeyPrefix {
public static final String INDEX_SPREAD = "BRANDKBS:INDEX:SPREAD:";
public static final String INDEX_COMPARE_SUMMARY_MOBILE = "BRANDKBS:INDEX:COMPARE_SUMMARY:MOBILE:";
/**
* 渠道榜单缓存
*/
public static final String CHANNEL_RECORD_LIST = "BRANDKBS:CHANNEL_RECORD:LIST:";
/**
* 竞品库-获取竞品对比舆情解读数据(PC
*/
public static final String INDEX_COMPARE_SUMMARY_PC = "BRANDKBS:CONTEND:COMPARE_SUMMARY:PC:";
......
......@@ -48,9 +48,7 @@ public class TaskPoolConfig {
// 配置核心线程数
executor.setCorePoolSize(8);
// 配置最大线程数
executor.setMaxPoolSize(8);
// 配置队列大小
executor.setQueueCapacity(16);
executor.setMaxPoolSize(16);
// 配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("esSearch-");
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
......@@ -84,7 +82,7 @@ public class TaskPoolConfig {
log.info("start taskServiceExecutor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 配置核心线程数
executor.setCorePoolSize(16);
executor.setCorePoolSize(32);
// 配置最大线程数
executor.setMaxPoolSize(32);
// 配置线程池中的线程的名称前缀
......@@ -97,4 +95,24 @@ public class TaskPoolConfig {
return executor;
}
@Bean
public ThreadPoolTaskExecutor cacheServiceExecutor() {
log.info("start cacheServiceExecutor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 配置核心线程数
executor.setCorePoolSize(2);
// 配置最大线程数
executor.setMaxPoolSize(2);
// 配置队列大小
executor.setQueueCapacity(10);
// 配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("cacheServiceExecutor-");
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 执行初始化
executor.initialize();
return executor;
}
}
......@@ -48,7 +48,7 @@ public class AppChannelController extends BaseController {
@RequestParam(value = "startTime", required = false) Long startTime,
@RequestParam(value = "endTime", required = false) Long endTime,
@RequestParam(value = "pageSize", defaultValue = "50") int size) {
return ResponseResult.success(channelService.getActiveChannelList(contendId, platform, keyword, startTime, endTime, size));
return ResponseResult.success(channelService.getActiveChannelList(contendId, platform, keyword, startTime, endTime, size, true));
}
@ApiOperation("渠道库-友好渠道榜")
......@@ -69,7 +69,7 @@ public class AppChannelController extends BaseController {
@RequestParam(value = "startTime", required = false) Long startTime,
@RequestParam(value = "endTime", required = false) Long endTime,
@RequestParam(value = "pageSize", defaultValue = "50") int size) {
return ResponseResult.success(channelService.getPositiveList(contendId, platform, keyword, sorter, startTime, endTime, size));
return ResponseResult.success(channelService.getPositiveList(contendId, platform, keyword, sorter, startTime, endTime, size, true));
}
@ApiOperation("渠道库-敏感渠道榜")
......@@ -90,7 +90,7 @@ public class AppChannelController extends BaseController {
@RequestParam(value = "startTime", required = false) Long startTime,
@RequestParam(value = "endTime", required = false) Long endTime,
@RequestParam(value = "pageSize", defaultValue = "50") int size) {
return ResponseResult.success(channelService.getNegativeList(contendId, platform, keyword, sorter, startTime, endTime, size));
return ResponseResult.success(channelService.getNegativeList(contendId, platform, keyword, sorter, startTime, endTime, size, true));
}
@ApiOperation("渠道库-收藏渠道")
......
......@@ -168,7 +168,7 @@ public class AppSearchController extends BaseController {
@GetMapping("channel/list/active")
public ResponseResult getActiveChannelList(@RequestParam(value = "size", defaultValue = "2") int size) {
Long[] timeRangeDay = commonService.getTimeRangeDay();
return ResponseResult.success(channelService.getActiveChannelList("0", null, null, timeRangeDay[0], timeRangeDay[1], size));
return ResponseResult.success(channelService.getActiveChannelList("0", null, null, timeRangeDay[0], timeRangeDay[1], size, true));
}
@ApiOperation("搜索-渠道搜索条件")
......
......@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject;
import com.zhiwei.brandkbs2.dao.BaseMongoDao;
import com.zhiwei.brandkbs2.pojo.AbstractBaseMongo;
import org.apache.commons.lang3.StringUtils;
import org.bson.BsonRegularExpression;
import org.bson.Document;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoTemplate;
......@@ -17,7 +18,6 @@ import java.lang.reflect.ParameterizedType;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
/**
* @ClassName: BaseMongoDaoImpl
......@@ -157,11 +157,11 @@ public class BaseMongoDaoImpl<T extends AbstractBaseMongo> implements BaseMongoD
if (StringUtils.isEmpty(keyword)) {
return;
}
Pattern pattern = Pattern.compile("^.*" + keyword + ".*$", Pattern.CASE_INSENSITIVE);
Criteria regex = Criteria.where(fuzzKeys[0]).regex(pattern);
// Pattern pattern = Pattern.compile("^.*" + keyword + ".*$", Pattern.CASE_INSENSITIVE);
Criteria regex = Criteria.where(fuzzKeys[0]).regex(new BsonRegularExpression(keyword, "i"));
for (int i = 1; i < fuzzKeys.length; i++) {
// 多字段模糊查询
regex.orOperator(Criteria.where(fuzzKeys[i]).regex(pattern));
regex.orOperator(Criteria.where(fuzzKeys[i]).regex(new BsonRegularExpression(keyword, "i")));
}
query.addCriteria(regex);
}
......
......@@ -148,6 +148,18 @@ public class EsQueryTools {
return channelBoolQueryBuilder;
}
public static BoolQueryBuilder assembleChannelSourceQuery(String sourceKeyword) {
BoolQueryBuilder channelBoolQueryBuilder = QueryBuilders.boolQuery();
String[] keys = sourceKeyword.trim().split("\\|");
for (String key : keys) {
String channelRegex = getAllRegex(key);
BoolQueryBuilder keyQueryBuilder = QueryBuilders.boolQuery();
keyQueryBuilder.must(QueryBuilders.regexpQuery("source.keyword", ".*" + channelRegex + ".*"));
channelBoolQueryBuilder.should(keyQueryBuilder);
}
return channelBoolQueryBuilder;
}
public static void platformMustNot(BoolQueryBuilder boolQueryBuilder, String... platformNames) {
if (null == platformNames) {
return;
......
......@@ -122,7 +122,7 @@ public interface ChannelService {
*
* @return ChannelListVO
*/
List<ChannelListVO> getActiveChannelList(String contendId, String platform, String keyword, Long startTime, Long endTime, int size);
List<ChannelListVO> getActiveChannelList(String contendId, String platform, String keyword, Long startTime, Long endTime, int size,boolean cache);
/**
* 获取友好渠道榜
......@@ -134,7 +134,7 @@ public interface ChannelService {
* @param size
* @return
*/
List<ChannelListVO> getPositiveList(String contendId, String platform, String keyword, String sorter, Long startTime, Long endTime, int size);
List<ChannelListVO> getPositiveList(String contendId, String platform, String keyword, String sorter, Long startTime, Long endTime, int size,boolean cache);
/**
* 获取敏感渠道榜
......@@ -146,7 +146,7 @@ public interface ChannelService {
* @param size
* @return
*/
List<ChannelListVO> getNegativeList(String contendId, String platform, String keyword, String sorter, Long startTime, Long endTime, int size);
List<ChannelListVO> getNegativeList(String contendId, String platform, String keyword, String sorter, Long startTime, Long endTime, int size,boolean cache);
/**
* 收藏渠道
......
......@@ -14,6 +14,16 @@ public interface TaskService{
void messageFlowCount(int day);
/**
* 渠道榜单缓存(必须在统计之后进行)
*/
void channelRecordListCache();
/**
* 消息流缓存
*/
void messageFlowCache();
/**
* 生成简报任务并推送
*/
void generateReportAndSend();
......
package com.zhiwei.brandkbs2.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
......@@ -29,6 +30,7 @@ import com.zhiwei.brandkbs2.service.CommonService;
import com.zhiwei.brandkbs2.service.MarkDataService;
import com.zhiwei.brandkbs2.service.ProjectService;
import com.zhiwei.brandkbs2.util.MongoUtil;
import com.zhiwei.brandkbs2.util.RedisUtil;
import com.zhiwei.brandkbs2.util.Tools;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
......@@ -106,6 +108,9 @@ public class ChannelServiceImpl implements ChannelService {
@Resource(name = "mongoUtil")
MongoUtil mongoUtil;
@Resource(name = "redisUtil")
RedisUtil redisUtil;
@Resource(name = "esSearchExecutor")
ThreadPoolTaskExecutor esSearchExecutor;
......@@ -296,8 +301,8 @@ public class ChannelServiceImpl implements ChannelService {
}
@Override
public List<ChannelListVO> getActiveChannelList(String contendId, String platform, String keyword, Long startTime, Long endTime, int size) {
return getEmotionList(contendId, platform, keyword, null, startTime, endTime, size, EmotionEnum.ALL.getState());
public List<ChannelListVO> getActiveChannelList(String contendId, String platform, String keyword, Long startTime, Long endTime, int size, boolean cache) {
return getEmotionList(contendId, platform, keyword, null, startTime, endTime, size, EmotionEnum.ALL.getState(), cache);
}
@Deprecated
......@@ -347,20 +352,28 @@ public class ChannelServiceImpl implements ChannelService {
}
@Override
public List<ChannelListVO> getPositiveList(String contendId, String platform, String keyword, String sorter, Long startTime, Long endTime, int size) {
return getEmotionList(contendId, platform, keyword, sorter, startTime, endTime, size, EmotionEnum.POSITIVE.getState());
public List<ChannelListVO> getPositiveList(String contendId, String platform, String keyword, String sorter, Long startTime, Long endTime, int size, boolean cache) {
return getEmotionList(contendId, platform, keyword, sorter, startTime, endTime, size, EmotionEnum.POSITIVE.getState(), cache);
}
@Override
public List<ChannelListVO> getNegativeList(String contendId, String platform, String keyword, String sorter, Long startTime, Long endTime, int size) {
return getEmotionList(contendId, platform, keyword, sorter, startTime, endTime, size, EmotionEnum.NEGATIVE.getState());
public List<ChannelListVO> getNegativeList(String contendId, String platform, String keyword, String sorter, Long startTime, Long endTime, int size,
boolean cache) {
return getEmotionList(contendId, platform, keyword, sorter, startTime, endTime, size, EmotionEnum.NEGATIVE.getState(), cache);
}
private List<ChannelListVO> getEmotionList(String contendId, String platform, String keyword, String sorter, Long startTime, Long endTime, int size, int emotion) {
private List<ChannelListVO> getEmotionList(String contendId, String platform, String keyword, String sorter, Long startTime, Long endTime, int size,
int emotion, boolean cache) {
List<ChannelListVO> resList = new ArrayList<>();
try {
Map<String, Pair<Long, ChannelRecord>> keyMap = new HashMap<>();
String projectId = UserThreadLocal.getProjectId();
String redisKey = RedisUtil.getChannelRecordList(projectId, startTime, endTime);
String resultStr;
// 返回缓存
if (cache && StringUtils.isNotEmpty(resultStr = redisUtil.get(redisKey))) {
return JSON.parseArray(resultStr, ChannelListVO.class);
}
Map<String, Pair<Long, ChannelRecord>> keyMap = new HashMap<>();
EsClientDao.SearchHelper searchHelper = createSearchHelperByChannelCriteria(projectId, null, Collections.singleton(contendId), platform, keyword, startTime, endTime);
// 分页查询所有结果
List<SearchResponse> searchResponses = channelEsDao.searchScrollResponse(searchHelper);
......@@ -431,6 +444,8 @@ public class ChannelServiceImpl implements ChannelService {
resList.add(ChannelListVO.createFromChannel(record, record.getRecord().getArticles().size()));
}
});
// TODO 配合天级缓存开启
// redisUtil.set(redisKey, JSON.toJSONString(resList));
} catch (IOException e) {
ExceptionCast.cast(CommonCodeEnum.FAIL, "es查询异常");
}
......@@ -1402,7 +1417,7 @@ public class ChannelServiceImpl implements ChannelService {
}
// keyword
if (StringUtils.isNotEmpty(keyword)) {
query.must(EsQueryTools.assembleSourceQuery(keyword));
query.must(EsQueryTools.assembleChannelSourceQuery(keyword));
}
// timeRange
// 默认搜索一周
......
......@@ -1466,7 +1466,7 @@ public class MarkDataServiceImpl implements MarkDataService {
searchHelper.setSize(1);
SearchHits hits = esClientDao.searchResponse(searchHelper).getHits();
if (0 == hits.getHits().length) {
log.info("getYuqingMarkFirstTime-debug:{}", searchHelper.getQuery());
log.info("getYuqingMarkFirstTime-debug:{}", searchHelper.getPostFilter());
return 1609430400000L;
}
return Long.parseLong(esClientDao.searchResponse(searchHelper).getHits().getAt(0).getSourceAsMap().get("time") + "");
......
package com.zhiwei.brandkbs2.service.impl;
import com.zhiwei.brandkbs2.auth.UserThreadLocal;
import com.zhiwei.brandkbs2.common.GlobalPojo;
import com.zhiwei.brandkbs2.config.Constant;
import com.zhiwei.brandkbs2.dao.AggreeResultDao;
import com.zhiwei.brandkbs2.dao.BrandkbsTaskDao;
......@@ -20,10 +22,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
......@@ -70,9 +69,18 @@ public class TaskServiceImpl implements TaskService {
@Resource(name = "reportServiceImpl")
ReportService reportService;
@Resource(name = "indexServiceImpl")
IndexService indexService;
@Resource(name = "commonServiceImpl")
CommonService commonService;
@Resource(name = "taskServiceExecutor")
ThreadPoolTaskExecutor taskServiceExecutor;
@Resource(name = "cacheServiceExecutor")
ThreadPoolTaskExecutor cacheServiceExecutor;
@Override
public void messageFlowCount(int day) {
List<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>> rangeTimeRecords = esClientDao.searchRecordRecentDay(day);
......@@ -110,6 +118,47 @@ public class TaskServiceImpl implements TaskService {
log.info("渠道统计-渠道记录-统计结束");
}
@Override
public void channelRecordListCache() {
AtomicInteger total = new AtomicInteger();
List<Long[]> timeList = Arrays.asList(commonService.getTimeRangeDay(), commonService.getTimeRangeWeek(), commonService.getTimeRangeMonth());
String sorter = "{\"index\":\"descend\"}";
CompletableFuture.allOf(GlobalPojo.PROJECT_MAP.values().stream().map(project -> CompletableFuture.supplyAsync(() -> {
UserThreadLocal.set(new UserInfo().setProjectId(project.getId()));
timeList.forEach(times -> {
// 活跃渠道榜
channelService.getActiveChannelList(Constant.PRIMARY_CONTEND_ID, null, null, times[0], times[1], 50, false);
// 友好渠道榜
channelService.getPositiveList(Constant.PRIMARY_CONTEND_ID, null, null, sorter, times[0], times[1], 50, false);
// 敏感渠道榜
channelService.getNegativeList(Constant.PRIMARY_CONTEND_ID, null, null, sorter, times[0], times[1], 50, false);
});
log.info("项目:{}-渠道榜单缓存已完成:{}个", project.getProjectName(), total.incrementAndGet());
return null;
}, cacheServiceExecutor)).toArray(CompletableFuture[]::new)).join();
}
@Override
public void messageFlowCache() {
AtomicInteger total = new AtomicInteger();
CompletableFuture.allOf(GlobalPojo.PROJECT_MAP.values().stream().map(project -> CompletableFuture.supplyAsync(() -> {
UserThreadLocal.set(new UserInfo().setProjectId(project.getId()));
// 首页-舆情总量
indexService.getYuqingAmount(null, null, false);
// 首页-美誉度
indexService.getReputation(null, null, false);
// 首页-事件数
indexService.getEventAmount(null, null, false);
// 首页-平台贡献信息
indexService.getPlatformInfo(null, null, false);
// 首页-传播趋势
indexService.getSpreadingTend(null, null, false);
log.info("项目:{}-首页缓存已完成:{}个", project.getProjectName(), total.incrementAndGet());
return null;
}, cacheServiceExecutor)).toArray(CompletableFuture[]::new)).join();
}
private List<Channel> batchHandle(List<Map.Entry<ChannelIndex, ChannelIndex.Record>> batchList, Map<ChannelIndex, ChannelIndex.Record> newRecordMap) {
List<Channel> insertList = Collections.synchronizedList(new ArrayList<>());
List<String> fids = batchList.stream().map(channelIndexRecordEntry -> channelIndexRecordEntry.getKey().getFid()).collect(Collectors.toList());
......
......@@ -17,7 +17,7 @@ import javax.annotation.Resource;
* @date: 2022-06-08 17:39
*/
@Component
@Profile({"dev", "pro"})
@Profile({"prod"})
public class ControlCenter {
public static final Logger log = LogManager.getLogger(ControlCenter.class);
......@@ -30,6 +30,7 @@ public class ControlCenter {
log.info("定时按天录入渠道进量-启动");
try {
taskService.messageFlowCount(1);
// taskService.channelRecordListCache();
} catch (Exception e) {
log.error("定时按天录入渠道进量-出错", e);
} finally {
......@@ -38,6 +39,19 @@ public class ControlCenter {
}
@Async("scheduledExecutor")
@Scheduled(cron = "0 0 1 * * ?")
public void messageFlowCache() {
log.info("定时按天缓存消息流信息-启动");
try {
taskService.messageFlowCache();
} catch (Exception e) {
log.error("定时按天缓存消息流信息-出错", e);
} finally {
log.info("定时按天缓存消息流信息-结束");
}
}
@Async("scheduledExecutor")
@Scheduled(cron = "0 0 * * * ? ")
public void reportScanning() {
log.info("每小时扫描简报信息-启动");
......
......@@ -50,6 +50,10 @@ public class RedisUtil {
public static String getIndexSpread(String projectId, Long startTime, Long endTime) {
return RedisKeyPrefix.INDEX_SPREAD + Tools.concat(projectId, startTime, endTime);
}
public static String getChannelRecordList(String projectId, Long startTime, Long endTime) {
return RedisKeyPrefix.CHANNEL_RECORD_LIST + Tools.concat(projectId, startTime, endTime);
}
public void setExpire(String key, String value, long timeout, TimeUnit unit) {
stringRedisTemplate.opsForValue().set(key, value, timeout, unit);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment