Commit 80d39e08 by shenjunjie

Merge branch 'feature' into 'dev'

调整入库统计

See merge request !21
parents b2fe20c8 790d1431
......@@ -79,4 +79,22 @@ public class TaskPoolConfig {
return executor;
}
@Bean
public ThreadPoolTaskExecutor taskServiceExecutor() {
log.info("start taskServiceExecutor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 配置核心线程数
executor.setCorePoolSize(8);
// 配置最大线程数
executor.setMaxPoolSize(16);
// 配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("taskServiceExecutor-");
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 执行初始化
executor.initialize();
return executor;
}
}
......@@ -197,8 +197,8 @@ public class AppChannelController extends BaseController {
@ApiImplicitParam(name = "endTime", value = "结束时间", required = true, paramType = "query", dataType = "long"),
@ApiImplicitParam(name = "page", value = "页码", defaultValue = "1", paramType = "query", dataType = "int"),
@ApiImplicitParam(name = "pageSize", value = "页码大小", defaultValue = "10", paramType = "query", dataType = "int"),
@ApiImplicitParam(name = "channelId", value = "渠道ID", required = true, paramType = "query", dataType = "int"),
@ApiImplicitParam(name = "contendId", value = "竞品ID", defaultValue = "0", paramType = "query", dataType = "int")
@ApiImplicitParam(name = "channelId", value = "渠道ID", required = true, paramType = "query", dataType = "string"),
@ApiImplicitParam(name = "contendId", value = "竞品ID", defaultValue = "0", paramType = "query", dataType = "string")
})
@GetMapping("/events")
public ResponseResult getEvents(@RequestParam(value = "startTime") Long startTime,
......@@ -214,13 +214,15 @@ public class AppChannelController extends BaseController {
@ApiImplicitParams({
@ApiImplicitParam(name = "startTime", value = "开始时间", required = true, paramType = "query", dataType = "long"),
@ApiImplicitParam(name = "endTime", value = "结束时间", required = true, paramType = "query", dataType = "long"),
@ApiImplicitParam(name = "channelId", value = "渠道ID", required = true, paramType = "query", dataType = "string")
@ApiImplicitParam(name = "channelId", value = "渠道ID", required = true, paramType = "query", dataType = "string"),
@ApiImplicitParam(name = "contendId", value = "竞品ID", defaultValue = "0", paramType = "query", dataType = "string")
})
@GetMapping("/events/download")
public ResponseResult downloadEvents(@RequestParam(value = "startTime") long startTime,
@RequestParam(value = "endTime") long endTime,
@RequestParam("channelId") String channelId) {
List<ExportAppChannelEventDTO> exportAppChannelEventDTOS = channelService.downloadEventsByTime(startTime, endTime, channelId);
@RequestParam("channelId") String channelId,
@RequestParam(value = "contendId", defaultValue = "0") String contendId) {
List<ExportAppChannelEventDTO> exportAppChannelEventDTOS = channelService.downloadEventsByTime(startTime, endTime, channelId, contendId);
EasyExcelUtil.download(channelId + "渠道列表数据", "sheet1", ExportAppChannelEventDTO.class, exportAppChannelEventDTOS, response);
return ResponseResult.success();
}
......
......@@ -68,23 +68,42 @@ public class EventDaoImpl extends BaseMongoDaoImpl<Event> implements EventDao {
@Override
public long getEventCount(ChannelIndex channelIndex, List<String> eventEmotions, String emotion) {
Criteria criteria = Criteria.where("emotion").in(eventEmotions).
and("projectId").is(channelIndex.getProjectId()).and("contendId").is(channelIndex.getContendId());
String primaryCollection = getAggreeCollection();
Criteria criteria = Criteria.where("channelFid").is(channelIndex.getFid());
if (null != emotion) {
criteria.and("emotion").is(emotion);
}
String aliasName = "events";
Criteria lookUpCriteria = Criteria.where(aliasName + ".emotion").in(eventEmotions);
List<AggregationOperation> operations = Arrays.asList(Aggregation.match(criteria),
Aggregation.lookup(COLLECTION_NAME, "eventId", "_id", aliasName), Aggregation.match(lookUpCriteria),
Aggregation.project("events._id"));
Aggregation aggregation = Aggregation.newAggregation(operations);
AggregationResults<JSONObject> aggregate = mongoTemplate.aggregate(aggregation, primaryCollection, JSONObject.class);
List<JSONObject> mappedResults = aggregate.getMappedResults();
return mappedResults.stream().map(json -> json.getString("_id")).distinct().count();
}
/**
* 以事件为主表联合查询 应用于getEventCount,已被替代
* @param channelIndex
* @param eventEmotions
* @param emotion
* @return
*/
@Deprecated
private Aggregation eventCountAggregationPrimaryEvent(ChannelIndex channelIndex, List<String> eventEmotions, String emotion) {
Criteria criteria = Criteria.where("emotion").in(eventEmotions).and("projectId").is(channelIndex.getProjectId()).and("contendId").is(channelIndex.getContendId());
String aliasName = "articles";
Criteria lookUpCriteria = Criteria.where(aliasName.concat(".channelFid")).is(channelIndex.getFid());
if (null != emotion) {
lookUpCriteria.and(aliasName.concat(".emotion")).is(emotion);
}
// long count = mongoTemplate.count(Query.query(criteria), COLLECTION_NAME);
List<AggregationOperation> operations = Arrays.asList(
Aggregation.match(criteria),
List<AggregationOperation> operations = Arrays.asList(Aggregation.match(criteria),
// aoc -> new Document("$addFields", new Document("_id", new Document("$toString", "$_id"))), 该方式mongo版本不支持
Aggregation.lookup(getAggreeCollection(), "_id", "eventId", aliasName),
Aggregation.match(lookUpCriteria)
);
AggregationResults<JSONObject> aggregate = mongoTemplate.aggregate(Aggregation.newAggregation(operations), COLLECTION_NAME, JSONObject.class);
List<JSONObject> mappedResults = aggregate.getMappedResults();
return mappedResults.size();
Aggregation.lookup(getAggreeCollection(), "_id", "eventId", aliasName), Aggregation.match(lookUpCriteria));
return Aggregation.newAggregation(operations);
}
private Map<Long, List<Event>> getEventTimePattern(ChannelIndex channelIndex, Long startTime, Long endTime, int nrOfChars) {
......@@ -95,13 +114,10 @@ public class EventDaoImpl extends BaseMongoDaoImpl<Event> implements EventDao {
String aliasName = "articles";
Criteria lookUpCriteria = Criteria.where(aliasName.concat(".channelFid")).is(channelIndex.getFid());
// 分组
Aggregation agg = Aggregation.newAggregation(Aggregation.match(criteria),
Aggregation.project("startTime", "_id", "emotion", "influence", "title", "endTime", "eventTag"),
Aggregation agg = Aggregation.newAggregation(Aggregation.match(criteria), Aggregation.project("startTime", "_id", "emotion", "influence", "title", "endTime", "eventTag"),
// 想通过截取的方式来分组,但是会有8小时误差问题未解决,故舍弃
// .andExpression("add(new java.util.Date(8),startTime)").substring(0, nrOfChars).as("patternDate"),
Aggregation.lookup(getAggreeCollection(), "_id", "eventId", aliasName),
Aggregation.match(lookUpCriteria)
);
Aggregation.lookup(getAggreeCollection(), "_id", "eventId", aliasName), Aggregation.match(lookUpCriteria));
AggregationResults<JSONObject> aggregate = mongoTemplate.aggregate(agg, COLLECTION_NAME, JSONObject.class);
List<JSONObject> mappedResults = aggregate.getMappedResults();
for (JSONObject mappedResult : mappedResults) {
......
......@@ -2,6 +2,7 @@ package com.zhiwei.brandkbs2.es;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.brandkbs2.common.GenericAttribute;
import com.zhiwei.brandkbs2.config.Constant;
import com.zhiwei.brandkbs2.pojo.ChannelIndex;
import com.zhiwei.brandkbs2.util.Tools;
import lombok.Getter;
......@@ -89,6 +90,8 @@ public class EsClientDao {
public List<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>> searchRecordRecentDay(int day) {
List<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>> res = new ArrayList<>();
Calendar calendar = Calendar.getInstance();
// 设置为当天0:00
calendar.setTime(Tools.truncDate(calendar.getTime(), Constant.DAY_PATTERN));
long endTime = calendar.getTime().getTime();
calendar.add(Calendar.DAY_OF_MONTH, -day);
long startTime = calendar.getTime().getTime();
......
......@@ -62,6 +62,10 @@ public class ChannelIndex extends AbstractBaseMongo {
*/
private Double emotionIndex;
public ChannelIndex(Channel channel, String contendId) {
this(channel.getProjectId(), contendId, channel.getPlatform(), channel.getRealSource(), channel.getSource());
}
public ChannelIndex(Channel channel) {
this(channel.getProjectId(), channel.getContendId(), channel.getPlatform(), channel.getRealSource(), channel.getSource());
}
......
......@@ -243,7 +243,7 @@ public interface ChannelService {
* @param channelId 渠道ID
* @return 事件信息
*/
List<ExportAppChannelEventDTO> downloadEventsByTime(Long startTime, Long endTime, String channelId);
List<ExportAppChannelEventDTO> downloadEventsByTime(Long startTime, Long endTime, String channelId,String contendId);
/**
* 计算渠道倾向及指数
......
......@@ -74,12 +74,14 @@ public interface ProjectService {
/**
* 获取当前用户拥有的所有项目
*
* @return
*/
List<JSONObject> getUserAllProjects();
/**
* 获取当前用户拥有的所有项目(包括已过期)
*
* @return
*/
List<JSONObject> getLoginUserAllProjects();
......@@ -90,7 +92,7 @@ public interface ProjectService {
* @param hasPrimary 是否要主品牌
* @return 品牌筛选列表
*/
List<JSONObject> getBrands(String projectId,boolean hasPrimary);
List<JSONObject> getBrands(String projectId, boolean hasPrimary);
/**
* 根据关联项目组ID获取Project
......@@ -106,10 +108,11 @@ public interface ProjectService {
* @param contendId 品牌id
* @return project对象
*/
AbstractProject getProjectByContendId(String contendId);
AbstractProject getProjectByContendId(String projectId, String contendId);
/**
* 获取所有启动状态下的Project
*
* @return
*/
List<Project> getAllProjectsWithStart();
......
......@@ -227,8 +227,7 @@ public class ProjectServiceImpl implements ProjectService {
}
@Override
public AbstractProject getProjectByContendId(String contendId) {
String projectId = UserThreadLocal.getProjectId();
public AbstractProject getProjectByContendId(String projectId, String contendId) {
Project project = projectDao.findOneById(projectId);
if ("0".equals(contendId)) {
return project;
......
package com.zhiwei.brandkbs2.service.impl;
import com.zhiwei.brandkbs2.config.Constant;
import com.zhiwei.brandkbs2.dao.*;
import com.zhiwei.brandkbs2.dao.AggreeResultDao;
import com.zhiwei.brandkbs2.dao.BrandkbsTaskDao;
import com.zhiwei.brandkbs2.dao.ChannelDao;
import com.zhiwei.brandkbs2.dao.ReportSettingsDao;
import com.zhiwei.brandkbs2.enmus.ReportTypeEnum;
import com.zhiwei.brandkbs2.es.ChannelEsDao;
import com.zhiwei.brandkbs2.es.EsClientDao;
......@@ -13,14 +16,16 @@ import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
......@@ -65,6 +70,9 @@ public class TaskServiceImpl implements TaskService {
@Resource(name = "reportServiceImpl")
ReportService reportService;
@Resource(name = "taskServiceExecutor")
ThreadPoolTaskExecutor taskServiceExecutor;
@Override
public void messageFlowCount(int day) {
List<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>> rangeTimeRecords = esClientDao.searchRecordRecentDay(day);
......@@ -77,7 +85,7 @@ public class TaskServiceImpl implements TaskService {
List<Channel> insertList = new ArrayList<>();
List<Map.Entry<ChannelIndex, ChannelIndex.Record>> batchList = new ArrayList<>();
// 新recordMap
Map<ChannelIndex, ChannelIndex.Record> newRecordMap = new HashMap<>();
Map<ChannelIndex, ChannelIndex.Record> newRecordMap = new ConcurrentHashMap<>();
for (Map.Entry<ChannelIndex, ChannelIndex.Record> entry : channelIndexRecordMap.entrySet()) {
batchList.add(entry);
// 每100条做一次清算
......@@ -85,7 +93,7 @@ public class TaskServiceImpl implements TaskService {
insertList.addAll(batchHandle(batchList, newRecordMap));
batchList = new ArrayList<>();
}
if (handleSize % 10000 == 0) {
if (handleSize % 100 == 0) {
log.info("渠道统计-渠道总计-查询更新已完成{}/{}", handleSize, channelIndexRecordMap.size());
}
}
......@@ -103,10 +111,10 @@ public class TaskServiceImpl implements TaskService {
}
private List<Channel> batchHandle(List<Map.Entry<ChannelIndex, ChannelIndex.Record>> batchList, Map<ChannelIndex, ChannelIndex.Record> newRecordMap) {
List<Channel> insertList = new ArrayList<>();
List<Channel> insertList = Collections.synchronizedList(new ArrayList<>());
List<String> fids = batchList.stream().map(channelIndexRecordEntry -> channelIndexRecordEntry.getKey().getFid()).collect(Collectors.toList());
Map<String, Channel> fidChannel = channelDao.queryUniqueAsync(fids);
for (Map.Entry<ChannelIndex, ChannelIndex.Record> entry : batchList) {
CompletableFuture.allOf(batchList.stream().map(entry -> CompletableFuture.supplyAsync(() -> {
String fid = entry.getKey().getFid();
Channel channel = fidChannel.get(fid);
if (null == channel) {
......@@ -119,7 +127,22 @@ public class TaskServiceImpl implements TaskService {
// 设置查询数值
entry.getKey().setChannelInfo(channel);
newRecordMap.put(entry.getKey(), entry.getValue());
}
return null;
}, taskServiceExecutor)).toArray(CompletableFuture[]::new)).join();
// for (Map.Entry<ChannelIndex, ChannelIndex.Record> entry : batchList) {
// String fid = entry.getKey().getFid();
// Channel channel = fidChannel.get(fid);
// if (null == channel) {
// channel = Channel.createFromChannelIndexRecord(entry.getKey(), entry.getValue());
// insertList.add(channelService.calculateChannelEmotionIndex(channel));
// } else {
// channel.setRecord(entry.getValue());
// channelDao.updateOne(channelService.calculateChannelEmotionIndex(channel));
// }
// // 设置查询数值
// entry.getKey().setChannelInfo(channel);
// newRecordMap.put(entry.getKey(), entry.getValue());
// }
return insertList;
}
......
......@@ -3,9 +3,10 @@ package com.zhiwei.brandkbs2;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.brandkbs2.common.GlobalPojo;
import com.zhiwei.brandkbs2.controller.app.AppArticleController;
import com.zhiwei.brandkbs2.dao.ChannelLabelDao;
import com.zhiwei.brandkbs2.dao.EventDataDao;
import com.zhiwei.brandkbs2.dao.ReportDao;
import com.zhiwei.brandkbs2.dao.*;
import com.zhiwei.brandkbs2.pojo.Channel;
import com.zhiwei.brandkbs2.service.ChannelService;
import com.zhiwei.brandkbs2.service.EventService;
import com.zhiwei.brandkbs2.service.ReportService;
import com.zhiwei.brandkbs2.service.TaskService;
import com.zhiwei.brandkbs2.util.TextUtil;
......@@ -38,6 +39,12 @@ public class TestRunWith {
ReportService reportService;
@Autowired
EventService eventService;
@Autowired
EventDao eventDao;
@Autowired
EventDataDao eventDataDao;
@Autowired
......@@ -55,6 +62,12 @@ public class TestRunWith {
@Autowired
GlobalPojo globalPojo;
@Autowired
ChannelDao channelDao;
@Autowired
ChannelService channelService;
@Test
public void test1() {
// UserInfo userInfo = new UserInfo();
......@@ -62,8 +75,16 @@ public class TestRunWith {
// UserThreadLocal.set(userInfo);
// ResponseResult result = appArticleController.getMarkSpread(1657468800000L, 1657555200000L);
// System.out.println(JSONObject.toJSONString(result));
taskService.messageFlowCount(2);
// taskService.messageFlowCount(2);
// eventService.analysisEvents(Arrays.asList("62e9eb772a41a57ed6c6ef92"));
// ChannelIndex channelIndex = new ChannelIndex("62beadd1bbf8eb20f96d2f2f","0","网媒","网媒","中工网");
// long total = eventDao.judgeSpecEventCount(channelIndex, Collections.singletonList("正面"), "中性");
// eventDao.getEventDay(channelIndex,1659283200000L,1660924800000L);
long start = System.currentTimeMillis();
Channel channel = channelDao.findOneById("62f5482d3b00de5932ba9593");
channelService.calculateChannelEmotionIndex(channel);
System.out.println(System.currentTimeMillis()-start);
// reportService.getReportsAggCount();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment