Commit 07eac5fe by shenjunjie

Merge branch 'release' into 'master'

Release

See merge request !56
parents bd8eed48 6cb85a2d
...@@ -27,6 +27,10 @@ public class RedisKeyPrefix { ...@@ -27,6 +27,10 @@ public class RedisKeyPrefix {
public static final String INDEX_SPREAD = "BRANDKBS:INDEX:SPREAD:"; public static final String INDEX_SPREAD = "BRANDKBS:INDEX:SPREAD:";
public static final String INDEX_COMPARE_SUMMARY_MOBILE = "BRANDKBS:INDEX:COMPARE_SUMMARY:MOBILE:"; public static final String INDEX_COMPARE_SUMMARY_MOBILE = "BRANDKBS:INDEX:COMPARE_SUMMARY:MOBILE:";
/** /**
* 渠道榜单缓存
*/
public static final String CHANNEL_RECORD_LIST = "BRANDKBS:CHANNEL_RECORD:LIST:";
/**
* 竞品库-获取竞品对比舆情解读数据(PC * 竞品库-获取竞品对比舆情解读数据(PC
*/ */
public static final String INDEX_COMPARE_SUMMARY_PC = "BRANDKBS:CONTEND:COMPARE_SUMMARY:PC:"; public static final String INDEX_COMPARE_SUMMARY_PC = "BRANDKBS:CONTEND:COMPARE_SUMMARY:PC:";
......
...@@ -48,9 +48,7 @@ public class TaskPoolConfig { ...@@ -48,9 +48,7 @@ public class TaskPoolConfig {
// 配置核心线程数 // 配置核心线程数
executor.setCorePoolSize(8); executor.setCorePoolSize(8);
// 配置最大线程数 // 配置最大线程数
executor.setMaxPoolSize(8); executor.setMaxPoolSize(16);
// 配置队列大小
executor.setQueueCapacity(16);
// 配置线程池中的线程的名称前缀 // 配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("esSearch-"); executor.setThreadNamePrefix("esSearch-");
// rejection-policy:当pool已经达到max size的时候,如何处理新任务 // rejection-policy:当pool已经达到max size的时候,如何处理新任务
...@@ -84,7 +82,7 @@ public class TaskPoolConfig { ...@@ -84,7 +82,7 @@ public class TaskPoolConfig {
log.info("start taskServiceExecutor"); log.info("start taskServiceExecutor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 配置核心线程数 // 配置核心线程数
executor.setCorePoolSize(16); executor.setCorePoolSize(32);
// 配置最大线程数 // 配置最大线程数
executor.setMaxPoolSize(32); executor.setMaxPoolSize(32);
// 配置线程池中的线程的名称前缀 // 配置线程池中的线程的名称前缀
...@@ -97,4 +95,24 @@ public class TaskPoolConfig { ...@@ -97,4 +95,24 @@ public class TaskPoolConfig {
return executor; return executor;
} }
@Bean
public ThreadPoolTaskExecutor cacheServiceExecutor() {
log.info("start cacheServiceExecutor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 配置核心线程数
executor.setCorePoolSize(2);
// 配置最大线程数
executor.setMaxPoolSize(2);
// 配置队列大小
executor.setQueueCapacity(10);
// 配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("cacheServiceExecutor-");
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 执行初始化
executor.initialize();
return executor;
}
} }
...@@ -48,7 +48,7 @@ public class AppChannelController extends BaseController { ...@@ -48,7 +48,7 @@ public class AppChannelController extends BaseController {
@RequestParam(value = "startTime", required = false) Long startTime, @RequestParam(value = "startTime", required = false) Long startTime,
@RequestParam(value = "endTime", required = false) Long endTime, @RequestParam(value = "endTime", required = false) Long endTime,
@RequestParam(value = "pageSize", defaultValue = "50") int size) { @RequestParam(value = "pageSize", defaultValue = "50") int size) {
return ResponseResult.success(channelService.getActiveChannelList(contendId, platform, keyword, startTime, endTime, size)); return ResponseResult.success(channelService.getActiveChannelList(contendId, platform, keyword, startTime, endTime, size, true));
} }
@ApiOperation("渠道库-友好渠道榜") @ApiOperation("渠道库-友好渠道榜")
...@@ -69,7 +69,7 @@ public class AppChannelController extends BaseController { ...@@ -69,7 +69,7 @@ public class AppChannelController extends BaseController {
@RequestParam(value = "startTime", required = false) Long startTime, @RequestParam(value = "startTime", required = false) Long startTime,
@RequestParam(value = "endTime", required = false) Long endTime, @RequestParam(value = "endTime", required = false) Long endTime,
@RequestParam(value = "pageSize", defaultValue = "50") int size) { @RequestParam(value = "pageSize", defaultValue = "50") int size) {
return ResponseResult.success(channelService.getPositiveList(contendId, platform, keyword, sorter, startTime, endTime, size)); return ResponseResult.success(channelService.getPositiveList(contendId, platform, keyword, sorter, startTime, endTime, size, true));
} }
@ApiOperation("渠道库-敏感渠道榜") @ApiOperation("渠道库-敏感渠道榜")
...@@ -90,7 +90,7 @@ public class AppChannelController extends BaseController { ...@@ -90,7 +90,7 @@ public class AppChannelController extends BaseController {
@RequestParam(value = "startTime", required = false) Long startTime, @RequestParam(value = "startTime", required = false) Long startTime,
@RequestParam(value = "endTime", required = false) Long endTime, @RequestParam(value = "endTime", required = false) Long endTime,
@RequestParam(value = "pageSize", defaultValue = "50") int size) { @RequestParam(value = "pageSize", defaultValue = "50") int size) {
return ResponseResult.success(channelService.getNegativeList(contendId, platform, keyword, sorter, startTime, endTime, size)); return ResponseResult.success(channelService.getNegativeList(contendId, platform, keyword, sorter, startTime, endTime, size, true));
} }
@ApiOperation("渠道库-收藏渠道") @ApiOperation("渠道库-收藏渠道")
......
...@@ -168,7 +168,7 @@ public class AppSearchController extends BaseController { ...@@ -168,7 +168,7 @@ public class AppSearchController extends BaseController {
@GetMapping("channel/list/active") @GetMapping("channel/list/active")
public ResponseResult getActiveChannelList(@RequestParam(value = "size", defaultValue = "2") int size) { public ResponseResult getActiveChannelList(@RequestParam(value = "size", defaultValue = "2") int size) {
Long[] timeRangeDay = commonService.getTimeRangeDay(); Long[] timeRangeDay = commonService.getTimeRangeDay();
return ResponseResult.success(channelService.getActiveChannelList("0", null, null, timeRangeDay[0], timeRangeDay[1], size)); return ResponseResult.success(channelService.getActiveChannelList("0", null, null, timeRangeDay[0], timeRangeDay[1], size, true));
} }
@ApiOperation("搜索-渠道搜索条件") @ApiOperation("搜索-渠道搜索条件")
......
...@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject; ...@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject;
import com.zhiwei.brandkbs2.dao.BaseMongoDao; import com.zhiwei.brandkbs2.dao.BaseMongoDao;
import com.zhiwei.brandkbs2.pojo.AbstractBaseMongo; import com.zhiwei.brandkbs2.pojo.AbstractBaseMongo;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.bson.BsonRegularExpression;
import org.bson.Document; import org.bson.Document;
import org.springframework.data.domain.Sort; import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.MongoTemplate;
...@@ -17,7 +18,6 @@ import java.lang.reflect.ParameterizedType; ...@@ -17,7 +18,6 @@ import java.lang.reflect.ParameterizedType;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.regex.Pattern;
/** /**
* @ClassName: BaseMongoDaoImpl * @ClassName: BaseMongoDaoImpl
...@@ -157,11 +157,11 @@ public class BaseMongoDaoImpl<T extends AbstractBaseMongo> implements BaseMongoD ...@@ -157,11 +157,11 @@ public class BaseMongoDaoImpl<T extends AbstractBaseMongo> implements BaseMongoD
if (StringUtils.isEmpty(keyword)) { if (StringUtils.isEmpty(keyword)) {
return; return;
} }
Pattern pattern = Pattern.compile("^.*" + keyword + ".*$", Pattern.CASE_INSENSITIVE); // Pattern pattern = Pattern.compile("^.*" + keyword + ".*$", Pattern.CASE_INSENSITIVE);
Criteria regex = Criteria.where(fuzzKeys[0]).regex(pattern); Criteria regex = Criteria.where(fuzzKeys[0]).regex(new BsonRegularExpression(keyword, "i"));
for (int i = 1; i < fuzzKeys.length; i++) { for (int i = 1; i < fuzzKeys.length; i++) {
// 多字段模糊查询 // 多字段模糊查询
regex.orOperator(Criteria.where(fuzzKeys[i]).regex(pattern)); regex.orOperator(Criteria.where(fuzzKeys[i]).regex(new BsonRegularExpression(keyword, "i")));
} }
query.addCriteria(regex); query.addCriteria(regex);
} }
......
...@@ -148,6 +148,18 @@ public class EsQueryTools { ...@@ -148,6 +148,18 @@ public class EsQueryTools {
return channelBoolQueryBuilder; return channelBoolQueryBuilder;
} }
public static BoolQueryBuilder assembleChannelSourceQuery(String sourceKeyword) {
BoolQueryBuilder channelBoolQueryBuilder = QueryBuilders.boolQuery();
String[] keys = sourceKeyword.trim().split("\\|");
for (String key : keys) {
String channelRegex = getAllRegex(key);
BoolQueryBuilder keyQueryBuilder = QueryBuilders.boolQuery();
keyQueryBuilder.must(QueryBuilders.regexpQuery("source.keyword", ".*" + channelRegex + ".*"));
channelBoolQueryBuilder.should(keyQueryBuilder);
}
return channelBoolQueryBuilder;
}
public static void platformMustNot(BoolQueryBuilder boolQueryBuilder, String... platformNames) { public static void platformMustNot(BoolQueryBuilder boolQueryBuilder, String... platformNames) {
if (null == platformNames) { if (null == platformNames) {
return; return;
......
...@@ -122,7 +122,7 @@ public interface ChannelService { ...@@ -122,7 +122,7 @@ public interface ChannelService {
* *
* @return ChannelListVO * @return ChannelListVO
*/ */
List<ChannelListVO> getActiveChannelList(String contendId, String platform, String keyword, Long startTime, Long endTime, int size); List<ChannelListVO> getActiveChannelList(String contendId, String platform, String keyword, Long startTime, Long endTime, int size,boolean cache);
/** /**
* 获取友好渠道榜 * 获取友好渠道榜
...@@ -134,7 +134,7 @@ public interface ChannelService { ...@@ -134,7 +134,7 @@ public interface ChannelService {
* @param size * @param size
* @return * @return
*/ */
List<ChannelListVO> getPositiveList(String contendId, String platform, String keyword, String sorter, Long startTime, Long endTime, int size); List<ChannelListVO> getPositiveList(String contendId, String platform, String keyword, String sorter, Long startTime, Long endTime, int size,boolean cache);
/** /**
* 获取敏感渠道榜 * 获取敏感渠道榜
...@@ -146,7 +146,7 @@ public interface ChannelService { ...@@ -146,7 +146,7 @@ public interface ChannelService {
* @param size * @param size
* @return * @return
*/ */
List<ChannelListVO> getNegativeList(String contendId, String platform, String keyword, String sorter, Long startTime, Long endTime, int size); List<ChannelListVO> getNegativeList(String contendId, String platform, String keyword, String sorter, Long startTime, Long endTime, int size,boolean cache);
/** /**
* 收藏渠道 * 收藏渠道
......
...@@ -14,6 +14,16 @@ public interface TaskService{ ...@@ -14,6 +14,16 @@ public interface TaskService{
void messageFlowCount(int day); void messageFlowCount(int day);
/** /**
* 渠道榜单缓存(必须在统计之后进行)
*/
void channelRecordListCache();
/**
* 消息流缓存
*/
void messageFlowCache();
/**
* 生成简报任务并推送 * 生成简报任务并推送
*/ */
void generateReportAndSend(); void generateReportAndSend();
......
package com.zhiwei.brandkbs2.service.impl; package com.zhiwei.brandkbs2.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
...@@ -29,6 +30,7 @@ import com.zhiwei.brandkbs2.service.CommonService; ...@@ -29,6 +30,7 @@ import com.zhiwei.brandkbs2.service.CommonService;
import com.zhiwei.brandkbs2.service.MarkDataService; import com.zhiwei.brandkbs2.service.MarkDataService;
import com.zhiwei.brandkbs2.service.ProjectService; import com.zhiwei.brandkbs2.service.ProjectService;
import com.zhiwei.brandkbs2.util.MongoUtil; import com.zhiwei.brandkbs2.util.MongoUtil;
import com.zhiwei.brandkbs2.util.RedisUtil;
import com.zhiwei.brandkbs2.util.Tools; import com.zhiwei.brandkbs2.util.Tools;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
...@@ -106,6 +108,9 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -106,6 +108,9 @@ public class ChannelServiceImpl implements ChannelService {
@Resource(name = "mongoUtil") @Resource(name = "mongoUtil")
MongoUtil mongoUtil; MongoUtil mongoUtil;
@Resource(name = "redisUtil")
RedisUtil redisUtil;
@Resource(name = "esSearchExecutor") @Resource(name = "esSearchExecutor")
ThreadPoolTaskExecutor esSearchExecutor; ThreadPoolTaskExecutor esSearchExecutor;
...@@ -296,8 +301,8 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -296,8 +301,8 @@ public class ChannelServiceImpl implements ChannelService {
} }
@Override @Override
public List<ChannelListVO> getActiveChannelList(String contendId, String platform, String keyword, Long startTime, Long endTime, int size) { public List<ChannelListVO> getActiveChannelList(String contendId, String platform, String keyword, Long startTime, Long endTime, int size, boolean cache) {
return getEmotionList(contendId, platform, keyword, null, startTime, endTime, size, EmotionEnum.ALL.getState()); return getEmotionList(contendId, platform, keyword, null, startTime, endTime, size, EmotionEnum.ALL.getState(), cache);
} }
@Deprecated @Deprecated
...@@ -347,20 +352,28 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -347,20 +352,28 @@ public class ChannelServiceImpl implements ChannelService {
} }
@Override @Override
public List<ChannelListVO> getPositiveList(String contendId, String platform, String keyword, String sorter, Long startTime, Long endTime, int size) { public List<ChannelListVO> getPositiveList(String contendId, String platform, String keyword, String sorter, Long startTime, Long endTime, int size, boolean cache) {
return getEmotionList(contendId, platform, keyword, sorter, startTime, endTime, size, EmotionEnum.POSITIVE.getState()); return getEmotionList(contendId, platform, keyword, sorter, startTime, endTime, size, EmotionEnum.POSITIVE.getState(), cache);
} }
@Override @Override
public List<ChannelListVO> getNegativeList(String contendId, String platform, String keyword, String sorter, Long startTime, Long endTime, int size) { public List<ChannelListVO> getNegativeList(String contendId, String platform, String keyword, String sorter, Long startTime, Long endTime, int size,
return getEmotionList(contendId, platform, keyword, sorter, startTime, endTime, size, EmotionEnum.NEGATIVE.getState()); boolean cache) {
return getEmotionList(contendId, platform, keyword, sorter, startTime, endTime, size, EmotionEnum.NEGATIVE.getState(), cache);
} }
private List<ChannelListVO> getEmotionList(String contendId, String platform, String keyword, String sorter, Long startTime, Long endTime, int size, int emotion) { private List<ChannelListVO> getEmotionList(String contendId, String platform, String keyword, String sorter, Long startTime, Long endTime, int size,
int emotion, boolean cache) {
List<ChannelListVO> resList = new ArrayList<>(); List<ChannelListVO> resList = new ArrayList<>();
try { try {
Map<String, Pair<Long, ChannelRecord>> keyMap = new HashMap<>();
String projectId = UserThreadLocal.getProjectId(); String projectId = UserThreadLocal.getProjectId();
String redisKey = RedisUtil.getChannelRecordList(projectId, startTime, endTime);
String resultStr;
// 返回缓存
if (cache && StringUtils.isNotEmpty(resultStr = redisUtil.get(redisKey))) {
return JSON.parseArray(resultStr, ChannelListVO.class);
}
Map<String, Pair<Long, ChannelRecord>> keyMap = new HashMap<>();
EsClientDao.SearchHelper searchHelper = createSearchHelperByChannelCriteria(projectId, null, Collections.singleton(contendId), platform, keyword, startTime, endTime); EsClientDao.SearchHelper searchHelper = createSearchHelperByChannelCriteria(projectId, null, Collections.singleton(contendId), platform, keyword, startTime, endTime);
// 分页查询所有结果 // 分页查询所有结果
List<SearchResponse> searchResponses = channelEsDao.searchScrollResponse(searchHelper); List<SearchResponse> searchResponses = channelEsDao.searchScrollResponse(searchHelper);
...@@ -431,6 +444,8 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -431,6 +444,8 @@ public class ChannelServiceImpl implements ChannelService {
resList.add(ChannelListVO.createFromChannel(record, record.getRecord().getArticles().size())); resList.add(ChannelListVO.createFromChannel(record, record.getRecord().getArticles().size()));
} }
}); });
// TODO 配合天级缓存开启
// redisUtil.set(redisKey, JSON.toJSONString(resList));
} catch (IOException e) { } catch (IOException e) {
ExceptionCast.cast(CommonCodeEnum.FAIL, "es查询异常"); ExceptionCast.cast(CommonCodeEnum.FAIL, "es查询异常");
} }
...@@ -1402,7 +1417,7 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -1402,7 +1417,7 @@ public class ChannelServiceImpl implements ChannelService {
} }
// keyword // keyword
if (StringUtils.isNotEmpty(keyword)) { if (StringUtils.isNotEmpty(keyword)) {
query.must(EsQueryTools.assembleSourceQuery(keyword)); query.must(EsQueryTools.assembleChannelSourceQuery(keyword));
} }
// timeRange // timeRange
// 默认搜索一周 // 默认搜索一周
......
...@@ -1466,7 +1466,7 @@ public class MarkDataServiceImpl implements MarkDataService { ...@@ -1466,7 +1466,7 @@ public class MarkDataServiceImpl implements MarkDataService {
searchHelper.setSize(1); searchHelper.setSize(1);
SearchHits hits = esClientDao.searchResponse(searchHelper).getHits(); SearchHits hits = esClientDao.searchResponse(searchHelper).getHits();
if (0 == hits.getHits().length) { if (0 == hits.getHits().length) {
log.info("getYuqingMarkFirstTime-debug:{}", searchHelper.getQuery()); log.info("getYuqingMarkFirstTime-debug:{}", searchHelper.getPostFilter());
return 1609430400000L; return 1609430400000L;
} }
return Long.parseLong(esClientDao.searchResponse(searchHelper).getHits().getAt(0).getSourceAsMap().get("time") + ""); return Long.parseLong(esClientDao.searchResponse(searchHelper).getHits().getAt(0).getSourceAsMap().get("time") + "");
......
package com.zhiwei.brandkbs2.service.impl; package com.zhiwei.brandkbs2.service.impl;
import com.zhiwei.brandkbs2.auth.UserThreadLocal;
import com.zhiwei.brandkbs2.common.GlobalPojo;
import com.zhiwei.brandkbs2.config.Constant; import com.zhiwei.brandkbs2.config.Constant;
import com.zhiwei.brandkbs2.dao.AggreeResultDao; import com.zhiwei.brandkbs2.dao.AggreeResultDao;
import com.zhiwei.brandkbs2.dao.BrandkbsTaskDao; import com.zhiwei.brandkbs2.dao.BrandkbsTaskDao;
...@@ -20,10 +22,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; ...@@ -20,10 +22,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.ArrayList; import java.util.*;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
...@@ -70,9 +69,18 @@ public class TaskServiceImpl implements TaskService { ...@@ -70,9 +69,18 @@ public class TaskServiceImpl implements TaskService {
@Resource(name = "reportServiceImpl") @Resource(name = "reportServiceImpl")
ReportService reportService; ReportService reportService;
@Resource(name = "indexServiceImpl")
IndexService indexService;
@Resource(name = "commonServiceImpl")
CommonService commonService;
@Resource(name = "taskServiceExecutor") @Resource(name = "taskServiceExecutor")
ThreadPoolTaskExecutor taskServiceExecutor; ThreadPoolTaskExecutor taskServiceExecutor;
@Resource(name = "cacheServiceExecutor")
ThreadPoolTaskExecutor cacheServiceExecutor;
@Override @Override
public void messageFlowCount(int day) { public void messageFlowCount(int day) {
List<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>> rangeTimeRecords = esClientDao.searchRecordRecentDay(day); List<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>> rangeTimeRecords = esClientDao.searchRecordRecentDay(day);
...@@ -110,6 +118,47 @@ public class TaskServiceImpl implements TaskService { ...@@ -110,6 +118,47 @@ public class TaskServiceImpl implements TaskService {
log.info("渠道统计-渠道记录-统计结束"); log.info("渠道统计-渠道记录-统计结束");
} }
@Override
public void channelRecordListCache() {
AtomicInteger total = new AtomicInteger();
List<Long[]> timeList = Arrays.asList(commonService.getTimeRangeDay(), commonService.getTimeRangeWeek(), commonService.getTimeRangeMonth());
String sorter = "{\"index\":\"descend\"}";
CompletableFuture.allOf(GlobalPojo.PROJECT_MAP.values().stream().map(project -> CompletableFuture.supplyAsync(() -> {
UserThreadLocal.set(new UserInfo().setProjectId(project.getId()));
timeList.forEach(times -> {
// 活跃渠道榜
channelService.getActiveChannelList(Constant.PRIMARY_CONTEND_ID, null, null, times[0], times[1], 50, false);
// 友好渠道榜
channelService.getPositiveList(Constant.PRIMARY_CONTEND_ID, null, null, sorter, times[0], times[1], 50, false);
// 敏感渠道榜
channelService.getNegativeList(Constant.PRIMARY_CONTEND_ID, null, null, sorter, times[0], times[1], 50, false);
});
log.info("项目:{}-渠道榜单缓存已完成:{}个", project.getProjectName(), total.incrementAndGet());
return null;
}, cacheServiceExecutor)).toArray(CompletableFuture[]::new)).join();
}
@Override
public void messageFlowCache() {
AtomicInteger total = new AtomicInteger();
CompletableFuture.allOf(GlobalPojo.PROJECT_MAP.values().stream().map(project -> CompletableFuture.supplyAsync(() -> {
UserThreadLocal.set(new UserInfo().setProjectId(project.getId()));
// 首页-舆情总量
indexService.getYuqingAmount(null, null, false);
// 首页-美誉度
indexService.getReputation(null, null, false);
// 首页-事件数
indexService.getEventAmount(null, null, false);
// 首页-平台贡献信息
indexService.getPlatformInfo(null, null, false);
// 首页-传播趋势
indexService.getSpreadingTend(null, null, false);
log.info("项目:{}-首页缓存已完成:{}个", project.getProjectName(), total.incrementAndGet());
return null;
}, cacheServiceExecutor)).toArray(CompletableFuture[]::new)).join();
}
private List<Channel> batchHandle(List<Map.Entry<ChannelIndex, ChannelIndex.Record>> batchList, Map<ChannelIndex, ChannelIndex.Record> newRecordMap) { private List<Channel> batchHandle(List<Map.Entry<ChannelIndex, ChannelIndex.Record>> batchList, Map<ChannelIndex, ChannelIndex.Record> newRecordMap) {
List<Channel> insertList = Collections.synchronizedList(new ArrayList<>()); List<Channel> insertList = Collections.synchronizedList(new ArrayList<>());
List<String> fids = batchList.stream().map(channelIndexRecordEntry -> channelIndexRecordEntry.getKey().getFid()).collect(Collectors.toList()); List<String> fids = batchList.stream().map(channelIndexRecordEntry -> channelIndexRecordEntry.getKey().getFid()).collect(Collectors.toList());
......
...@@ -17,7 +17,7 @@ import javax.annotation.Resource; ...@@ -17,7 +17,7 @@ import javax.annotation.Resource;
* @date: 2022-06-08 17:39 * @date: 2022-06-08 17:39
*/ */
@Component @Component
@Profile({"dev", "pro"}) @Profile({"prod"})
public class ControlCenter { public class ControlCenter {
public static final Logger log = LogManager.getLogger(ControlCenter.class); public static final Logger log = LogManager.getLogger(ControlCenter.class);
...@@ -30,6 +30,7 @@ public class ControlCenter { ...@@ -30,6 +30,7 @@ public class ControlCenter {
log.info("定时按天录入渠道进量-启动"); log.info("定时按天录入渠道进量-启动");
try { try {
taskService.messageFlowCount(1); taskService.messageFlowCount(1);
// taskService.channelRecordListCache();
} catch (Exception e) { } catch (Exception e) {
log.error("定时按天录入渠道进量-出错", e); log.error("定时按天录入渠道进量-出错", e);
} finally { } finally {
...@@ -38,6 +39,19 @@ public class ControlCenter { ...@@ -38,6 +39,19 @@ public class ControlCenter {
} }
@Async("scheduledExecutor") @Async("scheduledExecutor")
@Scheduled(cron = "0 0 1 * * ?")
public void messageFlowCache() {
log.info("定时按天缓存消息流信息-启动");
try {
taskService.messageFlowCache();
} catch (Exception e) {
log.error("定时按天缓存消息流信息-出错", e);
} finally {
log.info("定时按天缓存消息流信息-结束");
}
}
@Async("scheduledExecutor")
@Scheduled(cron = "0 0 * * * ? ") @Scheduled(cron = "0 0 * * * ? ")
public void reportScanning() { public void reportScanning() {
log.info("每小时扫描简报信息-启动"); log.info("每小时扫描简报信息-启动");
......
...@@ -50,6 +50,10 @@ public class RedisUtil { ...@@ -50,6 +50,10 @@ public class RedisUtil {
public static String getIndexSpread(String projectId, Long startTime, Long endTime) { public static String getIndexSpread(String projectId, Long startTime, Long endTime) {
return RedisKeyPrefix.INDEX_SPREAD + Tools.concat(projectId, startTime, endTime); return RedisKeyPrefix.INDEX_SPREAD + Tools.concat(projectId, startTime, endTime);
} }
public static String getChannelRecordList(String projectId, Long startTime, Long endTime) {
return RedisKeyPrefix.CHANNEL_RECORD_LIST + Tools.concat(projectId, startTime, endTime);
}
public void setExpire(String key, String value, long timeout, TimeUnit unit) { public void setExpire(String key, String value, long timeout, TimeUnit unit) {
stringRedisTemplate.opsForValue().set(key, value, timeout, unit); stringRedisTemplate.opsForValue().set(key, value, timeout, unit);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment