Commit 404cb010 by shenjunjie

渠道库:渠道记录表及部分接口

parent 6476b0d5
......@@ -16,7 +16,7 @@ public class GenericAttribute {
*/
public static final String ES_INDEX_PRE = "brandkbs2";
public static final String ES_INDEX_TEST = "brandkbs2_test";
public static final String ES_CHANNEL_INDEX_TEST = "brandkbs2_channel_record_test";
public static final String ES_CHANNEL_INDEX_TEST = "brandkbs2_channel_record_test2";
/**
* es ind_title
**/
......@@ -45,6 +45,10 @@ public class GenericAttribute {
* es gid
**/
public static final String ES_GID = "gid";
public static final String ES_PROJECT_ID = "project_id";
public static final String ES_CONTEND_ID = "contend_id";
public static final String ES_CHANNEL_FID = "fid";
public static final String ES_CNAME = "cname";
/** es c2 **/
public static final String ES_C2 = "c2";
......@@ -75,7 +79,7 @@ public class GenericAttribute {
/** es mark_cache_maps **/
public static final String ES_MARK_CACHE_MAPS = "brandkbs_mark_cache_maps";
public static final String LINKED_GROUP_ID = "linkedGroupId";
public static final String ES_LINKED_GROUP_ID = "linked_group_id";
// public static final String ES_CACHE_MAP_PROJECT = "brandkbs_cache_maps.project_id.keyword";
// public static final String ES_CACHE_MAP_LINKED_GROUP_ID = "brandkbs_cache_maps.linked_group_id.keyword";
......
......@@ -9,22 +9,20 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
import java.util.Set;
/**
* @ClassName: AppChannelController
* @Description 前台渠道信息展示
* @Description 前台渠道信息展示
* @author: sjj
* @date: 2022-07-19 16:05
*/
@RestController
@RequestMapping("/app/channel")
@Api(tags = "前台渠道展示接口", description = "提供前台渠道相关信息展示")
@Api(tags = "前台渠道库展示接口", description = "提供前台渠道库相关信息展示")
@Auth(role = RoleEnum.CUSTOMER)
public class AppChannelController extends BaseController {
......@@ -32,7 +30,7 @@ public class AppChannelController extends BaseController {
ChannelService channelService;
@ApiImplicitParams({@ApiImplicitParam(name = "linkedGroupId", value = "关联项目组id", paramType = "query", dataType = "string"),
@ApiImplicitParams({@ApiImplicitParam(name = "contendId", value = "品牌id", paramType = "query", dataType = "string"),
@ApiImplicitParam(name = "platform", value = "平台", paramType = "query", dataType = "string"),
@ApiImplicitParam(name = "keyword", value = "关键字搜索", paramType = "query", dataType = "string"),
@ApiImplicitParam(name = "startTime", value = "起始时间", paramType = "query", dataType = "long"),
......@@ -41,14 +39,92 @@ public class AppChannelController extends BaseController {
})
@ApiOperation("渠道库-活跃渠道榜")
@GetMapping("/list/active")
public ResponseResult getActiveChannelList(@RequestParam(value = "linkedGroupId", required = false) String linkedGroupId,
public ResponseResult getActiveChannelList(@RequestParam(value = "contendId", required = false, defaultValue = "0") String contendId,
@RequestParam(value = "platform", required = false) String platform,
@RequestParam(value = "keyword", required = false) String keyword,
@RequestParam(value = "startTime", required = false) Long startTime,
@RequestParam(value = "endTime", required = false) Long endTime,
@RequestParam(value = "size", defaultValue = "50") int size) {
return ResponseResult.success(channelService.getActiveChannelList(linkedGroupId, platform, keyword, startTime, endTime, size));
return ResponseResult.success(channelService.getActiveChannelList(contendId, platform, keyword, startTime, endTime, size));
}
@ApiOperation("渠道库-友好渠道榜")
@ApiImplicitParams({
@ApiImplicitParam(name = "contendId", value = "品牌ID", paramType = "query", dataType = "string"),
@ApiImplicitParam(name = "platform", value = "平台筛选", paramType = "query", dataType = "string"),
@ApiImplicitParam(name = "keyword", value = "关键字搜索", paramType = "query", dataType = "string"),
@ApiImplicitParam(name = "sorter", defaultValue = "{\"index\":\"descend\"}", value = "排序字段", paramType = "query", dataType = "string"),
@ApiImplicitParam(name = "size", value = "选取前几", defaultValue = "50", paramType = "query", dataType = "int")
})
@GetMapping("/list/positive")
public ResponseResult getPositiveList(@RequestParam(value = "contendId", required = false) String contendId,
@RequestParam(value = "platform", required = false) String platform,
@RequestParam(value = "keyword", required = false) String keyword,
@RequestParam(value = "sorter", required = false) String sorter,
@RequestParam(value = "size", defaultValue = "50") int size) {
return ResponseResult.success(channelService.getPositiveList(contendId, platform, keyword, sorter, size));
}
@ApiOperation("渠道库-敏感渠道榜")
@ApiImplicitParams({
@ApiImplicitParam(name = "contendId", value = "品牌ID", paramType = "query", dataType = "string"),
@ApiImplicitParam(name = "platform", value = "平台筛选", paramType = "query", dataType = "string"),
@ApiImplicitParam(name = "keyword", value = "关键字搜索", paramType = "query", dataType = "string"),
@ApiImplicitParam(name = "sorter", defaultValue = "{\"index\":\"descend\"}", value = "排序字段", paramType = "query", dataType = "string"),
@ApiImplicitParam(name = "size", value = "选取前几", defaultValue = "50", paramType = "query", dataType = "int")
})
@GetMapping("/list/negative")
public ResponseResult getNegativeList(@RequestParam(value = "contendId", required = false) String contendId,
@RequestParam(value = "platform", required = false) String platform,
@RequestParam(value = "keyword", required = false) String keyword,
@RequestParam(value = "sorter", required = false) String sorter,
@RequestParam(value = "size", defaultValue = "50") int size) {
return ResponseResult.success(channelService.getNegativeList(contendId, platform, keyword, sorter, size));
}
@ApiOperation("渠道库-收藏渠道")
@ApiImplicitParam(name = "channelId", value = "渠道ID", required = true, paramType = "path", dataType = "string")
@PostMapping("/collect/{channelId}}")
public ResponseResult collectChannel(@PathVariable String channelId) {
return ResponseResult.success(channelService.collectChannel(channelId));
}
@ApiOperation("渠道库-取消收藏渠道")
@ApiImplicitParam(name = "channelId", value = "渠道ID", required = true, paramType = "path", dataType = "string")
@PostMapping("/removeCollect/{channelId}}")
public ResponseResult removeCollectChannel(@PathVariable String channelId) {
return ResponseResult.success(channelService.removeCollectChannel(channelId));
}
@ApiOperation("渠道库-收藏渠道列表")
@ApiImplicitParams(@ApiImplicitParam(name = "contendId", value = "品牌ID", paramType = "query", dataType = "string"))
@GetMapping("/list/collect}")
public ResponseResult getCollectList(@RequestParam(value = "contendId", required = false, defaultValue = "0") String contendId) {
return ResponseResult.success(channelService.getCollectList(contendId));
}
@ApiOperation("渠道库-渠道基本信息")
@ApiImplicitParam(name = "channelId", value = "渠道ID", required = true, paramType = "path", dataType = "string")
@GetMapping("/baseInfo/{channelId}")
public ResponseResult getBaseInfoByChannelId(@PathVariable("channelId") String channelId) {
return ResponseResult.success(channelService.getBaseInfoByChannelId(channelId));
}
@ApiOperation("渠道库-渠道动向")
@ApiImplicitParams({
@ApiImplicitParam(name = "channelId", value = "渠道ID", required = true, paramType = "query", dataType = "string"),
@ApiImplicitParam(name = "type", value = "类型:文章/事件", required = true, defaultValue = "文章", paramType = "query", dataType = "string"),
@ApiImplicitParam(name = "contends", value = "品牌ID集合", paramType = "query", dataType = "string"),
@ApiImplicitParam(name = "startTime", value = "开始时间", paramType = "query", dataType = "long"),
@ApiImplicitParam(name = "endTime", value = "结束时间", paramType = "query", dataType = "long")
})
@GetMapping("/spreadingTend")
public ResponseResult getSpreadingTend(@RequestParam("channelId") String channelId,
@RequestParam(value = "type", defaultValue = "文章") String type,
@RequestParam(value = "brandIds", required = false) Set<String> contends,
@RequestParam(value = "startTime", required = false) Long startTime,
@RequestParam(value = "endTime", required = false) Long endTime) {
return ResponseResult.success(channelService.getSpreadingTend(channelId, type, contends, startTime, endTime));
}
}
......@@ -11,6 +11,8 @@ import com.zhiwei.brandkbs2.pojo.ChannelIndex;
*/
public interface ChannelDao extends BaseMongoDao<Channel>{
Channel queryUnique(String channelFid);
Channel queryUnique(ChannelIndex channelIndex);
}
......@@ -23,6 +23,12 @@ public class ChannelDaoImpl extends BaseMongoDaoImpl<Channel> implements Channel
}
@Override
public Channel queryUnique(String channelFid) {
Query query = Query.query(Criteria.where("fid").is(channelFid));
return mongoTemplate.findOne(query, clazz, COLLECTION_PREFIX);
}
@Override
public Channel queryUnique(ChannelIndex channelIndex) {
if (null == channelIndex) {
return null;
......@@ -32,8 +38,7 @@ public class ChannelDaoImpl extends BaseMongoDaoImpl<Channel> implements Channel
// and("platform").is(channelIndex.getPlatform()).
// and("realSource").is(channelIndex.getRealSource()).
// and("source").is(channelIndex.getSource()));
Query query = Query.query(Criteria.where("fid").is(channelIndex.getFid()));
return mongoTemplate.findOne(query, clazz, COLLECTION_PREFIX);
return queryUnique(channelIndex.getFid());
}
}
......@@ -17,11 +17,16 @@ public enum ChannelEmotion {
/**
* 中性的
*/
NEUTRAL(2, "不友好渠道"),
NEUTRAL(2, "中性渠道"),
/**
* 负面
* 敏感
*/
NEGATIVE(3, "中性渠道");
NEGATIVE(3, "敏感渠道"),
/**
* 未定义的
*/
UNDEFINED(0,"未定义渠道");
@Getter
private final int state;
......
package com.zhiwei.brandkbs2.es;
import com.zhiwei.brandkbs2.common.GenericAttribute;
import com.zhiwei.brandkbs2.pojo.ChannelIndex;
import com.zhiwei.brandkbs2.pojo.ChannelRecord;
import com.zhiwei.brandkbs2.service.CommonService;
import com.zhiwei.brandkbs2.util.Md5Util;
import org.apache.commons.collections4.ListUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -24,6 +24,7 @@ import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* @ClassName: ChannelEsClientDao
......@@ -47,7 +48,7 @@ public class ChannelEsDao extends EsClientDao {
}
public List<String> getArticleIds(long startTime, long endTime, String fid) {
List<String> articleIds = new ArrayList<>();
List<ChannelIndex.Article> articles = new ArrayList<>();
try {
SearchHelper searchHelper = createSearchHelper();
BoolQueryBuilder postFilter = channelUniqueBool(fid);
......@@ -58,12 +59,13 @@ public class ChannelEsDao extends EsClientDao {
SearchHits searchHits = retryTemplate.execute(context -> searchHits(searchHelper));
for (SearchHit hit : searchHits.getHits()) {
ChannelRecord channelRecord = new ChannelRecord(hit.getSourceAsMap());
channelRecord.getLinkMaps().stream().filter(linkMap -> fid.equals(linkMap.getIndex().getFid())).
forEach(linkMap -> articleIds.addAll(linkMap.getRecord().getArticleIds()));
List<ChannelIndex.Article> collect = channelRecord.getRecord().getArticles();
articles.addAll(collect);
}
} catch (IOException ignored) {
}
return articleIds;
return articles.stream().map(ChannelIndex.Article::getId).collect(Collectors.toList());
}
public void upsertChannelRecord(List<ChannelRecord> channelRecords) {
......@@ -75,8 +77,7 @@ public class ChannelEsDao extends EsClientDao {
for (ChannelRecord record : records) {
startTime = null == startTime ? record.getRangeStartTime() : Math.min(startTime, record.getRangeStartTime());
endTime = null == endTime ? record.getRangeEndTime() : Math.max(endTime, record.getRangeEndTime());
bulkRequest.add(new IndexRequest(index).id(Md5Util.encode2EsId(record.getRangeStartTime(),
record.getRangeEndTime())).source(record.toEsMap()));
bulkRequest.add(new IndexRequest(index).id(record.getEsId()).source(record.toEsMap()));
}
BulkResponse bulkResponse = retryTemplate.execute(context -> {
try {
......@@ -91,7 +92,7 @@ public class ChannelEsDao extends EsClientDao {
log.error("upsertRecord批量操作失败,index:{},rangeTime:{},startTime[es-batchUpsert]异常:{}", index, startTime + "-" + endTime,
bulkResponse.buildFailureMessage());
} else {
log.info("upsertRecord批量操作成功,rangeTime:{},入库:{}条", startTime + "-" + endTime, records.size());
log.info("upsertRecord批量操作成功,rangeTime:{},入库:{}条", DF.format(startTime) + "-" + DF.format(endTime), records.size());
}
}
}
......@@ -101,6 +102,11 @@ public class ChannelEsDao extends EsClientDao {
return new String[]{getChannelRecordIndex()};
}
@Override
protected RestHighLevelClient getEsClient() {
return channelEsClient;
}
public String getChannelRecordIndex() {
// if (test) {
return GenericAttribute.ES_CHANNEL_INDEX_TEST;
......@@ -114,7 +120,7 @@ public class ChannelEsDao extends EsClientDao {
private BoolQueryBuilder channelUniqueBool(String fid) {
BoolQueryBuilder postFilter = QueryBuilders.boolQuery();
return postFilter.must(QueryBuilders.termQuery(ChannelRecord.LINK_MAPS + ".fid.keyword", fid));
return postFilter.must(QueryBuilders.termQuery(ChannelRecord.KEY + ".keyword", fid));
}
}
......@@ -50,7 +50,7 @@ import java.util.stream.Collectors;
@Component("esClientDao")
public class EsClientDao {
private static final Logger log = LogManager.getLogger(EsClientDao.class);
private static final FastDateFormat DF = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");
protected static final FastDateFormat DF = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");
private static final String[] CHANNEL_RECORD_FETCH_SOURCE = new String[]{"id", "c5", "foreign", "real_source", "source", "mtime", "time", "brandkbs_cache_maps"};
private static final String[] EVENT_FETCH_SOURCE = new String[]{"ind_full_text", "c5", "real_source", "source", "mtime", "time", "url", "mtag"};
......@@ -74,7 +74,7 @@ public class EsClientDao {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
QueryBuilder queryBuilder = QueryBuilders.idsQuery().addIds(queryIds.toArray(new String[0]));
sourceBuilder.query(queryBuilder).size(queryIds.size());
SearchResponse searchResponse = esClient.search(new SearchRequest(getIndexes()).source(sourceBuilder), RequestOptions.DEFAULT);
SearchResponse searchResponse = getEsClient().search(new SearchRequest(getIndexes()).source(sourceBuilder), RequestOptions.DEFAULT);
return Arrays.stream(searchResponse.getHits().getHits()).collect(Collectors.toMap(SearchHit::getId, hit -> new JSONObject(hit.getSourceAsMap())));
}
......@@ -137,18 +137,18 @@ public class EsClientDao {
*/
private List<Map<String, Object>> searchScroll(SearchSourceBuilder searchSourceBuilder) throws IOException {
List<Map<String, Object>> res = new ArrayList<>();
SearchResponse searchResponse = esClient.search(new SearchRequest(getIndexes()).source(searchSourceBuilder).scroll(TIME_VALUE), RequestOptions.DEFAULT);
SearchResponse searchResponse = getEsClient().search(new SearchRequest(getIndexes()).source(searchSourceBuilder).scroll(TIME_VALUE), RequestOptions.DEFAULT);
while (true) {
if (0 == searchResponse.getHits().getHits().length) {
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(searchResponse.getScrollId());
esClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
getEsClient().clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
break;
}
res.addAll(Arrays.stream(searchResponse.getHits().getHits()).map(SearchHit::getSourceAsMap).collect(Collectors.toList()));
SearchScrollRequest scrollRequest = new SearchScrollRequest(searchResponse.getScrollId());
scrollRequest.scroll(TIME_VALUE);
searchResponse = esClient.scroll(scrollRequest, RequestOptions.DEFAULT);
searchResponse = getEsClient().scroll(scrollRequest, RequestOptions.DEFAULT);
}
return res;
}
......@@ -198,6 +198,10 @@ public class EsClientDao {
return getIndexList().toArray(new String[0]);
}
protected RestHighLevelClient getEsClient(){
return esClient;
}
private List<String> getIndexList() {
List<String> res = new ArrayList<>();
if (test) {
......@@ -258,7 +262,7 @@ public class EsClientDao {
}
countRequest.indices(indexes);
countRequest.query(countQuery);
CountResponse response = retryTemplate.execute(context -> esClient.count(countRequest, RequestOptions.DEFAULT));
CountResponse response = retryTemplate.execute(context -> getEsClient().count(countRequest, RequestOptions.DEFAULT));
return response.getCount();
}
......@@ -271,7 +275,7 @@ public class EsClientDao {
}
private SearchResponse searchResponse(SearchRequest searchRequest) throws IOException {
return retryTemplate.execute(context -> esClient.search(searchRequest, RequestOptions.DEFAULT));
return retryTemplate.execute(context -> getEsClient().search(searchRequest, RequestOptions.DEFAULT));
}
private SearchRequest searchRequest(SearchHelper searchHelper) {
......@@ -281,7 +285,7 @@ public class EsClientDao {
private SearchRequest searchRequest(String[] indexes, QueryBuilder postFilter, QueryBuilder query, AggregationBuilder aggregationBuilder,
String[] fetchSource,
FieldSortBuilder sort, int from, int size, HighlightBuilder highlighter) {
FieldSortBuilder sort, Integer from, Integer size, HighlightBuilder highlighter) {
SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchRequest.indices(indexes);
......@@ -300,10 +304,10 @@ public class EsClientDao {
if (!Objects.isNull(sort)) {
searchSourceBuilder.sort(sort);
}
if (from >= 0) {
if (null != from && from >= 0) {
searchSourceBuilder.from(from);
}
if (size == 0) {
if (null == size) {
searchSourceBuilder.size(10000);
} else if (size > 0) {
searchSourceBuilder.size(size);
......@@ -318,18 +322,18 @@ public class EsClientDao {
private List<SearchResponse> scrollSearch(SearchRequest searchRequest) throws IOException {
List<SearchResponse> res = new ArrayList<>();
searchRequest.scroll(TIME_VALUE);
SearchResponse searchResponse = esClient.search(searchRequest, RequestOptions.DEFAULT);
SearchResponse searchResponse = getEsClient().search(searchRequest, RequestOptions.DEFAULT);
String scrollId = searchResponse.getScrollId();
res.add(searchResponse);
while (Objects.nonNull(scrollId)) {
SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
scrollRequest.scroll(TIME_VALUE);
searchResponse = esClient.scroll(scrollRequest, RequestOptions.DEFAULT);
searchResponse = getEsClient().scroll(scrollRequest, RequestOptions.DEFAULT);
scrollId = searchResponse.getScrollId();
if (1 > searchResponse.getHits().getHits().length) {
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId);
esClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
getEsClient().clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
scrollId = null;
} else {
res.add(searchResponse);
......@@ -340,7 +344,7 @@ public class EsClientDao {
private void defaultInit(SearchHelper searchHelper) {
if (null == searchHelper.getIndexes()) {
searchHelper.setIndexes(EsClientDao.this.getIndexes());
searchHelper.setIndexes(getIndexes());
}
}
......@@ -349,13 +353,13 @@ public class EsClientDao {
@Getter
public static class SearchHelper {
String[] indexes;
QueryBuilder postFilter;
QueryBuilder query;
BoolQueryBuilder postFilter;
BoolQueryBuilder query;
AggregationBuilder aggregationBuilder;
String[] fetchSource;
FieldSortBuilder sort;
int from;
int size;
Integer from;
Integer size;
HighlightBuilder highlighter;
}
......
package com.zhiwei.brandkbs2.es;
import com.zhiwei.brandkbs2.common.GenericAttribute;
import com.zhiwei.brandkbs2.common.GlobalPojo;
import com.zhiwei.pushlog.tools.Tools;
import com.zhiwei.qbjc.bean.pojo.common.MessagePlatform;
import org.apache.commons.lang3.StringUtils;
......@@ -102,12 +103,37 @@ public class EsQueryTools {
for (String key : keys) {
String channelRegex = getAllRegex(key);
BoolQueryBuilder keyQueryBuilder = QueryBuilders.boolQuery();
keyQueryBuilder.must(QueryBuilders.regexpQuery("source.keyword",".*"+channelRegex+".*"));
keyQueryBuilder.must(QueryBuilders.regexpQuery("source.keyword", ".*" + channelRegex + ".*"));
channelBoolQueryBuilder.should(keyQueryBuilder);
}
return channelBoolQueryBuilder;
}
public static void platformMustNot(BoolQueryBuilder boolQueryBuilder, String... platformNames) {
if (null == platformNames) {
return;
}
for (String platformName : platformNames) {
String platformIdByName = GlobalPojo.getPlatformIdByName(platformName);
if (null != platformIdByName) {
boolQueryBuilder.mustNot(QueryBuilders.termQuery(GenericAttribute.ES_PLATFORM_ID, platformIdByName));
}
}
}
public static void assembleContendsQuery(BoolQueryBuilder query, Collection<String> contends) {
BoolQueryBuilder contendQuery = QueryBuilders.boolQuery();
// 主品牌一定参与
contendQuery.should(QueryBuilders.termQuery("contend_id", "0"));
if (null == contends) {
return;
}
for (String contendId : contends) {
contendQuery.should((QueryBuilders.termQuery("contend_id", contendId)));
}
query.must(contendQuery);
}
/**
* platform 组合语句
*
......
......@@ -62,6 +62,16 @@ public class Channel extends ChannelIndex {
*/
private boolean show = true;
/**
* 是否被收藏
*/
private Boolean isCollect;
/**
* 收藏时间
*/
private Long collectTime;
public void setLastTime(Long lastTime) {
if (null == this.lastTime || this.lastTime < lastTime) {
this.lastTime = lastTime;
......@@ -78,7 +88,7 @@ public class Channel extends ChannelIndex {
public void setRecord(Record record) {
setLastTime(record.getLastTime());
increaseArticleCount(record.getArticleIds().size());
increaseArticleCount(record.getArticles().size());
}
public ChannelIndex getChannelIndex() {
......
......@@ -5,12 +5,10 @@ import com.zhiwei.brandkbs2.common.GlobalPojo;
import com.zhiwei.brandkbs2.util.Tools;
import com.zhiwei.qbjc.bean.pojo.common.MessagePlatform;
import com.zhiwei.qbjc.bean.tools.BeanTools;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.*;
import java.util.*;
import java.util.stream.Collectors;
/**
* @ClassName: ChannelIndex
......@@ -24,9 +22,17 @@ import java.util.*;
@NoArgsConstructor
public class ChannelIndex extends AbstractBaseMongo {
/**
* 项目id
*/
private String projectId;
/**
* 竞品id
*/
private String contendId;
/**
* 关联项目组ID
*/
private String linkedGroupId;
......@@ -54,7 +60,7 @@ public class ChannelIndex extends AbstractBaseMongo {
this.platform = platform;
this.realSource = realSource;
this.source = source;
this.fid = Tools.concat(projectId, linkedGroupId, platform, realSource, source);
this.fid = Tools.getFid(projectId, linkedGroupId, platform, realSource, source);
}
public static ChannelIndex createChannelIndex(Map<String, Object> sourceAsMap, String projectId, String linkedGroupId) {
......@@ -81,9 +87,15 @@ public class ChannelIndex extends AbstractBaseMongo {
}
List<Map<String, Object>> cacheMaps = (List<Map<String, Object>>) sourceAsMap.get(GenericAttribute.ES_BRANDKBS_CACHE_MAPS);
for (Map<String, Object> cacheMap : cacheMaps) {
String projectId = String.valueOf(cacheMap.get(GenericAttribute.PROJECT_ID));
String linkedGroupId = String.valueOf(cacheMap.get(GenericAttribute.LINKED_GROUP_ID));
res.add(new ChannelIndex(projectId, linkedGroupId, messagePlatform.getName(), realSource, source));
String projectId = String.valueOf(cacheMap.get("project_id"));
String linkedGroupId = String.valueOf(cacheMap.get("linked_group_id"));
ChannelIndex channelIndex = new ChannelIndex(projectId, linkedGroupId, messagePlatform.getName(), realSource, source);
// 默认主品牌
channelIndex.setContendId(String.valueOf(0));
Optional.ofNullable(cacheMap.get("contend_id")).ifPresent(e -> {
channelIndex.setContendId(String.valueOf(e));
});
res.add(channelIndex);
}
return res;
}
......@@ -111,17 +123,18 @@ public class ChannelIndex extends AbstractBaseMongo {
@Setter
@Getter
@AllArgsConstructor
public static class Record {
private Long lastTime;
private List<String> articleIds = new ArrayList<>();
private List<Article> articles = new ArrayList<>();
public Record() {
}
public Record(Long lastTime, String articleId) {
this.lastTime = lastTime;
this.articleIds.add(articleId);
this.articles.add(new Article(lastTime, articleId));
}
public void setLastTime(Long lastTime) {
......@@ -131,11 +144,36 @@ public class ChannelIndex extends AbstractBaseMongo {
}
public Record mergeRecord(Record record) {
this.articleIds.addAll(record.getArticleIds());
this.articles.addAll(record.getArticles());
setLastTime(record.getLastTime());
return this;
}
public Map<String, Object> toEsMap() {
Map<String, Object> map = new HashMap<>();
map.put("last_time", lastTime);
map.put("articles", articles.stream().map(Article::toEsMap).collect(Collectors.toList()));
return map;
}
}
@Data
@AllArgsConstructor
public static class Article {
Long time;
String id;
public static Article fromEsMap(Map<String, Object> esMap) {
return new Article(Long.valueOf(esMap.get("time") + ""), String.valueOf(esMap.get("id")));
}
public Map<String, Object> toEsMap() {
Map<String, Object> map = new HashMap<>();
map.put("time", time);
map.put("id", id);
return map;
}
}
@Override
......@@ -144,13 +182,12 @@ public class ChannelIndex extends AbstractBaseMongo {
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
ChannelIndex that = (ChannelIndex) o;
return Objects.equals(projectId, that.projectId) && Objects.equals(linkedGroupId, that.linkedGroupId) && Objects.equals(platform, that.platform) && Objects.equals(realSource,
that.realSource) && Objects.equals(source, that.source);
return Objects.equals(fid, that.fid) && Objects.equals(contendId, that.contendId);
}
@Override
public int hashCode() {
return Objects.hash(projectId, linkedGroupId, platform, realSource, source);
return Objects.hash(fid, contendId);
}
}
package com.zhiwei.brandkbs2.pojo.vo;
import com.zhiwei.brandkbs2.enmus.ChannelEmotion;
import com.zhiwei.brandkbs2.pojo.Channel;
import com.zhiwei.brandkbs2.util.Tools;
import lombok.Data;
import lombok.ToString;
......@@ -26,19 +29,39 @@ public class ChannelListVO {
*/
private String realSource;
/**
* 友好指数
* 情感指数
*/
private Double number;
private Double emotionIndex;
/**
* 发文数
*/
private Integer articleCount;
private long articleCount;
/**
* 情感倾向
*/
private String emotion;
private String showEmotion = ChannelEmotion.UNDEFINED.getName();
/**
* logo图片
* 头像图片
*/
private String imgUrl;
private String avatarUrl;
public static ChannelListVO createFromChannel(String channelFid, long articleCount) {
ChannelListVO channelListVO = new ChannelListVO();
// projectId, linkedGroupId, platform, realSource, source
String[] split = Tools.split(channelFid);
channelListVO.setSource(split[4]);
channelListVO.setPlatform(split[2]);
channelListVO.setRealSource(split[3]);
channelListVO.setEmotionIndex(0D);
channelListVO.setArticleCount(articleCount);
return channelListVO;
}
public static ChannelListVO createFromChannel(Channel channel, long articleCount) {
ChannelListVO channelListVO = Tools.convertMap(channel, ChannelListVO.class);
String name = ChannelEmotion.getNameFromState(channel.getEmotion());
channelListVO.setShowEmotion(name);
channelListVO.setArticleCount(articleCount);
return channelListVO;
}
}
......@@ -7,10 +7,12 @@ import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.ToString;
import org.bson.types.ObjectId;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @author sjj
......@@ -140,7 +142,12 @@ public class ProjectVO {
project.setWholeSearchDataSource(this.getWholeSearchDataSource());
project.setImportTime(this.getImportTime().getTime());
project.setHasContend(null != this.getContendList());
project.setContendList(this.getContendList());
project.setContendList(this.getContendList().stream().peek(contend -> {
// 新生成id
if (null == contend.getId()) {
contend.setId(ObjectId.get().toString());
}
}).collect(Collectors.toList()));
project.setModuleShowList(this.getModuleShowList());
project.setChannelFileUrl(this.getChannelFileUrl());
project.setNegativeChannelParams((this.getNegativeChannelParams()));
......
......@@ -9,6 +9,7 @@ import com.zhiwei.brandkbs2.pojo.vo.ChannelListVO;
import com.zhiwei.brandkbs2.pojo.vo.PageVO;
import java.util.List;
import java.util.Set;
/**
* @ClassName: ChannelService
......@@ -115,7 +116,71 @@ public interface ChannelService {
/**
* 获取活跃渠道榜
*
* @return ChannelListVO
*/
List<ChannelListVO> getActiveChannelList(String linkedGroupId, String platform, String keyword, Long startTime, Long endTime, int size);
List<ChannelListVO> getActiveChannelList(String contendId, String platform, String keyword, Long startTime, Long endTime, int size);
/**
* 获取友好渠道榜
*
* @param contendId
* @param platform
* @param keyword
* @param sorter
* @param size
* @return
*/
List<ChannelListVO> getPositiveList(String contendId, String platform, String keyword, String sorter, int size);
/**
* 获取敏感渠道榜
*
* @param contendId
* @param platform
* @param keyword
* @param sorter
* @param size
* @return
*/
List<ChannelListVO> getNegativeList(String contendId, String platform, String keyword, String sorter, int size);
/**
* 收藏渠道
*
* @param channelId
* @return
*/
boolean collectChannel(String channelId);
/**
* 取消收藏渠道
*
* @param channelId
* @return
*/
boolean removeCollectChannel(String channelId);
List<JSONObject> getCollectList(String contendId);
/**
* 获取渠道基本信息
*
* @param channelId
* @return
*/
JSONObject getBaseInfoByChannelId(String channelId);
/**
* 获取渠道动向
*
* @param channelId 渠道ID
* @param type 传播类型
* @param contends 品牌ID集合
* @param startTime 开始时间
* @param endTime 结束时间
* @return 渠道动向
*/
JSONObject getSpreadingTend(String channelId, String type, Set<String> contends, Long startTime, Long endTime);
}
......@@ -99,6 +99,14 @@ public interface ProjectService {
AbstractProject getProjectByLinkedGroupId(String linkedGroupId);
/**
* 根据品牌ID获取Project
*
* @param contendId 品牌id
* @return project对象
*/
AbstractProject getProjectByContendId(String contendId);
/**
* 获取所有启动状态下的Project
* @return
*/
......
......@@ -475,7 +475,8 @@ public class EventServiceImpl implements EventService {
List<JSONObject> resultList = partition.get(page - 1);
resultList = resultList.stream().peek(json -> {
String name = json.getString("name");
json.put("eventCount", eventDao.count(Query.query(Criteria.where("eventTag." + tagGroupName).is(name))));
Query query = Query.query(Criteria.where("eventTag." + tagGroupName).is(name).and("projectId").is(UserThreadLocal.getProjectId()).and("linkedGroupId").is(linkedGroupId));
json.put("eventCount", eventDao.count(query));
}).collect(Collectors.toList());
return PageVO.createPageVo(tagGroupList.size(), page, partition.size(), size, resultList);
}
......
......@@ -30,7 +30,6 @@ import com.zhiwei.brandkbs2.util.RedisUtil;
import com.zhiwei.brandkbs2.util.TextUtil;
import com.zhiwei.brandkbs2.util.Tools;
import com.zhiwei.qbjc.bean.pojo.common.MessagePlatform;
import io.lettuce.core.ScriptOutputType;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
......@@ -53,7 +52,6 @@ import org.elasticsearch.search.aggregations.bucket.histogram.ParsedDateHistogra
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
......@@ -1017,7 +1015,7 @@ public class MarkDataServiceImpl implements MarkDataService {
return textList;
}
protected static BoolQueryBuilder projectLinkedGroupQuery(String projectId, String linkedGroupId) {
private static BoolQueryBuilder projectLinkedGroupQuery(String projectId, String linkedGroupId) {
return QueryBuilders.boolQuery().must(QueryBuilders.termQuery("brandkbs_cache_maps.project_id.keyword", projectId)).must(QueryBuilders.termQuery("brandkbs_cache_maps.linked_group_id.keyword", linkedGroupId));
}
......
......@@ -184,6 +184,7 @@ public class ProjectServiceImpl implements ProjectService {
List<JSONObject> resultList = new ArrayList<>();
if (hasPrimary) {
JSONObject json = new JSONObject();
json.put("contendId", "0");
json.put("brandName", project.getBrandName());
json.put("linkedGroupId", project.getBrandLinkedGroupId());
resultList.add(json);
......@@ -192,6 +193,7 @@ public class ProjectServiceImpl implements ProjectService {
if (CollectionUtils.isNotEmpty(contendList)) {
for (Contend contend : contendList) {
JSONObject json = new JSONObject();
json.put("contendId", contend.getId());
json.put("brandName", contend.getBrandName());
json.put("linkedGroupId", contend.getBrandLinkedGroupId());
resultList.add(json);
......@@ -220,6 +222,25 @@ public class ProjectServiceImpl implements ProjectService {
}
@Override
public AbstractProject getProjectByContendId(String contendId) {
String projectId = UserThreadLocal.getProjectId();
Project project = projectDao.findOneById(projectId);
if ("0".equals(contendId)) {
return project;
}
List<Contend> contendList = project.getContendList();
if (CollectionUtils.isEmpty(contendList)) {
return null;
}
for (Contend contend : contendList) {
if (contend.getId().equals(contendId)) {
return contend;
}
}
return null;
}
@Override
public List<Project> getAllProjectsWithStart() {
return projectDao.findList(Query.query(Criteria.where("isStart").is(true)));
}
......
......@@ -11,6 +11,8 @@ import com.zhiwei.brandkbs2.service.BrandkbsTaskService;
import com.zhiwei.brandkbs2.service.ProjectService;
import com.zhiwei.brandkbs2.service.ReportService;
import com.zhiwei.brandkbs2.service.TaskService;
import com.zhiwei.brandkbs2.util.Tools;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -70,29 +72,33 @@ public class TaskServiceImpl implements TaskService {
List<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>> rangeTimeRecords = esClientDao.searchRecordRecentDay(day);
int total = rangeTimeRecords.stream().mapToInt(pair -> pair.getRight().values().size()).sum();
log.info("渠道统计-搜索到近{}天的受影响渠道数{}条", day, total);
List<ChannelRecord> channelRecords = rangeTimeRecords.stream().map(pair -> new ChannelRecord(pair.getLeft()[0], pair.getLeft()[1], pair.getRight())).collect(Collectors.toList());
channelEsDao.upsertChannelRecord(channelRecords);
log.info("渠道统计-小时级渠道记录统计结束");
List<Channel> insertList = new ArrayList<>();
// 结果合并
List<Map<ChannelIndex, ChannelIndex.Record>> channelList = rangeTimeRecords.stream().map(Pair::getRight).collect(Collectors.toList());
// 合并渠道记录
Map<ChannelIndex, ChannelIndex.Record> channelIndexRecordMap = ChannelIndex.mergeRecord(channelList);
// 获得单位时间内最小最大时间戳
Long[] timeMinMax = Tools.timeMinMax(rangeTimeRecords.stream().map(Pair::getLeft).collect(Collectors.toList()));
List<ChannelRecord> channelRecords = ChannelRecord.createChannelRecords(timeMinMax[0], timeMinMax[1], channelIndexRecordMap);
// List<ChannelRecord> channelRecords = rangeTimeRecords.stream().map(pair -> ChannelRecord.createChannelRecords(pair.getLeft()[0], pair.getLeft()[1], pair.getRight())).flatMap(Collection::stream).collect(Collectors.toList());
channelEsDao.upsertChannelRecord(channelRecords);
log.info("渠道统计-小时级渠道记录-统计结束");
List<Channel> insertList = new ArrayList<>();
for (Map.Entry<ChannelIndex, ChannelIndex.Record> entry : channelIndexRecordMap.entrySet()) {
// 是否已存在
Channel channel = channelDao.queryUnique(entry.getKey());
if (null == channel) {
channel = Channel.createFromChannelIndexRecord(entry.getKey(), entry.getValue());
insertList.add(channel);
channelDao.insertOne(channel);
} else {
channel.setRecord(entry.getValue());
channelDao.updateOne(channel);
}
}
// ListUtils.partition(insertList, 1000).forEach(list -> {
// channelDao.insertMany(list);
// });
log.info("渠道统计-录入完毕,新增渠道{}条,更新渠道{}条", insertList.size(), total - insertList.size());
log.info("渠道统计-渠道总计-查询并更新结束,开始批量入库");
ListUtils.partition(insertList, 1000).forEach(list -> {
channelDao.insertMany(list);
});
log.info("渠道统计-渠道总计-录入完毕,新增渠道{}条,更新渠道{}条", insertList.size(), total - insertList.size());
}
@Override
......
......@@ -27,13 +27,13 @@ public class ControlCenter {
@Async("scheduledExecutor")
@Scheduled(cron = "0 0 1 * * ?")
public void messageFlowCount() {
log.info("定时按天录入各小时渠道进量-启动");
log.info("定时按天录入渠道进量-启动");
try {
taskService.messageFlowCount(1);
} catch (Exception e) {
log.error("定时按天录入各小时渠道进量-出错", e);
log.error("定时按天录入渠道进量-出错", e);
} finally {
log.info("定时按天录入各小时渠道进量-结束");
log.info("定时按天录入渠道进量-结束");
}
}
......
......@@ -41,8 +41,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static com.zhiwei.brandkbs2.common.GenericAttribute.ES_MARK_CACHE_MAPS;
import static com.zhiwei.brandkbs2.config.Constant.DAY_PATTERN;
import static com.zhiwei.brandkbs2.config.Constant.HOUR_PATTERN;
import static com.zhiwei.brandkbs2.config.Constant.*;
import static java.util.Objects.nonNull;
/**
......@@ -483,6 +482,18 @@ public class Tools {
return resultStr.substring(0, resultStr.length() - 1);
}
public static String[] split(String concatStr) {
return concatStr.split(Constant.DEFAULT_SEPARATOR);
}
public static String getFid(String projectId, String linkedGroupId, String platform, String realSource, String source) {
return concat(projectId, linkedGroupId, platform, realSource, source);
}
public static String getFidFromChannelKey(String channelKey) {
return channelKey.substring(0, channelKey.lastIndexOf(DEFAULT_SEPARATOR));
}
public static List<String> readListFile(InputStream inputStream) throws Exception {
BufferedReader br = null;
String line;
......@@ -585,4 +596,38 @@ public class Tools {
return null == token || Objects.equals("undefined", token);
}
/**
* 获得区段时间的最大最小值
*
* @param times
* @return
*/
public static Long[] timeMinMax(List<Long[]> times) {
Long min = null;
Long max = null;
for (Long[] time : times) {
if (null == min) {
min = time[0];
} else {
min = Math.min(min, time[0]);
}
if (null == max) {
max = time[1];
} else {
max = Math.max(max, time[1]);
}
}
return new Long[]{min, max};
}
public static boolean hitTimeRange(Long startTime, Long endTime, Long targetTime) {
if (null == targetTime) {
return false;
}
if (null != startTime && startTime > targetTime) {
return false;
}
return null == endTime || endTime > targetTime;
}
}
\ No newline at end of file
spring.profiles.active=local
spring.profiles.active=pro
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment