Commit 94757bb7 by 陈健智

Merge remote-tracking branch 'origin/feature' into feature

parents 2ade957c 0b37cddc
package com.zhiwei.brandkbs2.common;
import com.alibaba.fastjson.JSONObject;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.zhiwei.brandkbs2.pojo.ChannelTag;
import com.zhiwei.brandkbs2.pojo.Project;
import com.zhiwei.brandkbs2.service.EventService;
import com.zhiwei.brandkbs2.service.SystemInfoService;
import com.zhiwei.qbjc.bean.pojo.common.MessagePlatform;
import com.zhiwei.qbjc.bean.pojo.common.Tag;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
......@@ -17,6 +20,8 @@ import javax.annotation.Resource;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
/**
......
......@@ -46,6 +46,7 @@ public class Constant {
*/
public static final String TASK_REPORT_JOB = "定时生成项目简报数据";
public static final int INIT_VERSION = 0;
public static final String DEFAULT_PROJECT_URL = "https://brandkbs.zhiweidata.com/images/event.jpg";
public static Long add8Hours(String time, FastDateFormat dateFormat) {
try {
......
......@@ -94,7 +94,7 @@ public class LoginController extends BaseController {
@ApiOperation("测试接口")
@GetMapping("/test")
public ResponseResult test() {
return ResponseResult.success("brandkbs2-success 2022/7/8");
return ResponseResult.success("brandkbs2-success 2022/12/5");
}
}
......
......@@ -67,13 +67,13 @@ public class ChannelController extends BaseController {
@GetMapping("/list")
public ResponseResult findChannelList(@RequestParam(value = "page", defaultValue = "1") int page,
@RequestParam(value = "pageSize", defaultValue = "10") int size,
@RequestParam(value = "contendId",defaultValue = "0") String contendId,
@RequestParam(value = "emotion", defaultValue = "") String emotion,
@RequestParam(value = "platform", defaultValue = "") String platform,
@RequestParam(value = "contendId", defaultValue = "0") String contendId,
@RequestParam(value = "emotion", required = false) String emotion,
@RequestParam(value = "platform", required = false) String platform,
@RequestParam(value = "show", required = false) Boolean show,
@RequestParam(value = "keyword", defaultValue = "") String keyword,
@RequestParam(value = "sorter", defaultValue = "{\"lastTime\":\"descend\"}") String sorter) {
PageVO<JSONObject> channelList = channelService.findChannelList(page, size, contendId, emotion, platform, show, keyword, sorter);
@RequestParam(value = "keyword", required = false) String keyword,
@RequestParam(value = "sorter", defaultValue = "{\"last_time\":\"descend\"}") String sorter) {
PageVO<JSONObject> channelList = channelService.findChannelListNew(page, size, contendId, emotion, platform, show, keyword, sorter);
return ResponseResult.success(channelList);
}
......
......@@ -181,8 +181,8 @@ public class AppSearchController extends BaseController {
@ApiOperation("搜索-渠道列表")
@PostMapping(value = "/channel/channelList")
public ResponseResult getChannelList(@RequestBody ChannelSearchDTO channelSearchDTO) {
return ResponseResult.success(channelService.getChannelList(channelSearchDTO.getPage(), channelSearchDTO.getPageSize(), channelSearchDTO.getKeyword(),
channelSearchDTO.getPlatform(), channelSearchDTO.getEmotions(), channelSearchDTO.getMediaTypes(), channelSearchDTO.getArticlesCount(), channelSearchDTO.getSorter()));
return ResponseResult.success(channelService.getChannelListNew(channelSearchDTO.getPage(), channelSearchDTO.getPageSize(),
channelSearchDTO.getKeyword(), channelSearchDTO.getPlatform(), channelSearchDTO.getEmotions(), channelSearchDTO.getMediaTypes(), channelSearchDTO.getArticlesCount(), channelSearchDTO.getSorter()));
}
@ApiOperation("搜索-热门事件")
......
......@@ -6,6 +6,7 @@ import com.zhiwei.brandkbs2.pojo.EventData;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
/**
......@@ -70,4 +71,13 @@ public interface EventDataDao extends BaseMongoDao<EventData>, ShardingMongo {
* @return list
*/
long findEventDataCount(String eventId, String collectionName);
/**
* 查询渠道符合时间段内的所有事件id
* @param channelFid
* @param startTime
* @param endTime
* @return
*/
Set<String> findEventIdsByChannelFid(String channelFid, long startTime, long endTime);
}
......@@ -3,6 +3,7 @@ package com.zhiwei.brandkbs2.dao.impl;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.brandkbs2.config.Constant;
import com.zhiwei.brandkbs2.dao.EventDao;
import com.zhiwei.brandkbs2.dao.EventDataDao;
import com.zhiwei.brandkbs2.enmus.EmotionEnum;
import com.zhiwei.brandkbs2.pojo.ChannelIndex;
import com.zhiwei.brandkbs2.pojo.Event;
......@@ -16,6 +17,7 @@ import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.*;
import java.util.stream.Collectors;
......@@ -32,6 +34,9 @@ public class EventDaoImpl extends BaseMongoDaoImpl<Event> implements EventDao {
private static final String COLLECTION_NAME = "brandkbs_event";
@Resource(name = "eventDataDao")
EventDataDao eventDataDao;
public EventDaoImpl() {
super(COLLECTION_NAME);
}
......@@ -64,12 +69,14 @@ public class EventDaoImpl extends BaseMongoDaoImpl<Event> implements EventDao {
@Override
public Map<Long, List<Event>> getEventDay(ChannelIndex channelIndex, Long startTime, Long endTime) {
return getEventTimePattern(channelIndex, startTime, endTime, 8 + 2);
// return getEventTimePattern(channelIndex, startTime, endTime, 8 + 2);
return getEventTimePatternNew(channelIndex, startTime, endTime, false);
}
@Override
public Map<Long, List<Event>> getEventMonth(ChannelIndex channelIndex, Long startTime, Long endTime) {
return getEventTimePattern(channelIndex, startTime, endTime, 6 + 1);
// return getEventTimePattern(channelIndex, startTime, endTime, 6 + 1);
return getEventTimePatternNew(channelIndex, startTime, endTime, true);
}
@Override
......@@ -131,6 +138,7 @@ public class EventDaoImpl extends BaseMongoDaoImpl<Event> implements EventDao {
private Criteria eventCountCriteria(Long startTime, Long endTime, String emotion, String projectId, String contendId) {
Criteria criteria = Criteria.where("projectId").is(projectId);
criteria.and("contendId").is(contendId);
if (null != startTime || null != endTime) {
Criteria startTimeCriteria = criteria.and("startTime");
if (null != startTime) {
startTimeCriteria.gte(startTime);
......@@ -138,6 +146,7 @@ public class EventDaoImpl extends BaseMongoDaoImpl<Event> implements EventDao {
if (null != endTime) {
startTimeCriteria.lt(endTime);
}
}
if (StringUtils.isNotEmpty(emotion) && !EmotionEnum.ALL.getName().equals(emotion)) {
criteria.and("emotion").is(emotion);
}
......@@ -214,6 +223,7 @@ public class EventDaoImpl extends BaseMongoDaoImpl<Event> implements EventDao {
return Aggregation.newAggregation(operations);
}
@Deprecated
private Map<Long, List<Event>> getEventTimePattern(ChannelIndex channelIndex, Long startTime, Long endTime, int nrOfChars) {
Map<Long, List<Event>> res = new HashMap<>();
// 事件筛选条件
......@@ -247,6 +257,28 @@ public class EventDaoImpl extends BaseMongoDaoImpl<Event> implements EventDao {
return res;
}
private Map<Long, List<Event>> getEventTimePatternNew(ChannelIndex channelIndex, Long startTime, Long endTime, boolean month) {
Map<Long, List<Event>> res = new HashMap<>();
String pattern;
if (month) {
pattern = Constant.MONTH_PATTERN;
} else {
pattern = Constant.DAY_PATTERN;
}
for (String eventId : eventDataDao.findEventIdsByChannelFid(channelIndex.getFid(), startTime, endTime)) {
Event event = findOneById(eventId);
Long mapKey = Tools.truncDate(event.getStartTime(), pattern);
res.putIfAbsent(mapKey,new ArrayList<>());
res.get(mapKey).add(event);
}
// 时间降序
for (Map.Entry<Long, List<Event>> entry : res.entrySet()) {
List<Event> newList = entry.getValue().stream().sorted((x, y) -> Long.compare(y.getStartTime(), x.getStartTime())).collect(Collectors.toList());
res.put(entry.getKey(), newList);
}
return res;
}
private List<String> getAggreeCollections(int years) {
List<String> res = new ArrayList<>();
Calendar date = Calendar.getInstance();
......
......@@ -4,14 +4,18 @@ import com.zhiwei.brandkbs2.dao.EventDataDao;
import com.zhiwei.brandkbs2.pojo.ChannelIndex;
import com.zhiwei.brandkbs2.pojo.Event;
import com.zhiwei.brandkbs2.pojo.EventData;
import org.bson.types.ObjectId;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Component;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/**
* @ClassName: EventDataDaoImpl
......@@ -81,4 +85,15 @@ public class EventDataDaoImpl extends BaseMongoDaoImpl<EventData> implements Eve
return mongoTemplate.count(query, EventData.class, collectionName);
}
@Override
public Set<String> findEventIdsByChannelFid(String channelFid, long startTime, long endTime) {
Set<String> res = new HashSet<>();
Criteria criteria = Criteria.where("channelFid").is(channelFid).and("eventMapper.startTime").gte(startTime).lt(endTime);
for (String eventDataCollection : generateCollectionNames(new Date(startTime), new Date(endTime))) {
List<ObjectId> list = mongoTemplate.findDistinct(Query.query(criteria), "eventMapper._id", eventDataCollection, ObjectId.class);
res.addAll(list.stream().map(ObjectId::toString).collect(Collectors.toList()));
}
return res;
}
}
......@@ -32,11 +32,12 @@ public class ExportAppChannelArticleDTO {
@ExcelProperty("情感倾向")
private String emotion;
public static ExportAppChannelArticleDTO createFromArticle(ChannelIndex.Article article, String url, String title) {
public static ExportAppChannelArticleDTO createFromArticle(ChannelIndex.Article article, String title,String content, String url) {
ExportAppChannelArticleDTO dto = new ExportAppChannelArticleDTO();
dto.setTime(new Date(article.getTime()));
dto.setUrl(url);
dto.setTitle(title);
dto.setContent(content);
dto.setUrl(url);
dto.setEmotion(EmotionEnum.state2Name(article.getEmotion()));
return dto;
}
......
......@@ -55,13 +55,14 @@ public enum ChannelEmotion {
}
public static int getEmotionRank(Double emotionIndex) {
int rank;
int rank = 3;
if (null == emotionIndex) {
return rank;
}
if (emotionIndex >= 80) {
rank = 1;
} else if (emotionIndex >= 60) {
rank = 2;
} else {
rank = 3;
}
return rank;
}
......
......@@ -122,6 +122,12 @@ public enum EmotionEnum {
return value;
}
}
if ("友好".equals(name)) {
return EmotionEnum.POSITIVE;
}
if ("不友好".equals(name) || "敏感".equals(name)) {
return EmotionEnum.NEGATIVE;
}
return UNDEFINED;
}
......
......@@ -25,6 +25,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
......@@ -43,6 +44,8 @@ public class ChannelEsDao extends EsClientDao {
@Resource(name = "commonServiceImpl")
CommonService commonService;
public static final String CHANNEL_COPY_ES_INDEX_NAME = "brandkbs2_channel_copy";
public List<String> getArticleIds(String fid) {
Long[] timeRangeMonth = commonService.getTimeRangeMonth();
return getArticleIds(timeRangeMonth[0], timeRangeMonth[1], fid);
......@@ -98,6 +101,22 @@ public class ChannelEsDao extends EsClientDao {
}
}
public void batchInsert(List<Map<String, Object>> insertList) {
retryTemplate.execute(context -> {
try {
BulkRequest bulkRequest = new BulkRequest();
for (Map<String, Object> map : insertList) {
bulkRequest.add(new IndexRequest(CHANNEL_COPY_ES_INDEX_NAME).id(map.get("id") + "").source(map));
}
channelEsClient.bulk(bulkRequest, RequestOptions.DEFAULT);
log.info("批量插入es数据【{}】条", insertList.size());
} catch (Exception e) {
e.printStackTrace();
}
return null;
});
}
@Override
public String[] getIndexes() {
return new String[]{getChannelRecordIndex()};
......
......@@ -3,6 +3,7 @@ package com.zhiwei.brandkbs2.es;
import com.zhiwei.brandkbs2.common.GenericAttribute;
import com.zhiwei.brandkbs2.common.GlobalPojo;
import com.zhiwei.brandkbs2.config.Constant;
import com.zhiwei.brandkbs2.enmus.ImportantChannelEnum;
import com.zhiwei.pushlog.tools.Tools;
import com.zhiwei.qbjc.bean.pojo.common.MessagePlatform;
import org.apache.commons.collections4.CollectionUtils;
......@@ -100,14 +101,28 @@ public class EsQueryTools {
// 不组装mediaTypes的情况
nestedBoolQueryBuilder.must(cacheMapsNestedQuery(mustQuery));
} else {
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
mediaTypes.forEach(e -> {
BoolQueryBuilder mediaTypeQueryBuilder = QueryBuilders.boolQuery();
mediaTypeQueryBuilder.must(QueryBuilders.termQuery("brandkbs_cache_maps.channel_type.keyword", e));
mediaTypeQueryBuilder.must(mustQuery);
boolQueryBuilder.should(cacheMapsNestedQuery(mediaTypeQueryBuilder));
});
nestedBoolQueryBuilder.must(boolQueryBuilder);
// BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
// mediaTypes.forEach(e -> {
// BoolQueryBuilder mediaTypeQueryBuilder = QueryBuilders.boolQuery();
// mediaTypeQueryBuilder.must(QueryBuilders.termQuery("brandkbs_cache_maps.channel_type.keyword", e));
// mediaTypeQueryBuilder.must(mustQuery);
// boolQueryBuilder.should(cacheMapsNestedQuery(mediaTypeQueryBuilder));
// });
BoolQueryBuilder mediaTypesBoolQueryBuilder = QueryBuilders.boolQuery();
for (String mediaType : mediaTypes) {
BoolQueryBuilder mediaTypeBoolQueryBuilder = QueryBuilders.boolQuery();
//其他
if (ImportantChannelEnum.QITA.getState().equals(mediaType)) {
for (String tag : ImportantChannelEnum.getAllTagExceptSpec()) {
mediaTypeBoolQueryBuilder.mustNot(QueryBuilders.matchQuery("channel_tag", tag));
}
} else {
mediaTypeBoolQueryBuilder.must(QueryBuilders.matchQuery("channel_tag", mediaType));
}
mediaTypesBoolQueryBuilder.should(mediaTypeBoolQueryBuilder);
}
nestedBoolQueryBuilder.must(cacheMapsNestedQuery(mustQuery));
nestedBoolQueryBuilder.must(mediaTypesBoolQueryBuilder);
}
return nestedBoolQueryBuilder;
}
......@@ -157,30 +172,34 @@ public class EsQueryTools {
// return tagQuery;
// }
public static BoolQueryBuilder assembleSourceQuery(String sourceKeyword) {
public static BoolQueryBuilder assembleFiledKeywordQuery(String field, String keyword) {
BoolQueryBuilder channelBoolQueryBuilder = QueryBuilders.boolQuery();
String[] keys = sourceKeyword.trim().split("\\|");
String[] keys = keyword.trim().split("\\|");
for (String key : keys) {
String channelRegex = getAllRegex(key);
BoolQueryBuilder keyQueryBuilder = QueryBuilders.boolQuery();
keyQueryBuilder.must(QueryBuilders.regexpQuery("source", ".*" + channelRegex + ".*"));
keyQueryBuilder.must(QueryBuilders.regexpQuery(field, ".*" + channelRegex + ".*"));
channelBoolQueryBuilder.should(keyQueryBuilder);
}
return channelBoolQueryBuilder;
}
public static BoolQueryBuilder assembleChannelSourceQuery(String sourceKeyword) {
BoolQueryBuilder channelBoolQueryBuilder = QueryBuilders.boolQuery();
String[] keys = sourceKeyword.trim().split("\\|");
for (String key : keys) {
String channelRegex = getAllRegex(key);
BoolQueryBuilder keyQueryBuilder = QueryBuilders.boolQuery();
keyQueryBuilder.must(QueryBuilders.regexpQuery("source.keyword", ".*" + channelRegex + ".*"));
channelBoolQueryBuilder.should(keyQueryBuilder);
}
return channelBoolQueryBuilder;
public static BoolQueryBuilder assembleSourceQuery(String sourceKeyword) {
return assembleFiledKeywordQuery("source", sourceKeyword);
}
// public static BoolQueryBuilder assembleChannelSourceQuery(String sourceKeyword) {
// BoolQueryBuilder channelBoolQueryBuilder = QueryBuilders.boolQuery();
// String[] keys = sourceKeyword.trim().split("\\|");
// for (String key : keys) {
// String channelRegex = getAllRegex(key);
// BoolQueryBuilder keyQueryBuilder = QueryBuilders.boolQuery();
// keyQueryBuilder.must(QueryBuilders.regexpQuery("source.keyword", ".*" + channelRegex + ".*"));
// channelBoolQueryBuilder.should(keyQueryBuilder);
// }
// return channelBoolQueryBuilder;
// }
public static void platformMustNot(BoolQueryBuilder boolQueryBuilder, String... platformNames) {
if (null == platformNames) {
return;
......
package com.zhiwei.brandkbs2.pojo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.PropertyNamingStrategy;
import com.alibaba.fastjson.serializer.SerializeConfig;
import com.zhiwei.brandkbs2.util.Tools;
import lombok.Getter;
import lombok.Setter;
import org.bson.types.ObjectId;
import java.util.Date;
import java.util.Map;
/**
* @ClassName: Channel
......@@ -23,11 +28,21 @@ public class Channel extends ChannelIndex {
private Long cTime;
/**
* 最近发文
* 最近发文时间
*/
private Long lastTime;
/**
* 最近发文
*/
private String lastArticle;
/**
* 媒体类型
*/
private String mediaType;
/**
* 图片头像地址
*/
private String avatarUrl;
......@@ -64,6 +79,12 @@ public class Channel extends ChannelIndex {
private Double influence;
protected static final SerializeConfig JSON_CONFIG = new SerializeConfig();
static {
JSON_CONFIG.propertyNamingStrategy = PropertyNamingStrategy.SnakeCase;
}
public void setLastTime(Long lastTime) {
if (null == this.lastTime || this.lastTime < lastTime) {
this.lastTime = lastTime;
......@@ -102,18 +123,23 @@ public class Channel extends ChannelIndex {
channel.setLastTime(record.getLastTime());
channel.setRecord(record);
// // TODO 调性随机分配
// double random = Math.random();
// if (random < 0.3) {
// channel.setEmotion(ChannelEmotion.POSITIVE.getState());
// } else if (random < 0.6) {
// channel.setEmotion(ChannelEmotion.NEGATIVE.getState());
// } else {
// channel.setEmotion(ChannelEmotion.NEUTRAL.getState());
// }
// // TODO 情感指数随机分配
// channel.setEmotionIndex(Math.random() * 100);
return channel;
}
public Map<String, Object> createChannelCopyMap() {
JSONObject json = JSON.parseObject(JSON.toJSONString(this, JSON_CONFIG));
json.remove("channel_index");
return json;
// esMap.put("_id",this.getId());
// esMap.put("id",this.getId());
// esMap.put("project_id",this.getProjectId());
// esMap.put("contend_id",this.getContendId());
// esMap.put("platform",this.getPlatform());
// esMap.put("real_source",this.getRealSource());
// esMap.put("source",this.getSource());
// esMap.put("fid",this.getFid());
// esMap.put("emotion",this.getEmotion());
// esMap.put("emotion_index",this.getEmotionIndex());
}
}
package com.zhiwei.brandkbs2.pojo;
import com.zhiwei.brandkbs2.pojo.vo.ProjectVO;
import com.zhiwei.middleware.event.pojo.dto.EventTagRelatedDTO;
import lombok.Getter;
import lombok.Setter;
import org.springframework.data.mongodb.core.mapping.Document;
......
......@@ -8,6 +8,8 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.ToString;
import java.util.Map;
@Data
@ToString
public class ChannelVO {
......@@ -61,8 +63,7 @@ public class ChannelVO {
channelVO.setAvatarUrl(channel.getAvatarUrl());
channelVO.setEventCount(channel.getEventCount());
channelVO.setArticleCount(channel.getArticleCount());
// TODO influence实现
channelVO.setInfluence(null);
channelVO.setInfluence(channel.getInfluence());
String mediaType = GlobalPojo.getMediaType(projectId, channel.getPlatform(), channel.getSource());
if (null != mediaType) {
channelVO.setMediaType(mediaType.replaceAll(",", " "));
......@@ -71,4 +72,22 @@ public class ChannelVO {
return channelVO;
}
public static ChannelVO createFromChannelCopyMap(Map<String,Object> map){
JSONObject json = new JSONObject(map);
ChannelVO channelVO = new ChannelVO();
channelVO.setId(json.getString("id"));
channelVO.setSource(json.getString("source"));
channelVO.setRealSource(json.getString("real_source"));
channelVO.setEmotion(ChannelEmotion.getNameFromState(json.getInteger("emotion")));
channelVO.setEmotionRank(ChannelEmotion.getEmotionRank(json.getDouble("emotion_index")));
channelVO.setAvatarUrl(json.getString("avatar_url"));
channelVO.setEventCount(json.getLong("event_count"));
channelVO.setArticleCount(json.getLong("article_count"));
channelVO.setInfluence(json.getDouble("influence"));
channelVO.setMediaType(json.getString("media_type"));
channelVO.setLastArticle(json.getJSONObject("last_article"));
return channelVO;
}
}
package com.zhiwei.brandkbs2.pojo.vo;
import com.zhiwei.brandkbs2.config.Constant;
import com.zhiwei.brandkbs2.pojo.Contend;
import com.zhiwei.brandkbs2.pojo.Project;
import com.zhiwei.middleware.event.pojo.dto.EventTagRelatedDTO;
import com.zhiwei.middleware.event.pojo.vo.EventTagBrandkbsBindingVO;
import com.zhiwei.middleware.mark.vo.MarkerTag;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
......@@ -145,7 +145,11 @@ public class ProjectVO {
project.setHitKeywords(this.getHitKeywords());
project.setHighKeywords(this.getHighKeywords());
project.setMergeSensitive(this.isMergeSensitive());
if (null == this.getAvatarUrl()) {
project.setAvatarUrl(Constant.DEFAULT_PROJECT_URL);
} else {
project.setAvatarUrl(this.getAvatarUrl());
}
project.setWholeSearchDataSource(this.getWholeSearchDataSource());
project.setImportTime(this.getImportTime().getTime());
project.setHasContend(null != this.getContendList());
......
......@@ -37,6 +37,9 @@ public interface ChannelService {
*/
PageVO<JSONObject> findChannelList(int page, int size, String contendId, String emotion, String platform, Boolean show, String keyword, String sorter);
PageVO<JSONObject> findChannelListNew(int page, int size, String contendId, String emotion, String platform, Boolean show, String keyword,
String sorter);
/**
* 根据搜索条件查询稿件列表
*
......@@ -262,4 +265,7 @@ public interface ChannelService {
PageVO<ChannelVO> getChannelList(int page, int pageSize, String keyword, List<String> platforms, List<Integer> emotions, List<String> mediaTypes,
Integer[] articlesCount, String sorter);
PageVO<ChannelVO> getChannelListNew(int page, int pageSize, String keyword, List<String> platforms, List<Integer> emotions, List<String> mediaTypes,
Integer[] articlesCount, String sorter);
}
......@@ -25,4 +25,5 @@ public interface EventDataService {
*/
String deleteOneByIdAndEventId(String id, String eventId);
void updateEventInfo(Event event);
}
......@@ -291,4 +291,9 @@ public interface EventService {
*/
List<Event> findNotEndEventByProjectId(String projectId);
/**
* 更新所有事件快照属性
*/
void updateAllEventInfo();
}
......@@ -109,6 +109,17 @@ public class EventDataServiceImpl implements EventDataService {
log.info("analysisEvent-eventId:{},更新事件数据完毕,实际更新:{}条", event.getId(), eventDataList.size());
}
@Override
public void updateEventInfo(Event event) {
// 找到正确的事件数据
List<EventData> eventDataList = eventDataDao.findList(Query.query(Criteria.where("eventId").is(event.getId())), event.getCollectionName());
eventTopArticlesAnalysisDao.deleteByEventId(event.getId());
eventDisseminationTrendDao.deleteOneByQuery(Query.query(Criteria.where("eventId").is(event.getId())));
// 设置快照属性值
setEventStaticState(event, eventDataList);
log.info("updateEventInfo-eventId:{},更新事件数据完毕,实际更新:{}条", event.getId(), eventDataList.size());
}
private List<EventData> filterEventData(Event event, List<EventData> eventDataList) {
List<EventData> hitList = new ArrayList<>();
String keyword = event.getKeyword();
......
......@@ -36,6 +36,7 @@ import com.zhiwei.brandkbs2.service.ProjectService;
import com.zhiwei.brandkbs2.util.MongoUtil;
import com.zhiwei.brandkbs2.util.RedisUtil;
import com.zhiwei.brandkbs2.util.Tools;
import lombok.Data;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.StringUtils;
......@@ -44,6 +45,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
......@@ -59,6 +61,7 @@ import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
/**
......@@ -683,6 +686,63 @@ public class EventServiceImpl implements EventService {
return eventDao.findList(query);
}
@Override
public void updateAllEventInfo() {
try {
Query query = new Query();
query.with(Sort.by(Sort.Order.asc("cTime")));
MongoQueryUtil mongoQueryUtil = new MongoQueryUtil(eventDao.count(query));
AtomicInteger total = new AtomicInteger();
AtomicInteger error = new AtomicInteger();
while (mongoQueryUtil.hasNextPage()) {
List<Event> list = eventDao.findList(mongoQueryUtil.getNextPageQuery());
log.info("updateAllEventInfo-发现事件共:{}个,current:{},total:{}", list.size(), mongoQueryUtil.getStartPage() * mongoQueryUtil.getLimit(),
mongoQueryUtil.getTotal());
list.forEach(event -> {
try {
eventDataService.updateEventInfo(event);
log.info("updateAllEventInfo-已更新事件:{}个,错误:{}个", total.incrementAndGet(), error.get());
} catch (Exception e) {
error.incrementAndGet();
log.error("updateAllEventInfo-eventId:{}", event.getId(), e);
}
});
}
} catch (Exception e) {
log.error("updateAllEventInfo-", e);
}
}
@Data
public static class MongoQueryUtil {
private int limit = 1000;
private long startPage = 1;
private long total;
private AtomicLong removeSize = new AtomicLong();
public MongoQueryUtil(long total) {
this.total = total;
}
public boolean hasNextPage() {
long totalPage = (total + limit - 1) / limit;
return startPage <= totalPage;
}
public Query getNextPageQuery() {
Query query = new Query();
if (!hasNextPage()) {
throw new IllegalStateException("startPage>=totalPage");
}
query.skip((startPage - 1) * limit - removeSize.get());
query.limit(limit);
query.with(Sort.by(Sort.Order.desc("_id")));
startPage++;
return query;
}
}
/**
* 获取时间筛选条件
*
......
......@@ -1532,7 +1532,8 @@ public class MarkDataServiceImpl implements MarkDataService {
return null;
}
return getTitleAndUrl(searchHits.getAt(0).getSourceAsMap());
} catch (IOException e) {
} catch (Exception e) {
log.info("getLastMarkData-",e);
return null;
}
}
......
......@@ -3,7 +3,10 @@ package com.zhiwei.brandkbs2.service.impl;
import com.zhiwei.brandkbs2.auth.UserThreadLocal;
import com.zhiwei.brandkbs2.common.GlobalPojo;
import com.zhiwei.brandkbs2.config.Constant;
import com.zhiwei.brandkbs2.dao.*;
import com.zhiwei.brandkbs2.dao.AggreeResultDao;
import com.zhiwei.brandkbs2.dao.BrandkbsTaskDao;
import com.zhiwei.brandkbs2.dao.ChannelDao;
import com.zhiwei.brandkbs2.dao.ReportSettingsDao;
import com.zhiwei.brandkbs2.enmus.ReportTypeEnum;
import com.zhiwei.brandkbs2.es.ChannelEsDao;
import com.zhiwei.brandkbs2.es.EsClientDao;
......@@ -91,6 +94,7 @@ public class TaskServiceImpl implements TaskService {
log.info("渠道统计-搜索到近{}天的受影响渠道数{}条", day, channelIndexRecordMap.size());
long handleSize = 0;
List<Channel> insertList = new ArrayList<>();
List<Channel> updateList = new ArrayList<>();
List<Map.Entry<ChannelIndex, ChannelIndex.Record>> batchList = new ArrayList<>();
// 新recordMap
Map<ChannelIndex, ChannelIndex.Record> newRecordMap = new ConcurrentHashMap<>();
......@@ -98,7 +102,9 @@ public class TaskServiceImpl implements TaskService {
batchList.add(entry);
// 每100条做一次清算
if (++handleSize % 100 == 0 || handleSize == channelIndexRecordMap.size()) {
insertList.addAll(batchHandle(batchList, newRecordMap));
Pair<List<Channel>, List<Channel>> listPair = batchHandle(batchList, newRecordMap);
insertList.addAll(listPair.getLeft());
updateList.addAll(listPair.getRight());
batchList = new ArrayList<>();
}
if (handleSize % 10000 == 0) {
......@@ -115,6 +121,13 @@ public class TaskServiceImpl implements TaskService {
Long[] timeMinMax = Tools.timeMinMax(rangeTimeRecords.stream().map(Pair::getLeft).collect(Collectors.toList()));
List<ChannelRecord> channelRecords = ChannelRecord.createChannelRecords(timeMinMax[0], timeMinMax[1], channelIndexRecordMap);
channelEsDao.upsertChannelRecord(channelRecords);
// 同步channelCopy
ListUtils.partition(insertList, 1000).forEach(list -> {
channelEsDao.batchInsert(list.stream().map(Channel::createChannelCopyMap).collect(Collectors.toList()));
});
ListUtils.partition(updateList, 1000).forEach(list -> {
channelEsDao.batchInsert(list.stream().map(Channel::createChannelCopyMap).collect(Collectors.toList()));
});
log.info("渠道统计-渠道记录-统计结束");
}
......@@ -159,8 +172,10 @@ public class TaskServiceImpl implements TaskService {
}, cacheServiceExecutor)).toArray(CompletableFuture[]::new)).join();
}
private List<Channel> batchHandle(List<Map.Entry<ChannelIndex, ChannelIndex.Record>> batchList, Map<ChannelIndex, ChannelIndex.Record> newRecordMap) {
private Pair<List<Channel>, List<Channel>> batchHandle(List<Map.Entry<ChannelIndex, ChannelIndex.Record>> batchList, Map<ChannelIndex,
ChannelIndex.Record> newRecordMap) {
List<Channel> insertList = Collections.synchronizedList(new ArrayList<>());
List<Channel> updateList = Collections.synchronizedList(new ArrayList<>());
List<String> fids = batchList.stream().map(channelIndexRecordEntry -> channelIndexRecordEntry.getKey().getFid()).collect(Collectors.toList());
Map<String, Channel> fidChannel = channelDao.queryUniqueAsync(fids);
CompletableFuture.allOf(batchList.stream().map(entry -> CompletableFuture.supplyAsync(() -> {
......@@ -172,27 +187,14 @@ public class TaskServiceImpl implements TaskService {
} else {
channel.setRecord(entry.getValue());
channelDao.updateOne(channelService.calculateChannelEmotionIndex(channel));
updateList.add(channel);
}
// 设置查询数值
entry.getKey().setChannelInfo(channel);
newRecordMap.put(entry.getKey(), entry.getValue());
return null;
}, taskServiceExecutor)).toArray(CompletableFuture[]::new)).join();
// for (Map.Entry<ChannelIndex, ChannelIndex.Record> entry : batchList) {
// String fid = entry.getKey().getFid();
// Channel channel = fidChannel.get(fid);
// if (null == channel) {
// channel = Channel.createFromChannelIndexRecord(entry.getKey(), entry.getValue());
// insertList.add(channelService.calculateChannelEmotionIndex(channel));
// } else {
// channel.setRecord(entry.getValue());
// channelDao.updateOne(channelService.calculateChannelEmotionIndex(channel));
// }
// // 设置查询数值
// entry.getKey().setChannelInfo(channel);
// newRecordMap.put(entry.getKey(), entry.getValue());
// }
return insertList;
return Pair.of(insertList, updateList);
}
@Deprecated
......
......@@ -242,7 +242,12 @@ public class UserServiceImpl implements UserService {
Optional.ofNullable(projectDao.findOne("projectName", userProject.getProjectName())).ifPresent(project -> {
// 排除已经有权限的Project
if (userRoles.stream().noneMatch(userRole -> userRole.getProjectId().equals(project.getId()))) {
userRoles.add(new UserRole(project.getId(), userProject.getRoleId(), userProject.getExpiredTime(), userProject.getExportAmount()));
Long expiredTime = null;
// 只有客户才有过期时间
if (RoleEnum.CUSTOMER.getState() == userProject.getRoleId()) {
expiredTime = userProject.getExpiredTime();
}
userRoles.add(new UserRole(project.getId(), userProject.getRoleId(), expiredTime, userProject.getExportAmount()));
hit.set(true);
}
});
......
......@@ -32,8 +32,10 @@ secondary.uri=mongodb://qbjcuser:qbjc1q2w3e4r@192.168.0.150:27017,192.168.0.151:
es.clusterNodes=192.168.0.130:9200
es.clusterName=zhiweidata-new-es
es.httpClusterNodes=192.168.0.130:9200:qbjc-back:yuqing.zhiweidata.com,192.168.0.51:9400:elastic:qWxZRW42OHkuOhmF5AXX
es.username=brandkbs2_test
es.password=3vh65l$i6qQA
#es.username=brandkbs2_test
#es.password=3vh65l$i6qQA
es.username=joker
es.password=jokerdevops
es.index.test = true
#channel-index
......
......@@ -34,8 +34,10 @@ secondary.uri=mongodb://qbjcuser:qbjc1q2w3e4r@202.107.192.94:17152/qbjc?authSour
es.clusterNodes=202.107.192.94:1443
es.clusterName=zhiweidata-new-es
es.httpClusterNodes=202.107.192.94:1443:brandkbs2:3vh65l$i6qQA
es.username=brandkbs2_test
es.password=3vh65l$i6qQA
#es.username=brandkbs2_test jokerdevops
#es.password=3vh65l$i6qQA
es.username=joker
es.password=jokerdevops
es.index.test = true
#channel-index
......
......@@ -32,8 +32,10 @@ secondary.uri=mongodb://qbjcuser:qbjc1q2w3e4r@192.168.0.150:27017,192.168.0.151:
es.clusterNodes=192.168.0.130:9200
es.clusterName=zhiweidata-new-es
es.httpClusterNodes=192.168.0.130:9200:qbjc-back:yuqing.zhiweidata.com,192.168.0.51:9400:elastic:qWxZRW42OHkuOhmF5AXX
es.username=brandkbs2_test
es.password=3vh65l$i6qQA
#es.username=brandkbs2_test
#es.password=3vh65l$i6qQA
es.username=joker
es.password=jokerdevops
es.index.test = true
#channel-index
......
package com.zhiwei.brandkbs2;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.brandkbs2.common.GlobalPojo;
import com.zhiwei.brandkbs2.dao.ChannelDao;
import com.zhiwei.brandkbs2.es.ChannelEsDao;
import com.zhiwei.brandkbs2.pojo.Channel;
import com.zhiwei.brandkbs2.service.MarkDataService;
import com.zhiwei.brandkbs2.service.ProjectService;
import lombok.Data;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
/**
* @ClassName: CopyChannelFromMongo2Es
* @Description 迁移mongo数据从mongo到es库
* @author: sjj
* @date: 2022-11-29 11:28
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class CopyChannelFromMongo2Es {
private static final Logger log = LogManager.getLogger(CopyChannelFromMongo2Es.class);
@Autowired
ProjectService projectService;
@Autowired
MarkDataService markDataService;
@Autowired
ChannelDao channelDao;
@Autowired
ChannelEsDao channelEsDao;
@Resource(name = "esSearchExecutor")
ThreadPoolTaskExecutor esSearchExecutor;
@Test
public void test() throws IOException {
// 2022.6.1
long startTime = 1654012800000L;
Query query = new Query(Criteria.where("lastTime").gt(startTime));
long count = channelDao.count(query);
MongoQueryUtil mongoQueryUtil = new MongoQueryUtil(count);
mongoQueryUtil.setStartPage(4939);
while (mongoQueryUtil.hasNextPage()) {
List<Channel> list = channelDao.findList(mongoQueryUtil.getNextPageQuery());
List<CompletableFuture<Map<String, Object>>> futureList = list.stream().map(channel -> CompletableFuture.supplyAsync(() -> {
JSONObject markData = markDataService.getLastMarkData(channel.getProjectId(), "-1", channel.getContendId(), channel.getPlatform(), channel.getRealSource(), channel.getSource());
if (null != markData) {
channel.setLastArticle(markData.toJSONString());
}
channel.setMediaType(GlobalPojo.getMediaType(channel.getProjectId(), channel.getPlatform(), channel.getSource()));
return channel.createChannelCopyMap();
}, esSearchExecutor)).collect(Collectors.toList());
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
List<Map<String, Object>> collect = futureList.stream().map(CompletableFuture::join).collect(Collectors.toList());
// 查询最近发文并设置
channelEsDao.batchInsert(collect);
log.info("total:{},current:{}", mongoQueryUtil.getTotal(), mongoQueryUtil.getStartPage() * mongoQueryUtil.getLimit());
}
}
@Data
public static class MongoQueryUtil {
private int limit = 1000;
private long startPage = 1;
private long total;
private AtomicLong removeSize = new AtomicLong();
public MongoQueryUtil(long total) {
this.total = total;
}
public boolean hasNextPage() {
long totalPage = (total + limit - 1) / limit;
return startPage <= totalPage;
}
public Query getNextPageQuery() {
Query query = new Query();
if (!hasNextPage()) {
throw new IllegalStateException("startPage>=totalPage");
}
query.skip((startPage - 1) * limit - removeSize.get());
query.limit(limit);
query.with(Sort.by(Sort.Order.desc("_id")));
startPage++;
return query;
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment