Commit ebba83ab by shenjunjie

Merge branch 'feature' into 'release'

Feature

See merge request !114
parents 42fe2c60 374e17a8
...@@ -46,6 +46,7 @@ public class Constant { ...@@ -46,6 +46,7 @@ public class Constant {
*/ */
public static final String TASK_REPORT_JOB = "定时生成项目简报数据"; public static final String TASK_REPORT_JOB = "定时生成项目简报数据";
public static final int INIT_VERSION = 0; public static final int INIT_VERSION = 0;
public static final String DEFAULT_PROJECT_URL = "https://brandkbs.zhiweidata.com/images/event.jpg";
public static Long add8Hours(String time, FastDateFormat dateFormat) { public static Long add8Hours(String time, FastDateFormat dateFormat) {
try { try {
......
...@@ -67,13 +67,13 @@ public class ChannelController extends BaseController { ...@@ -67,13 +67,13 @@ public class ChannelController extends BaseController {
@GetMapping("/list") @GetMapping("/list")
public ResponseResult findChannelList(@RequestParam(value = "page", defaultValue = "1") int page, public ResponseResult findChannelList(@RequestParam(value = "page", defaultValue = "1") int page,
@RequestParam(value = "pageSize", defaultValue = "10") int size, @RequestParam(value = "pageSize", defaultValue = "10") int size,
@RequestParam(value = "contendId",defaultValue = "0") String contendId, @RequestParam(value = "contendId", defaultValue = "0") String contendId,
@RequestParam(value = "emotion", defaultValue = "") String emotion, @RequestParam(value = "emotion", required = false) String emotion,
@RequestParam(value = "platform", defaultValue = "") String platform, @RequestParam(value = "platform", required = false) String platform,
@RequestParam(value = "show", required = false) Boolean show, @RequestParam(value = "show", required = false) Boolean show,
@RequestParam(value = "keyword", defaultValue = "") String keyword, @RequestParam(value = "keyword", required = false) String keyword,
@RequestParam(value = "sorter", defaultValue = "{\"lastTime\":\"descend\"}") String sorter) { @RequestParam(value = "sorter", defaultValue = "{\"last_time\":\"descend\"}") String sorter) {
PageVO<JSONObject> channelList = channelService.findChannelList(page, size, contendId, emotion, platform, show, keyword, sorter); PageVO<JSONObject> channelList = channelService.findChannelListNew(page, size, contendId, emotion, platform, show, keyword, sorter);
return ResponseResult.success(channelList); return ResponseResult.success(channelList);
} }
......
...@@ -181,8 +181,8 @@ public class AppSearchController extends BaseController { ...@@ -181,8 +181,8 @@ public class AppSearchController extends BaseController {
@ApiOperation("搜索-渠道列表") @ApiOperation("搜索-渠道列表")
@PostMapping(value = "/channel/channelList") @PostMapping(value = "/channel/channelList")
public ResponseResult getChannelList(@RequestBody ChannelSearchDTO channelSearchDTO) { public ResponseResult getChannelList(@RequestBody ChannelSearchDTO channelSearchDTO) {
return ResponseResult.success(channelService.getChannelList(channelSearchDTO.getPage(), channelSearchDTO.getPageSize(), channelSearchDTO.getKeyword(), return ResponseResult.success(channelService.getChannelListNew(channelSearchDTO.getPage(), channelSearchDTO.getPageSize(),
channelSearchDTO.getPlatform(), channelSearchDTO.getEmotions(), channelSearchDTO.getMediaTypes(), channelSearchDTO.getArticlesCount(), channelSearchDTO.getSorter())); channelSearchDTO.getKeyword(), channelSearchDTO.getPlatform(), channelSearchDTO.getEmotions(), channelSearchDTO.getMediaTypes(), channelSearchDTO.getArticlesCount(), channelSearchDTO.getSorter()));
} }
@ApiOperation("搜索-热门事件") @ApiOperation("搜索-热门事件")
......
...@@ -138,6 +138,7 @@ public class EventDaoImpl extends BaseMongoDaoImpl<Event> implements EventDao { ...@@ -138,6 +138,7 @@ public class EventDaoImpl extends BaseMongoDaoImpl<Event> implements EventDao {
private Criteria eventCountCriteria(Long startTime, Long endTime, String emotion, String projectId, String contendId) { private Criteria eventCountCriteria(Long startTime, Long endTime, String emotion, String projectId, String contendId) {
Criteria criteria = Criteria.where("projectId").is(projectId); Criteria criteria = Criteria.where("projectId").is(projectId);
criteria.and("contendId").is(contendId); criteria.and("contendId").is(contendId);
if (null != startTime || null != endTime) {
Criteria startTimeCriteria = criteria.and("startTime"); Criteria startTimeCriteria = criteria.and("startTime");
if (null != startTime) { if (null != startTime) {
startTimeCriteria.gte(startTime); startTimeCriteria.gte(startTime);
...@@ -145,6 +146,7 @@ public class EventDaoImpl extends BaseMongoDaoImpl<Event> implements EventDao { ...@@ -145,6 +146,7 @@ public class EventDaoImpl extends BaseMongoDaoImpl<Event> implements EventDao {
if (null != endTime) { if (null != endTime) {
startTimeCriteria.lt(endTime); startTimeCriteria.lt(endTime);
} }
}
if (StringUtils.isNotEmpty(emotion) && !EmotionEnum.ALL.getName().equals(emotion)) { if (StringUtils.isNotEmpty(emotion) && !EmotionEnum.ALL.getName().equals(emotion)) {
criteria.and("emotion").is(emotion); criteria.and("emotion").is(emotion);
} }
......
...@@ -55,13 +55,14 @@ public enum ChannelEmotion { ...@@ -55,13 +55,14 @@ public enum ChannelEmotion {
} }
public static int getEmotionRank(Double emotionIndex) { public static int getEmotionRank(Double emotionIndex) {
int rank; int rank = 3;
if (null == emotionIndex) {
return rank;
}
if (emotionIndex >= 80) { if (emotionIndex >= 80) {
rank = 1; rank = 1;
} else if (emotionIndex >= 60) { } else if (emotionIndex >= 60) {
rank = 2; rank = 2;
} else {
rank = 3;
} }
return rank; return rank;
} }
......
...@@ -122,6 +122,12 @@ public enum EmotionEnum { ...@@ -122,6 +122,12 @@ public enum EmotionEnum {
return value; return value;
} }
} }
if ("友好".equals(name)) {
return EmotionEnum.POSITIVE;
}
if ("不友好".equals(name) || "敏感".equals(name)) {
return EmotionEnum.NEGATIVE;
}
return UNDEFINED; return UNDEFINED;
} }
......
...@@ -25,6 +25,7 @@ import java.io.IOException; ...@@ -25,6 +25,7 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Calendar; import java.util.Calendar;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
...@@ -43,6 +44,8 @@ public class ChannelEsDao extends EsClientDao { ...@@ -43,6 +44,8 @@ public class ChannelEsDao extends EsClientDao {
@Resource(name = "commonServiceImpl") @Resource(name = "commonServiceImpl")
CommonService commonService; CommonService commonService;
public static final String CHANNEL_COPY_ES_INDEX_NAME = "brandkbs2_channel_copy";
public List<String> getArticleIds(String fid) { public List<String> getArticleIds(String fid) {
Long[] timeRangeMonth = commonService.getTimeRangeMonth(); Long[] timeRangeMonth = commonService.getTimeRangeMonth();
return getArticleIds(timeRangeMonth[0], timeRangeMonth[1], fid); return getArticleIds(timeRangeMonth[0], timeRangeMonth[1], fid);
...@@ -98,6 +101,22 @@ public class ChannelEsDao extends EsClientDao { ...@@ -98,6 +101,22 @@ public class ChannelEsDao extends EsClientDao {
} }
} }
public void batchInsert(List<Map<String, Object>> insertList) {
retryTemplate.execute(context -> {
try {
BulkRequest bulkRequest = new BulkRequest();
for (Map<String, Object> map : insertList) {
bulkRequest.add(new IndexRequest(CHANNEL_COPY_ES_INDEX_NAME).id(map.get("id") + "").source(map));
}
channelEsClient.bulk(bulkRequest, RequestOptions.DEFAULT);
log.info("批量插入es数据【{}】条", insertList.size());
} catch (Exception e) {
e.printStackTrace();
}
return null;
});
}
@Override @Override
public String[] getIndexes() { public String[] getIndexes() {
return new String[]{getChannelRecordIndex()}; return new String[]{getChannelRecordIndex()};
......
...@@ -157,30 +157,34 @@ public class EsQueryTools { ...@@ -157,30 +157,34 @@ public class EsQueryTools {
// return tagQuery; // return tagQuery;
// } // }
public static BoolQueryBuilder assembleSourceQuery(String sourceKeyword) { public static BoolQueryBuilder assembleFiledKeywordQuery(String field, String keyword) {
BoolQueryBuilder channelBoolQueryBuilder = QueryBuilders.boolQuery(); BoolQueryBuilder channelBoolQueryBuilder = QueryBuilders.boolQuery();
String[] keys = sourceKeyword.trim().split("\\|"); String[] keys = keyword.trim().split("\\|");
for (String key : keys) { for (String key : keys) {
String channelRegex = getAllRegex(key); String channelRegex = getAllRegex(key);
BoolQueryBuilder keyQueryBuilder = QueryBuilders.boolQuery(); BoolQueryBuilder keyQueryBuilder = QueryBuilders.boolQuery();
keyQueryBuilder.must(QueryBuilders.regexpQuery("source", ".*" + channelRegex + ".*")); keyQueryBuilder.must(QueryBuilders.regexpQuery(field, ".*" + channelRegex + ".*"));
channelBoolQueryBuilder.should(keyQueryBuilder); channelBoolQueryBuilder.should(keyQueryBuilder);
} }
return channelBoolQueryBuilder; return channelBoolQueryBuilder;
} }
public static BoolQueryBuilder assembleChannelSourceQuery(String sourceKeyword) { public static BoolQueryBuilder assembleSourceQuery(String sourceKeyword) {
BoolQueryBuilder channelBoolQueryBuilder = QueryBuilders.boolQuery(); return assembleFiledKeywordQuery("source", sourceKeyword);
String[] keys = sourceKeyword.trim().split("\\|");
for (String key : keys) {
String channelRegex = getAllRegex(key);
BoolQueryBuilder keyQueryBuilder = QueryBuilders.boolQuery();
keyQueryBuilder.must(QueryBuilders.regexpQuery("source.keyword", ".*" + channelRegex + ".*"));
channelBoolQueryBuilder.should(keyQueryBuilder);
}
return channelBoolQueryBuilder;
} }
// public static BoolQueryBuilder assembleChannelSourceQuery(String sourceKeyword) {
// BoolQueryBuilder channelBoolQueryBuilder = QueryBuilders.boolQuery();
// String[] keys = sourceKeyword.trim().split("\\|");
// for (String key : keys) {
// String channelRegex = getAllRegex(key);
// BoolQueryBuilder keyQueryBuilder = QueryBuilders.boolQuery();
// keyQueryBuilder.must(QueryBuilders.regexpQuery("source.keyword", ".*" + channelRegex + ".*"));
// channelBoolQueryBuilder.should(keyQueryBuilder);
// }
// return channelBoolQueryBuilder;
// }
public static void platformMustNot(BoolQueryBuilder boolQueryBuilder, String... platformNames) { public static void platformMustNot(BoolQueryBuilder boolQueryBuilder, String... platformNames) {
if (null == platformNames) { if (null == platformNames) {
return; return;
......
package com.zhiwei.brandkbs2.pojo; package com.zhiwei.brandkbs2.pojo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.PropertyNamingStrategy;
import com.alibaba.fastjson.serializer.SerializeConfig;
import com.zhiwei.brandkbs2.util.Tools; import com.zhiwei.brandkbs2.util.Tools;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import org.bson.types.ObjectId; import org.bson.types.ObjectId;
import java.util.Date; import java.util.Date;
import java.util.Map;
/** /**
* @ClassName: Channel * @ClassName: Channel
...@@ -23,11 +28,21 @@ public class Channel extends ChannelIndex { ...@@ -23,11 +28,21 @@ public class Channel extends ChannelIndex {
private Long cTime; private Long cTime;
/** /**
* 最近发文 * 最近发文时间
*/ */
private Long lastTime; private Long lastTime;
/** /**
* 最近发文
*/
private String lastArticle;
/**
* 媒体类型
*/
private String mediaType;
/**
* 图片头像地址 * 图片头像地址
*/ */
private String avatarUrl; private String avatarUrl;
...@@ -64,6 +79,12 @@ public class Channel extends ChannelIndex { ...@@ -64,6 +79,12 @@ public class Channel extends ChannelIndex {
private Double influence; private Double influence;
protected static final SerializeConfig JSON_CONFIG = new SerializeConfig();
static {
JSON_CONFIG.propertyNamingStrategy = PropertyNamingStrategy.SnakeCase;
}
public void setLastTime(Long lastTime) { public void setLastTime(Long lastTime) {
if (null == this.lastTime || this.lastTime < lastTime) { if (null == this.lastTime || this.lastTime < lastTime) {
this.lastTime = lastTime; this.lastTime = lastTime;
...@@ -102,18 +123,23 @@ public class Channel extends ChannelIndex { ...@@ -102,18 +123,23 @@ public class Channel extends ChannelIndex {
channel.setLastTime(record.getLastTime()); channel.setLastTime(record.getLastTime());
channel.setRecord(record); channel.setRecord(record);
// // TODO 调性随机分配
// double random = Math.random();
// if (random < 0.3) {
// channel.setEmotion(ChannelEmotion.POSITIVE.getState());
// } else if (random < 0.6) {
// channel.setEmotion(ChannelEmotion.NEGATIVE.getState());
// } else {
// channel.setEmotion(ChannelEmotion.NEUTRAL.getState());
// }
// // TODO 情感指数随机分配
// channel.setEmotionIndex(Math.random() * 100);
return channel; return channel;
} }
public Map<String, Object> createChannelCopyMap() {
JSONObject json = JSON.parseObject(JSON.toJSONString(this, JSON_CONFIG));
json.remove("channel_index");
return json;
// esMap.put("_id",this.getId());
// esMap.put("id",this.getId());
// esMap.put("project_id",this.getProjectId());
// esMap.put("contend_id",this.getContendId());
// esMap.put("platform",this.getPlatform());
// esMap.put("real_source",this.getRealSource());
// esMap.put("source",this.getSource());
// esMap.put("fid",this.getFid());
// esMap.put("emotion",this.getEmotion());
// esMap.put("emotion_index",this.getEmotionIndex());
}
} }
...@@ -8,6 +8,8 @@ import io.swagger.annotations.ApiModelProperty; ...@@ -8,6 +8,8 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data; import lombok.Data;
import lombok.ToString; import lombok.ToString;
import java.util.Map;
@Data @Data
@ToString @ToString
public class ChannelVO { public class ChannelVO {
...@@ -61,8 +63,7 @@ public class ChannelVO { ...@@ -61,8 +63,7 @@ public class ChannelVO {
channelVO.setAvatarUrl(channel.getAvatarUrl()); channelVO.setAvatarUrl(channel.getAvatarUrl());
channelVO.setEventCount(channel.getEventCount()); channelVO.setEventCount(channel.getEventCount());
channelVO.setArticleCount(channel.getArticleCount()); channelVO.setArticleCount(channel.getArticleCount());
// TODO influence实现 channelVO.setInfluence(channel.getInfluence());
channelVO.setInfluence(null);
String mediaType = GlobalPojo.getMediaType(projectId, channel.getPlatform(), channel.getSource()); String mediaType = GlobalPojo.getMediaType(projectId, channel.getPlatform(), channel.getSource());
if (null != mediaType) { if (null != mediaType) {
channelVO.setMediaType(mediaType.replaceAll(",", " ")); channelVO.setMediaType(mediaType.replaceAll(",", " "));
...@@ -71,4 +72,22 @@ public class ChannelVO { ...@@ -71,4 +72,22 @@ public class ChannelVO {
return channelVO; return channelVO;
} }
public static ChannelVO createFromChannelCopyMap(Map<String,Object> map){
JSONObject json = new JSONObject(map);
ChannelVO channelVO = new ChannelVO();
channelVO.setId(json.getString("id"));
channelVO.setSource(json.getString("source"));
channelVO.setRealSource(json.getString("real_source"));
channelVO.setEmotion(ChannelEmotion.getNameFromState(json.getInteger("emotion")));
channelVO.setEmotionRank(ChannelEmotion.getEmotionRank(json.getDouble("emotion_index")));
channelVO.setAvatarUrl(json.getString("avatar_url"));
channelVO.setEventCount(json.getLong("event_count"));
channelVO.setArticleCount(json.getLong("article_count"));
channelVO.setInfluence(json.getDouble("influence"));
channelVO.setMediaType(json.getString("media_type"));
channelVO.setLastArticle(json.getJSONObject("last_article"));
return channelVO;
}
} }
package com.zhiwei.brandkbs2.pojo.vo; package com.zhiwei.brandkbs2.pojo.vo;
import com.zhiwei.brandkbs2.config.Constant;
import com.zhiwei.brandkbs2.pojo.Contend; import com.zhiwei.brandkbs2.pojo.Contend;
import com.zhiwei.brandkbs2.pojo.Project; import com.zhiwei.brandkbs2.pojo.Project;
import com.zhiwei.middleware.event.pojo.dto.EventTagRelatedDTO; import com.zhiwei.middleware.event.pojo.dto.EventTagRelatedDTO;
import com.zhiwei.middleware.event.pojo.vo.EventTagBrandkbsBindingVO;
import com.zhiwei.middleware.mark.vo.MarkerTag; import com.zhiwei.middleware.mark.vo.MarkerTag;
import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty; import io.swagger.annotations.ApiModelProperty;
...@@ -145,7 +145,11 @@ public class ProjectVO { ...@@ -145,7 +145,11 @@ public class ProjectVO {
project.setHitKeywords(this.getHitKeywords()); project.setHitKeywords(this.getHitKeywords());
project.setHighKeywords(this.getHighKeywords()); project.setHighKeywords(this.getHighKeywords());
project.setMergeSensitive(this.isMergeSensitive()); project.setMergeSensitive(this.isMergeSensitive());
if (null == this.getAvatarUrl()) {
project.setAvatarUrl(Constant.DEFAULT_PROJECT_URL);
} else {
project.setAvatarUrl(this.getAvatarUrl()); project.setAvatarUrl(this.getAvatarUrl());
}
project.setWholeSearchDataSource(this.getWholeSearchDataSource()); project.setWholeSearchDataSource(this.getWholeSearchDataSource());
project.setImportTime(this.getImportTime().getTime()); project.setImportTime(this.getImportTime().getTime());
project.setHasContend(null != this.getContendList()); project.setHasContend(null != this.getContendList());
......
...@@ -37,6 +37,9 @@ public interface ChannelService { ...@@ -37,6 +37,9 @@ public interface ChannelService {
*/ */
PageVO<JSONObject> findChannelList(int page, int size, String contendId, String emotion, String platform, Boolean show, String keyword, String sorter); PageVO<JSONObject> findChannelList(int page, int size, String contendId, String emotion, String platform, Boolean show, String keyword, String sorter);
PageVO<JSONObject> findChannelListNew(int page, int size, String contendId, String emotion, String platform, Boolean show, String keyword,
String sorter);
/** /**
* 根据搜索条件查询稿件列表 * 根据搜索条件查询稿件列表
* *
...@@ -262,4 +265,7 @@ public interface ChannelService { ...@@ -262,4 +265,7 @@ public interface ChannelService {
PageVO<ChannelVO> getChannelList(int page, int pageSize, String keyword, List<String> platforms, List<Integer> emotions, List<String> mediaTypes, PageVO<ChannelVO> getChannelList(int page, int pageSize, String keyword, List<String> platforms, List<Integer> emotions, List<String> mediaTypes,
Integer[] articlesCount, String sorter); Integer[] articlesCount, String sorter);
PageVO<ChannelVO> getChannelListNew(int page, int pageSize, String keyword, List<String> platforms, List<Integer> emotions, List<String> mediaTypes,
Integer[] articlesCount, String sorter);
} }
...@@ -42,6 +42,7 @@ import org.elasticsearch.index.query.BoolQueryBuilder; ...@@ -42,6 +42,7 @@ import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.BucketOrder; import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms; import org.elasticsearch.search.aggregations.bucket.terms.Terms;
...@@ -143,6 +144,58 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -143,6 +144,58 @@ public class ChannelServiceImpl implements ChannelService {
} }
@Override @Override
public PageVO<JSONObject> findChannelListNew(int page, int size, String contendId, String emotion, String platform, Boolean show, String keyword,
String sorter) {
try {
EsClientDao.SearchHelper searchHelper = EsClientDao.createSearchHelper();
List<Integer> emotions = null == emotion ? null : Collections.singletonList(EmotionEnum.parseFromName(emotion).getState());
List<String> platforms = null == platform ? null : Collections.singletonList(platform);
BoolQueryBuilder postFilter = getChannelListQuery(UserThreadLocal.getProjectId(), contendId, keyword, platforms, emotions, null, null);
// show
if (null != show) {
postFilter.must(QueryBuilders.termQuery("show", show));
}
searchHelper.setPostFilter(postFilter);
if (null != sorter) {
for (Map.Entry<String, Object> entry : JSONObject.parseObject(sorter).entrySet()) {
if (entry.getValue().toString().contains("desc")) {
searchHelper.setSort(SortBuilders.fieldSort(entry.getKey()).order(SortOrder.DESC));
} else {
searchHelper.setSort(SortBuilders.fieldSort(entry.getKey()).order(SortOrder.ASC));
}
}
}
searchHelper.setFrom((page - 1) * size);
searchHelper.setSize(size);
searchHelper.setIndexes(new String[]{ChannelEsDao.CHANNEL_COPY_ES_INDEX_NAME});
SearchHits searchHits = channelEsDao.searchHits(searchHelper);
long value = searchHits.getTotalHits().value;
List<JSONObject> resList = Arrays.stream(searchHits.getHits()).map(hit -> {
JSONObject json = new JSONObject(hit.getSourceAsMap());
JSONObject result = new JSONObject();
result.put("id", json.getString("id"));
result.put("platform", json.getString("platform"));
result.put("realSource", json.getString("real_source"));
result.put("source", json.getString("source"));
result.put("articleCount", json.getLong("article_count"));
result.put("eventCount", json.getLong("event_count"));
result.put("emotion", EmotionEnum.state2Name(json.getInteger("emotion")));
result.put("emotionIndex", json.getDouble("emotion_index"));
result.put("experienceLevel", ExperienceEnum.getValueFromDataBaseName(json.getString("experience_level")));
result.put("lastTime", json.getLong("last_time"));
result.put("show", json.getBoolean("show"));
result.put("imgUrl", json.getString("avatar_url"));
result.put("tag", channelTagDao.getTagByChannelName(json.getString("source")));
return result;
}).collect(Collectors.toList());
return PageVO.createPageVo(value, page, size, resList);
} catch (Exception e) {
log.error("findChannelListNew-", e);
}
return null;
}
@Override
public PageVO<JSONObject> findArticleList(int page, int size, String channelId) { public PageVO<JSONObject> findArticleList(int page, int size, String channelId) {
try { try {
Channel channel = channelDao.findOneById(channelId); Channel channel = channelDao.findOneById(channelId);
...@@ -869,6 +922,74 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -869,6 +922,74 @@ public class ChannelServiceImpl implements ChannelService {
channelList.stream().map(channel -> ChannelVO.createFromChannelInfo(idMap.get(channel.getId()), projectId)).collect(Collectors.toList())); channelList.stream().map(channel -> ChannelVO.createFromChannelInfo(idMap.get(channel.getId()), projectId)).collect(Collectors.toList()));
} }
@Override
public PageVO<ChannelVO> getChannelListNew(int page, int pageSize, String keyword, List<String> platforms, List<Integer> emotions, List<String> mediaTypes,
Integer[] articlesCount, String sorter) {
try {
String projectId = UserThreadLocal.getProjectId();
String contendId = "0";
EsClientDao.SearchHelper searchHelper = EsClientDao.createSearchHelper();
BoolQueryBuilder postFilter = getChannelListQuery(projectId, contendId, keyword, platforms, emotions, mediaTypes, articlesCount);
searchHelper.setPostFilter(postFilter);
searchHelper.setSort(SortBuilders.fieldSort("influence").order(SortOrder.DESC));
searchHelper.setFrom((page - 1) * pageSize);
searchHelper.setSize(pageSize);
searchHelper.setIndexes(new String[]{ChannelEsDao.CHANNEL_COPY_ES_INDEX_NAME});
SearchHits searchHits = channelEsDao.searchHits(searchHelper);
long value = searchHits.getTotalHits().value;
List<ChannelVO> collect = Arrays.stream(searchHits.getHits()).map(hit -> ChannelVO.createFromChannelCopyMap(hit.getSourceAsMap())).collect(Collectors.toList());
return PageVO.createPageVo(value, page, pageSize, collect);
} catch (Exception e) {
log.error("getChannelListNew-", e);
}
return null;
}
private BoolQueryBuilder getChannelListQuery(String projectId, String contendId, String keyword, List<String> platforms, List<Integer> emotions, List<String> mediaTypes,
Integer[] articlesCount) {
BoolQueryBuilder postFilter = QueryBuilders.boolQuery();
postFilter.must(QueryBuilders.termQuery("project_id.keyword", projectId));
postFilter.must(QueryBuilders.termQuery("contend_id.keyword", contendId));
// source关键词
if (null != keyword) {
postFilter.must(EsQueryTools.assembleFiledKeywordQuery("source", keyword));
}
// platform限制
if (!(Tools.isEmpty(platforms) || platforms.contains("全部"))) {
BoolQueryBuilder platformBuilder = QueryBuilders.boolQuery();
List<String> collect = platforms.stream().map(GlobalPojo::getPlatformNameById).filter(Objects::nonNull).collect(Collectors.toList());
if (!Tools.isEmpty(collect)) {
platforms = collect;
}
platforms.forEach(platformName -> {
platformBuilder.should(QueryBuilders.termQuery("platform", platformName));
});
postFilter.must(platformBuilder);
}
// emotion限制
if (!(Tools.isEmpty(emotions) || emotions.contains(-1))) {
BoolQueryBuilder emotionBuilder = QueryBuilders.boolQuery();
emotions.forEach(emotion -> {
emotionBuilder.should(QueryBuilders.termQuery("emotion", emotion));
});
postFilter.must(emotionBuilder);
}
// 媒体类别
if (!(Tools.isEmpty(mediaTypes) || mediaTypes.contains("全部"))) {
BoolQueryBuilder mediaTypeBuilder = QueryBuilders.boolQuery();
mediaTypes.forEach(mediaType -> {
// TODO list格式匹配
mediaTypeBuilder.should(QueryBuilders.termQuery("media_type.keyword", mediaType));
});
postFilter.must(mediaTypeBuilder);
}
// articlesCount
if (null != articlesCount) {
postFilter.must(QueryBuilders.rangeQuery("article_count").gte(articlesCount[0]).lt(articlesCount[1]));
}
return postFilter;
}
private List<JSONObject> getArticleList() { private List<JSONObject> getArticleList() {
List<JSONObject> res = new ArrayList<>(); List<JSONObject> res = new ArrayList<>();
for (String name : Arrays.asList("全部", "1-10篇", "11-15篇", "51-100篇")) { for (String name : Arrays.asList("全部", "1-10篇", "11-15篇", "51-100篇")) {
...@@ -1424,7 +1545,7 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -1424,7 +1545,7 @@ public class ChannelServiceImpl implements ChannelService {
} }
// keyword // keyword
if (StringUtils.isNotEmpty(keyword)) { if (StringUtils.isNotEmpty(keyword)) {
query.must(EsQueryTools.assembleChannelSourceQuery(keyword)); query.must(EsQueryTools.assembleSourceQuery(keyword));
} }
// timeRange // timeRange
// 默认搜索一周 // 默认搜索一周
......
...@@ -1532,7 +1532,8 @@ public class MarkDataServiceImpl implements MarkDataService { ...@@ -1532,7 +1532,8 @@ public class MarkDataServiceImpl implements MarkDataService {
return null; return null;
} }
return getTitleAndUrl(searchHits.getAt(0).getSourceAsMap()); return getTitleAndUrl(searchHits.getAt(0).getSourceAsMap());
} catch (IOException e) { } catch (Exception e) {
log.info("getLastMarkData-",e);
return null; return null;
} }
} }
......
...@@ -3,7 +3,10 @@ package com.zhiwei.brandkbs2.service.impl; ...@@ -3,7 +3,10 @@ package com.zhiwei.brandkbs2.service.impl;
import com.zhiwei.brandkbs2.auth.UserThreadLocal; import com.zhiwei.brandkbs2.auth.UserThreadLocal;
import com.zhiwei.brandkbs2.common.GlobalPojo; import com.zhiwei.brandkbs2.common.GlobalPojo;
import com.zhiwei.brandkbs2.config.Constant; import com.zhiwei.brandkbs2.config.Constant;
import com.zhiwei.brandkbs2.dao.*; import com.zhiwei.brandkbs2.dao.AggreeResultDao;
import com.zhiwei.brandkbs2.dao.BrandkbsTaskDao;
import com.zhiwei.brandkbs2.dao.ChannelDao;
import com.zhiwei.brandkbs2.dao.ReportSettingsDao;
import com.zhiwei.brandkbs2.enmus.ReportTypeEnum; import com.zhiwei.brandkbs2.enmus.ReportTypeEnum;
import com.zhiwei.brandkbs2.es.ChannelEsDao; import com.zhiwei.brandkbs2.es.ChannelEsDao;
import com.zhiwei.brandkbs2.es.EsClientDao; import com.zhiwei.brandkbs2.es.EsClientDao;
...@@ -91,6 +94,7 @@ public class TaskServiceImpl implements TaskService { ...@@ -91,6 +94,7 @@ public class TaskServiceImpl implements TaskService {
log.info("渠道统计-搜索到近{}天的受影响渠道数{}条", day, channelIndexRecordMap.size()); log.info("渠道统计-搜索到近{}天的受影响渠道数{}条", day, channelIndexRecordMap.size());
long handleSize = 0; long handleSize = 0;
List<Channel> insertList = new ArrayList<>(); List<Channel> insertList = new ArrayList<>();
List<Channel> updateList = new ArrayList<>();
List<Map.Entry<ChannelIndex, ChannelIndex.Record>> batchList = new ArrayList<>(); List<Map.Entry<ChannelIndex, ChannelIndex.Record>> batchList = new ArrayList<>();
// 新recordMap // 新recordMap
Map<ChannelIndex, ChannelIndex.Record> newRecordMap = new ConcurrentHashMap<>(); Map<ChannelIndex, ChannelIndex.Record> newRecordMap = new ConcurrentHashMap<>();
...@@ -98,7 +102,9 @@ public class TaskServiceImpl implements TaskService { ...@@ -98,7 +102,9 @@ public class TaskServiceImpl implements TaskService {
batchList.add(entry); batchList.add(entry);
// 每100条做一次清算 // 每100条做一次清算
if (++handleSize % 100 == 0 || handleSize == channelIndexRecordMap.size()) { if (++handleSize % 100 == 0 || handleSize == channelIndexRecordMap.size()) {
insertList.addAll(batchHandle(batchList, newRecordMap)); Pair<List<Channel>, List<Channel>> listPair = batchHandle(batchList, newRecordMap);
insertList.addAll(listPair.getLeft());
updateList.addAll(listPair.getRight());
batchList = new ArrayList<>(); batchList = new ArrayList<>();
} }
if (handleSize % 10000 == 0) { if (handleSize % 10000 == 0) {
...@@ -115,6 +121,13 @@ public class TaskServiceImpl implements TaskService { ...@@ -115,6 +121,13 @@ public class TaskServiceImpl implements TaskService {
Long[] timeMinMax = Tools.timeMinMax(rangeTimeRecords.stream().map(Pair::getLeft).collect(Collectors.toList())); Long[] timeMinMax = Tools.timeMinMax(rangeTimeRecords.stream().map(Pair::getLeft).collect(Collectors.toList()));
List<ChannelRecord> channelRecords = ChannelRecord.createChannelRecords(timeMinMax[0], timeMinMax[1], channelIndexRecordMap); List<ChannelRecord> channelRecords = ChannelRecord.createChannelRecords(timeMinMax[0], timeMinMax[1], channelIndexRecordMap);
channelEsDao.upsertChannelRecord(channelRecords); channelEsDao.upsertChannelRecord(channelRecords);
// 同步channelCopy
ListUtils.partition(insertList, 1000).forEach(list -> {
channelEsDao.batchInsert(list.stream().map(Channel::createChannelCopyMap).collect(Collectors.toList()));
});
ListUtils.partition(updateList, 1000).forEach(list -> {
channelEsDao.batchInsert(list.stream().map(Channel::createChannelCopyMap).collect(Collectors.toList()));
});
log.info("渠道统计-渠道记录-统计结束"); log.info("渠道统计-渠道记录-统计结束");
} }
...@@ -159,8 +172,10 @@ public class TaskServiceImpl implements TaskService { ...@@ -159,8 +172,10 @@ public class TaskServiceImpl implements TaskService {
}, cacheServiceExecutor)).toArray(CompletableFuture[]::new)).join(); }, cacheServiceExecutor)).toArray(CompletableFuture[]::new)).join();
} }
private List<Channel> batchHandle(List<Map.Entry<ChannelIndex, ChannelIndex.Record>> batchList, Map<ChannelIndex, ChannelIndex.Record> newRecordMap) { private Pair<List<Channel>, List<Channel>> batchHandle(List<Map.Entry<ChannelIndex, ChannelIndex.Record>> batchList, Map<ChannelIndex,
ChannelIndex.Record> newRecordMap) {
List<Channel> insertList = Collections.synchronizedList(new ArrayList<>()); List<Channel> insertList = Collections.synchronizedList(new ArrayList<>());
List<Channel> updateList = Collections.synchronizedList(new ArrayList<>());
List<String> fids = batchList.stream().map(channelIndexRecordEntry -> channelIndexRecordEntry.getKey().getFid()).collect(Collectors.toList()); List<String> fids = batchList.stream().map(channelIndexRecordEntry -> channelIndexRecordEntry.getKey().getFid()).collect(Collectors.toList());
Map<String, Channel> fidChannel = channelDao.queryUniqueAsync(fids); Map<String, Channel> fidChannel = channelDao.queryUniqueAsync(fids);
CompletableFuture.allOf(batchList.stream().map(entry -> CompletableFuture.supplyAsync(() -> { CompletableFuture.allOf(batchList.stream().map(entry -> CompletableFuture.supplyAsync(() -> {
...@@ -172,27 +187,14 @@ public class TaskServiceImpl implements TaskService { ...@@ -172,27 +187,14 @@ public class TaskServiceImpl implements TaskService {
} else { } else {
channel.setRecord(entry.getValue()); channel.setRecord(entry.getValue());
channelDao.updateOne(channelService.calculateChannelEmotionIndex(channel)); channelDao.updateOne(channelService.calculateChannelEmotionIndex(channel));
updateList.add(channel);
} }
// 设置查询数值 // 设置查询数值
entry.getKey().setChannelInfo(channel); entry.getKey().setChannelInfo(channel);
newRecordMap.put(entry.getKey(), entry.getValue()); newRecordMap.put(entry.getKey(), entry.getValue());
return null; return null;
}, taskServiceExecutor)).toArray(CompletableFuture[]::new)).join(); }, taskServiceExecutor)).toArray(CompletableFuture[]::new)).join();
// for (Map.Entry<ChannelIndex, ChannelIndex.Record> entry : batchList) { return Pair.of(insertList, updateList);
// String fid = entry.getKey().getFid();
// Channel channel = fidChannel.get(fid);
// if (null == channel) {
// channel = Channel.createFromChannelIndexRecord(entry.getKey(), entry.getValue());
// insertList.add(channelService.calculateChannelEmotionIndex(channel));
// } else {
// channel.setRecord(entry.getValue());
// channelDao.updateOne(channelService.calculateChannelEmotionIndex(channel));
// }
// // 设置查询数值
// entry.getKey().setChannelInfo(channel);
// newRecordMap.put(entry.getKey(), entry.getValue());
// }
return insertList;
} }
@Deprecated @Deprecated
......
...@@ -32,8 +32,10 @@ secondary.uri=mongodb://qbjcuser:qbjc1q2w3e4r@192.168.0.150:27017,192.168.0.151: ...@@ -32,8 +32,10 @@ secondary.uri=mongodb://qbjcuser:qbjc1q2w3e4r@192.168.0.150:27017,192.168.0.151:
es.clusterNodes=192.168.0.130:9200 es.clusterNodes=192.168.0.130:9200
es.clusterName=zhiweidata-new-es es.clusterName=zhiweidata-new-es
es.httpClusterNodes=192.168.0.130:9200:qbjc-back:yuqing.zhiweidata.com,192.168.0.51:9400:elastic:qWxZRW42OHkuOhmF5AXX es.httpClusterNodes=192.168.0.130:9200:qbjc-back:yuqing.zhiweidata.com,192.168.0.51:9400:elastic:qWxZRW42OHkuOhmF5AXX
es.username=brandkbs2_test #es.username=brandkbs2_test
es.password=3vh65l$i6qQA #es.password=3vh65l$i6qQA
es.username=joker
es.password=jokerdevops
es.index.test = true es.index.test = true
#channel-index #channel-index
......
...@@ -34,8 +34,10 @@ secondary.uri=mongodb://qbjcuser:qbjc1q2w3e4r@202.107.192.94:17152/qbjc?authSour ...@@ -34,8 +34,10 @@ secondary.uri=mongodb://qbjcuser:qbjc1q2w3e4r@202.107.192.94:17152/qbjc?authSour
es.clusterNodes=202.107.192.94:1443 es.clusterNodes=202.107.192.94:1443
es.clusterName=zhiweidata-new-es es.clusterName=zhiweidata-new-es
es.httpClusterNodes=202.107.192.94:1443:brandkbs2:3vh65l$i6qQA es.httpClusterNodes=202.107.192.94:1443:brandkbs2:3vh65l$i6qQA
es.username=brandkbs2_test #es.username=brandkbs2_test jokerdevops
es.password=3vh65l$i6qQA #es.password=3vh65l$i6qQA
es.username=joker
es.password=jokerdevops
es.index.test = true es.index.test = true
#channel-index #channel-index
......
...@@ -32,8 +32,10 @@ secondary.uri=mongodb://qbjcuser:qbjc1q2w3e4r@192.168.0.150:27017,192.168.0.151: ...@@ -32,8 +32,10 @@ secondary.uri=mongodb://qbjcuser:qbjc1q2w3e4r@192.168.0.150:27017,192.168.0.151:
es.clusterNodes=192.168.0.130:9200 es.clusterNodes=192.168.0.130:9200
es.clusterName=zhiweidata-new-es es.clusterName=zhiweidata-new-es
es.httpClusterNodes=192.168.0.130:9200:qbjc-back:yuqing.zhiweidata.com,192.168.0.51:9400:elastic:qWxZRW42OHkuOhmF5AXX es.httpClusterNodes=192.168.0.130:9200:qbjc-back:yuqing.zhiweidata.com,192.168.0.51:9400:elastic:qWxZRW42OHkuOhmF5AXX
es.username=brandkbs2_test #es.username=brandkbs2_test
es.password=3vh65l$i6qQA #es.password=3vh65l$i6qQA
es.username=joker
es.password=jokerdevops
es.index.test = true es.index.test = true
#channel-index #channel-index
......
package com.zhiwei.brandkbs2;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.brandkbs2.common.GlobalPojo;
import com.zhiwei.brandkbs2.dao.ChannelDao;
import com.zhiwei.brandkbs2.es.ChannelEsDao;
import com.zhiwei.brandkbs2.pojo.Channel;
import com.zhiwei.brandkbs2.service.MarkDataService;
import com.zhiwei.brandkbs2.service.ProjectService;
import lombok.Data;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
/**
* @ClassName: CopyChannelFromMongo2Es
* @Description 迁移mongo数据从mongo到es库
* @author: sjj
* @date: 2022-11-29 11:28
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest
public class CopyChannelFromMongo2Es {
private static final Logger log = LogManager.getLogger(CopyChannelFromMongo2Es.class);
@Autowired
ProjectService projectService;
@Autowired
MarkDataService markDataService;
@Autowired
ChannelDao channelDao;
@Autowired
ChannelEsDao channelEsDao;
@Resource(name = "esSearchExecutor")
ThreadPoolTaskExecutor esSearchExecutor;
@Test
public void test() throws IOException {
// 2022.6.1
long startTime = 1654012800000L;
Query query = new Query(Criteria.where("lastTime").gt(startTime));
long count = channelDao.count(query);
MongoQueryUtil mongoQueryUtil = new MongoQueryUtil(count);
mongoQueryUtil.setStartPage(4939);
while (mongoQueryUtil.hasNextPage()) {
List<Channel> list = channelDao.findList(mongoQueryUtil.getNextPageQuery());
List<CompletableFuture<Map<String, Object>>> futureList = list.stream().map(channel -> CompletableFuture.supplyAsync(() -> {
JSONObject markData = markDataService.getLastMarkData(channel.getProjectId(), "-1", channel.getContendId(), channel.getPlatform(), channel.getRealSource(), channel.getSource());
if (null != markData) {
channel.setLastArticle(markData.toJSONString());
}
channel.setMediaType(GlobalPojo.getMediaType(channel.getProjectId(), channel.getPlatform(), channel.getSource()));
return channel.createChannelCopyMap();
}, esSearchExecutor)).collect(Collectors.toList());
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
List<Map<String, Object>> collect = futureList.stream().map(CompletableFuture::join).collect(Collectors.toList());
// 查询最近发文并设置
channelEsDao.batchInsert(collect);
log.info("total:{},current:{}", mongoQueryUtil.getTotal(), mongoQueryUtil.getStartPage() * mongoQueryUtil.getLimit());
}
}
@Data
public static class MongoQueryUtil {
private int limit = 1000;
private long startPage = 1;
private long total;
private AtomicLong removeSize = new AtomicLong();
public MongoQueryUtil(long total) {
this.total = total;
}
public boolean hasNextPage() {
long totalPage = (total + limit - 1) / limit;
return startPage <= totalPage;
}
public Query getNextPageQuery() {
Query query = new Query();
if (!hasNextPage()) {
throw new IllegalStateException("startPage>=totalPage");
}
query.skip((startPage - 1) * limit - removeSize.get());
query.limit(limit);
query.with(Sort.by(Sort.Order.desc("_id")));
startPage++;
return query;
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment