Commit 8cc8e277 by 陈健智

Merge remote-tracking branch 'origin/feature' into feature

parents b9117713 af1345e0
...@@ -16,6 +16,7 @@ public class GenericAttribute { ...@@ -16,6 +16,7 @@ public class GenericAttribute {
*/ */
public static final String ES_INDEX_PRE = "brandkbs2"; public static final String ES_INDEX_PRE = "brandkbs2";
public static final String ES_INDEX_TEST = "brandkbs2_test"; public static final String ES_INDEX_TEST = "brandkbs2_test";
public static final String ES_CHANNEL_INDEX_TEST = "brandkbs2_channel_record_test";
/** /**
* es ind_title * es ind_title
**/ **/
......
...@@ -72,7 +72,7 @@ public class AppArticleController extends BaseController { ...@@ -72,7 +72,7 @@ public class AppArticleController extends BaseController {
@ApiOperation("舆情列表-搜索条件") @ApiOperation("舆情列表-搜索条件")
@GetMapping("/mark/list/criteria") @GetMapping("/mark/list/criteria")
public ResponseResult getYuqingMark(@RequestParam(required = false) String linkedGroupId) { public ResponseResult getYuqingMarkCriteria(@RequestParam(required = false) String linkedGroupId) {
return ResponseResult.success(markDataService.getYuqingMarkCriteria(linkedGroupId)); return ResponseResult.success(markDataService.getYuqingMarkCriteria(linkedGroupId));
} }
......
package com.zhiwei.brandkbs2.dao; package com.zhiwei.brandkbs2.dao;
import com.zhiwei.brandkbs2.pojo.ChannelIndex;
import com.zhiwei.brandkbs2.pojo.Event; import com.zhiwei.brandkbs2.pojo.Event;
import java.util.List;
/** /**
* @ClassName: EventDao * @ClassName: EventDao
* @Description EventDao * @Description EventDao
...@@ -30,4 +33,13 @@ public interface EventDao extends BaseMongoDao<Event>{ ...@@ -30,4 +33,13 @@ public interface EventDao extends BaseMongoDao<Event>{
*/ */
Event getEventByUniqueIds(String yqEventId,String projectId,String linkedGroupId); Event getEventByUniqueIds(String yqEventId,String projectId,String linkedGroupId);
/**
* 获取参与事件数
*
* @param channelIndex 渠道标识
* @return 参与事件数
*/
List<String> getEventCount(ChannelIndex channelIndex);
} }
...@@ -23,14 +23,6 @@ public interface EventDataDao extends BaseMongoDao<EventData>, ShardingMongo { ...@@ -23,14 +23,6 @@ public interface EventDataDao extends BaseMongoDao<EventData>, ShardingMongo {
EventData findFirstData(String eventId, String collectionName); EventData findFirstData(String eventId, String collectionName);
/** /**
* 获取参与事件数
*
* @param channelIndex 渠道标识
* @return 参与事件数
*/
List<String> getEventCount(ChannelIndex channelIndex);
/**
* 获取传播量 * 获取传播量
* *
* @param event 事件 * @param event 事件
......
package com.zhiwei.brandkbs2.dao.impl; package com.zhiwei.brandkbs2.dao.impl;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.brandkbs2.dao.EventDao; import com.zhiwei.brandkbs2.dao.EventDao;
import com.zhiwei.brandkbs2.pojo.ChannelIndex;
import com.zhiwei.brandkbs2.pojo.Event; import com.zhiwei.brandkbs2.pojo.Event;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Calendar;
import java.util.List;
import java.util.stream.Collectors;
import static com.zhiwei.brandkbs2.dao.impl.EventDataDaoImpl.COLLECTION_PREFIX;
/** /**
* @ClassName: EventDaoImpl * @ClassName: EventDaoImpl
* @Description 事件业务实现类 * @Description 事件业务实现类
...@@ -32,4 +42,22 @@ public class EventDaoImpl extends BaseMongoDaoImpl<Event> implements EventDao { ...@@ -32,4 +42,22 @@ public class EventDaoImpl extends BaseMongoDaoImpl<Event> implements EventDao {
query.addCriteria(Criteria.where("yqEventId").is(yqEventId).and("projectId").is(projectId).and("linkedGroupId").is(linkedGroupId)); query.addCriteria(Criteria.where("yqEventId").is(yqEventId).and("projectId").is(projectId).and("linkedGroupId").is(linkedGroupId));
return mongoTemplate.findOne(query, Event.class); return mongoTemplate.findOne(query, Event.class);
} }
@Override
public List<String> getEventCount(ChannelIndex channelIndex) {
// 添加渠道唯一标识
Criteria criteria = addChannelIndex(channelIndex);
// 分组
Aggregation agg = Aggregation.newAggregation(Aggregation.match(criteria), Aggregation.group("eventId").count().as("eventCount"));
AggregationResults<JSONObject> aggregate = mongoTemplate.aggregate(agg, getAggreeCollection(), JSONObject.class);
List<JSONObject> mappedResults = aggregate.getMappedResults();
return mappedResults.stream().map(json -> json.getString("_id")).collect(Collectors.toList());
}
private String getAggreeCollection() {
Calendar date = Calendar.getInstance();
int year = date.get(Calendar.YEAR);
return COLLECTION_PREFIX + "_" + year;
}
} }
package com.zhiwei.brandkbs2.dao.impl; package com.zhiwei.brandkbs2.dao.impl;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.brandkbs2.dao.EventDataDao; import com.zhiwei.brandkbs2.dao.EventDataDao;
import com.zhiwei.brandkbs2.pojo.ChannelIndex; import com.zhiwei.brandkbs2.pojo.ChannelIndex;
import com.zhiwei.brandkbs2.pojo.Event; import com.zhiwei.brandkbs2.pojo.Event;
import com.zhiwei.brandkbs2.pojo.EventData; import com.zhiwei.brandkbs2.pojo.EventData;
import org.springframework.data.domain.Sort; import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Calendar;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.stream.Collectors;
/** /**
* @ClassName: EventDataDaoImpl * @ClassName: EventDataDaoImpl
...@@ -26,7 +21,7 @@ import java.util.stream.Collectors; ...@@ -26,7 +21,7 @@ import java.util.stream.Collectors;
@Component("eventDataDao") @Component("eventDataDao")
public class EventDataDaoImpl extends BaseMongoDaoImpl<EventData> implements EventDataDao { public class EventDataDaoImpl extends BaseMongoDaoImpl<EventData> implements EventDataDao {
private static final String COLLECTION_PREFIX = "brandkbs_event_data"; protected static final String COLLECTION_PREFIX = "brandkbs_event_data";
private static final String TIME_PATTERN = "yyyy"; private static final String TIME_PATTERN = "yyyy";
public EventDataDaoImpl() { public EventDataDaoImpl() {
...@@ -53,17 +48,6 @@ public class EventDataDaoImpl extends BaseMongoDaoImpl<EventData> implements Eve ...@@ -53,17 +48,6 @@ public class EventDataDaoImpl extends BaseMongoDaoImpl<EventData> implements Eve
} }
@Override @Override
public List<String> getEventCount(ChannelIndex channelIndex) {
// 添加渠道唯一标识
Criteria criteria = addChannelIndex(channelIndex);
// 分组
Aggregation agg = Aggregation.newAggregation(Aggregation.match(criteria), Aggregation.group("eventId").count().as("eventCount"));
AggregationResults<JSONObject> aggregate = mongoTemplate.aggregate(agg, getAggreeCollection(), JSONObject.class);
List<JSONObject> mappedResults = aggregate.getMappedResults();
return mappedResults.stream().map(json -> json.getString("_id")).collect(Collectors.toList());
}
@Override
public long getEventArticleCount(Event event) { public long getEventArticleCount(Event event) {
return count(Query.query(Criteria.where("eventId").is(event.getId())), event.getCollectionName()); return count(Query.query(Criteria.where("eventId").is(event.getId())), event.getCollectionName());
} }
...@@ -91,10 +75,4 @@ public class EventDataDaoImpl extends BaseMongoDaoImpl<EventData> implements Eve ...@@ -91,10 +75,4 @@ public class EventDataDaoImpl extends BaseMongoDaoImpl<EventData> implements Eve
return mongoTemplate.count(query, EventData.class, collectionName); return mongoTemplate.count(query, EventData.class, collectionName);
} }
private String getAggreeCollection() {
Calendar date = Calendar.getInstance();
int year = date.get(Calendar.YEAR);
return COLLECTION_PREFIX + "_" + year;
}
} }
...@@ -112,4 +112,13 @@ public enum EmotionEnum { ...@@ -112,4 +112,13 @@ public enum EmotionEnum {
return emotion; return emotion;
} }
public static int parseFromName(String name) {
for (EmotionEnum value : EmotionEnum.values()) {
if (value.getName().equals(name)) {
return value.getState();
}
}
return 0;
}
} }
package com.zhiwei.brandkbs2.es;
import com.zhiwei.brandkbs2.common.GenericAttribute;
import com.zhiwei.brandkbs2.pojo.ChannelRecord;
import com.zhiwei.brandkbs2.service.CommonService;
import com.zhiwei.brandkbs2.util.Md5Util;
import org.apache.commons.collections4.ListUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* @ClassName: ChannelEsClientDao
* @Description ChannelEsClientDao
* @author: sjj
* @date: 2022-07-26 17:25
*/
@Component("channelEsDao")
public class ChannelEsDao extends EsClientDao {
private static final Logger log = LogManager.getLogger(ChannelEsDao.class);
@Resource(name = "localEsClient")
RestHighLevelClient channelEsClient;
@Resource(name = "commonServiceImpl")
CommonService commonService;
public List<String> getArticleIds(String fid) {
Long[] timeRangeMonth = commonService.getTimeRangeMonth();
return getArticleIds(timeRangeMonth[0], timeRangeMonth[1], fid);
}
public List<String> getArticleIds(long startTime, long endTime, String fid) {
List<String> articleIds = new ArrayList<>();
try {
SearchHelper searchHelper = createSearchHelper();
BoolQueryBuilder postFilter = channelUniqueBool(fid);
postFilter.must(QueryBuilders.rangeQuery(ChannelRecord.RANGE_START_TIME).gte(startTime));
postFilter.must(QueryBuilders.rangeQuery(ChannelRecord.RANGE_END_TIME).lt(endTime));
searchHelper.setPostFilter(postFilter);
searchHelper.setSort(SortBuilders.fieldSort(ChannelRecord.RANGE_END_TIME).order(SortOrder.DESC));
SearchHits searchHits = retryTemplate.execute(context -> searchHits(searchHelper));
for (SearchHit hit : searchHits.getHits()) {
ChannelRecord channelRecord = new ChannelRecord(hit.getSourceAsMap());
channelRecord.getLinkMaps().stream().filter(linkMap -> fid.equals(linkMap.getIndex().getFid())).
forEach(linkMap -> articleIds.addAll(linkMap.getRecord().getArticleIds()));
}
} catch (IOException ignored) {
}
return articleIds;
}
public void upsertChannelRecord(List<ChannelRecord> channelRecords) {
String index = getChannelRecordIndex();
BulkRequest bulkRequest = new BulkRequest();
Long startTime = null;
Long endTime = null;
for (List<ChannelRecord> records : ListUtils.partition(channelRecords, 1000)) {
for (ChannelRecord record : records) {
startTime = null == startTime ? record.getRangeStartTime() : Math.min(startTime, record.getRangeStartTime());
endTime = null == endTime ? record.getRangeEndTime() : Math.max(endTime, record.getRangeEndTime());
bulkRequest.add(new IndexRequest(index).id(Md5Util.encode2EsId(record.getRangeStartTime(),
record.getRangeEndTime())).source(record.toEsMap()));
}
BulkResponse bulkResponse = retryTemplate.execute(context -> {
try {
return channelEsClient.bulk(bulkRequest, RequestOptions.DEFAULT);
} catch (IOException ignored) {
return null;
}
});
if (null == bulkResponse) {
log.error("upsertRecord批量操作重试后失败,index:{},rangeTime:{}", index, startTime + "-" + endTime);
} else if (bulkResponse.hasFailures()) {
log.error("upsertRecord批量操作失败,index:{},rangeTime:{},startTime[es-batchUpsert]异常:{}", index, startTime + "-" + endTime,
bulkResponse.buildFailureMessage());
} else {
log.info("upsertRecord批量操作成功,rangeTime:{},入库:{}条", startTime + "-" + endTime, records.size());
}
}
}
@Override
public String[] getIndexes() {
return new String[]{getChannelRecordIndex()};
}
public String getChannelRecordIndex() {
// if (test) {
return GenericAttribute.ES_CHANNEL_INDEX_TEST;
// }
// // 近1年数据库 TODO
// Calendar date = Calendar.getInstance();
// int year = date.get(Calendar.YEAR);
// res.add(GenericAttribute.ES_CHANNEL_INDEX_TEST);
// return res.toArray(new String[0]);
}
private BoolQueryBuilder channelUniqueBool(String fid) {
BoolQueryBuilder postFilter = QueryBuilders.boolQuery();
return postFilter.must(QueryBuilders.termQuery(ChannelRecord.LINK_MAPS + ".fid.keyword", fid));
}
}
...@@ -8,6 +8,7 @@ import com.zhiwei.brandkbs2.util.Tools; ...@@ -8,6 +8,7 @@ import com.zhiwei.brandkbs2.util.Tools;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import org.apache.commons.lang3.time.FastDateFormat; import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.ClearScrollRequest; import org.elasticsearch.action.search.ClearScrollRequest;
...@@ -58,16 +59,16 @@ public class EsClientDao { ...@@ -58,16 +59,16 @@ public class EsClientDao {
private static final TimeValue TIME_VALUE = TimeValue.timeValueMinutes(8); private static final TimeValue TIME_VALUE = TimeValue.timeValueMinutes(8);
@Value("${es.index.test}") @Value("${es.index.test}")
private boolean test; boolean test;
@Resource(name = "esClient") @Resource(name = "esClient")
private RestHighLevelClient esClient; RestHighLevelClient esClient;
@Resource(name = "esSearchExecutor") @Resource(name = "esSearchExecutor")
private ThreadPoolTaskExecutor executor; ThreadPoolTaskExecutor executor;
@Resource(name = "retryTemplate") @Resource(name = "retryTemplate")
private RetryTemplate retryTemplate; RetryTemplate retryTemplate;
public Map<String, JSONObject> searchByIds(Collection<String> queryIds) throws IOException { public Map<String, JSONObject> searchByIds(Collection<String> queryIds) throws IOException {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
...@@ -83,26 +84,18 @@ public class EsClientDao { ...@@ -83,26 +84,18 @@ public class EsClientDao {
* @param day 几天前天数 * @param day 几天前天数
* @return 渠道记录结果 * @return 渠道记录结果
*/ */
public Map<ChannelIndex, ChannelIndex.ChannelRecord> searchChannelRecordRecentDay(int day) { public List<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>> searchRecordRecentDay(int day) {
Map<ChannelIndex, ChannelIndex.ChannelRecord> res = new HashMap<>(); List<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>> res = new ArrayList<>();
Calendar calendar = Calendar.getInstance(); Calendar calendar = Calendar.getInstance();
long endTime = calendar.getTime().getTime(); long endTime = calendar.getTime().getTime();
calendar.add(Calendar.DAY_OF_MONTH, -day); calendar.add(Calendar.DAY_OF_MONTH, -day);
long startTime = calendar.getTime().getTime(); long startTime = calendar.getTime().getTime();
List<Long[]> cutTimes = Tools.cutTimeRange(startTime, endTime, ONE_HOUR); List<Long[]> cutTimes = Tools.cutTimeRange(startTime, endTime, ONE_HOUR);
List<CompletableFuture<Map<ChannelIndex, ChannelIndex.ChannelRecord>>> futures = new ArrayList<>(cutTimes.size()); List<CompletableFuture<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>>> futures = new ArrayList<>(cutTimes.size());
cutTimes.forEach(times -> futures.add(CompletableFuture.supplyAsync(() -> searchChannelRecord(times[0], times[1]), executor))); cutTimes.forEach(times -> futures.add(CompletableFuture.supplyAsync(() -> searchRecord(times[0], times[1]), executor)));
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((r, e) -> { CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((r, e) -> {
futures.forEach(f -> { futures.forEach(f -> {
Map<ChannelIndex, ChannelIndex.ChannelRecord> channelIndexMap = f.join(); res.add(f.join());
channelIndexMap.forEach((channelIndex, record) -> {
res.compute(channelIndex, (k, v) -> {
if (null == v) {
return record;
}
return v.mergeRecord(record);
});
});
}); });
}).join(); }).join();
return res; return res;
...@@ -174,8 +167,8 @@ public class EsClientDao { ...@@ -174,8 +167,8 @@ public class EsClientDao {
return retryTemplate.execute(context -> searchScroll(sourceBuilder)); return retryTemplate.execute(context -> searchScroll(sourceBuilder));
} }
private Map<ChannelIndex, ChannelIndex.ChannelRecord> searchChannelRecord(long startTime, long endTime) { private Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>> searchRecord(long startTime, long endTime) {
Map<ChannelIndex, ChannelIndex.ChannelRecord> res = new HashMap<>(); Map<ChannelIndex, ChannelIndex.Record> res = new HashMap<>();
try { try {
QueryBuilder queryBuilder = QueryBuilders.rangeQuery("mtime").gte(startTime).lt(endTime); QueryBuilder queryBuilder = QueryBuilders.rangeQuery("mtime").gte(startTime).lt(endTime);
List<Map<String, Object>> results = searchScroll(queryBuilder, 10000, CHANNEL_RECORD_FETCH_SOURCE); List<Map<String, Object>> results = searchScroll(queryBuilder, 10000, CHANNEL_RECORD_FETCH_SOURCE);
...@@ -183,22 +176,22 @@ public class EsClientDao { ...@@ -183,22 +176,22 @@ public class EsClientDao {
for (ChannelIndex channelIndex : ChannelIndex.createChannelIndexes(result)) { for (ChannelIndex channelIndex : ChannelIndex.createChannelIndexes(result)) {
res.compute(channelIndex, (k, v) -> { res.compute(channelIndex, (k, v) -> {
if (null == v) { if (null == v) {
v = new ChannelIndex.ChannelRecord(); v = new ChannelIndex.Record();
} }
try { try {
return v.mergeRecord(new ChannelIndex.ChannelRecord((long) result.get(GenericAttribute.ES_TIME), String.valueOf(result.get("id")))); return v.mergeRecord(new ChannelIndex.Record((long) result.get(GenericAttribute.ES_TIME), String.valueOf(result.get("id"))));
} catch (Exception e) { } catch (Exception e) {
log.error("searchChannelRecord-error-id:{}", result.get("id"), e); log.error("searchRecord-error-id:{}", result.get("id"), e);
return null; return null;
} }
}); });
} }
} }
} catch (IOException e) { } catch (IOException e) {
log.error("searchChannelRecord-", e); log.error("searchRecord-", e);
} }
log.info("startTime:{},endTime:{},size:{}", DF.format(startTime), DF.format(endTime), res.size()); log.info("startTime:{},endTime:{},size:{}", DF.format(startTime), DF.format(endTime), res.size());
return res; return Pair.of(new Long[]{startTime, endTime}, res);
} }
public String[] getIndexes() { public String[] getIndexes() {
......
...@@ -83,6 +83,32 @@ public class EsQueryTools { ...@@ -83,6 +83,32 @@ public class EsQueryTools {
} }
/** /**
* 媒体类型请求匹配
*
* @param mediaTypes
* @return
*/
public static BoolQueryBuilder assembleMediaTypeQuery(List<String> mediaTypes) {
BoolQueryBuilder tagQuery = QueryBuilders.boolQuery();
mediaTypes.forEach(e -> {
tagQuery.should(QueryBuilders.termQuery("mark_cache_maps.channel_type.keyword", e));
});
return tagQuery;
}
public static BoolQueryBuilder assembleSourceQuery(String sourceKeyword) {
BoolQueryBuilder channelBoolQueryBuilder = QueryBuilders.boolQuery();
String[] keys = sourceKeyword.trim().split("\\|");
for (String key : keys) {
String channelRegex = getAllRegex(key);
BoolQueryBuilder keyQueryBuilder = QueryBuilders.boolQuery();
keyQueryBuilder.must(QueryBuilders.regexpQuery("source.keyword",".*"+channelRegex+".*"));
channelBoolQueryBuilder.should(keyQueryBuilder);
}
return channelBoolQueryBuilder;
}
/**
* platform 组合语句 * platform 组合语句
* *
* @param @param platforms * @param @param platforms
...@@ -159,4 +185,25 @@ public class EsQueryTools { ...@@ -159,4 +185,25 @@ public class EsQueryTools {
return queryMap; return queryMap;
} }
private static String getAllRegex(String word) {
String LittleRegex = "[a-z]";
String BigRegex = "[A-Z]";
String[] bs = word.split("");
StringBuilder regex = new StringBuilder();
for (String b : bs) {
if (b.matches(LittleRegex))// 如果是 小写
{
regex.append("[").append(b).append(b.toUpperCase()).append("]");
} else if (b.matches(BigRegex))// 大写
{
regex.append("[").append(b).append(b.toLowerCase()).append("]");
} else {
regex.append(b);
}
//b = (b + "").matches(LittleRegex) ? b : b;
}
return regex.toString();
}
} }
...@@ -84,4 +84,14 @@ public class EsRestClient { ...@@ -84,4 +84,14 @@ public class EsRestClient {
return client; return client;
} }
@Bean(name = "localEsClient")
public RestHighLevelClient localEsClient() {
List<HttpHost> httpHostList = new ArrayList<>();
httpHostList.add(new HttpHost("192.168.0.250", 9200));
HttpHost[] httpHosts = httpHostList.toArray(new HttpHost[0]);
// 判断,如果未配置用户名,则进行无用户名密码连接,配置了用户名,则进行用户名密码连接
return new RestHighLevelClient(RestClient.builder(httpHosts));
}
} }
...@@ -22,7 +22,7 @@ public class EsRetryTools { ...@@ -22,7 +22,7 @@ public class EsRetryTools {
SimpleRetryPolicy policy = new SimpleRetryPolicy(); SimpleRetryPolicy policy = new SimpleRetryPolicy();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
// 策略 // 策略
policy.setMaxAttempts(2);// 重试次数 policy.setMaxAttempts(3);// 重试次数
template.setRetryPolicy(policy); template.setRetryPolicy(policy);
backOffPolicy.setInitialInterval(1000);// 初始休眠 backOffPolicy.setInitialInterval(1000);// 初始休眠
backOffPolicy.setMultiplier(2);// 指定乘数 下次休眠 = multiplier * 当前休眠时间 backOffPolicy.setMultiplier(2);// 指定乘数 下次休眠 = multiplier * 当前休眠时间
......
...@@ -6,7 +6,6 @@ import lombok.Getter; ...@@ -6,7 +6,6 @@ import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import java.util.Date; import java.util.Date;
import java.util.LinkedHashSet;
/** /**
* @ClassName: Channel * @ClassName: Channel
...@@ -29,19 +28,19 @@ public class Channel extends ChannelIndex { ...@@ -29,19 +28,19 @@ public class Channel extends ChannelIndex {
private Long lastTime; private Long lastTime;
/** /**
* 图片地址 * 图片头像地址
*/ */
private String imgUrl; private String avatarUrl;
/** /**
* 发布稿件id列表 * 发布稿件
*/ */
private LinkedHashSet<String> articleIds; private long articleCount;
/** /**
* 参与事件id列表 * 参与事件
*/ */
private LinkedHashSet<String> eventIds; private long eventCount;
/** /**
* 渠道倾向 * 渠道倾向
...@@ -59,7 +58,7 @@ public class Channel extends ChannelIndex { ...@@ -59,7 +58,7 @@ public class Channel extends ChannelIndex {
private String experienceLevel; private String experienceLevel;
/** /**
* 是否屏蔽 * 是否展示
*/ */
private boolean show = true; private boolean show = true;
...@@ -69,24 +68,24 @@ public class Channel extends ChannelIndex { ...@@ -69,24 +68,24 @@ public class Channel extends ChannelIndex {
} }
} }
public void setArticleIds(LinkedHashSet<String> articleIds) { public void increaseArticleCount() {
if (null == this.articleIds) { this.articleCount += 1;
this.articleIds = articleIds;
} else {
this.articleIds.addAll(articleIds);
} }
public void increaseArticleCount(long articleCount) {
this.articleCount += articleCount;
} }
public void setChannelRecord(ChannelRecord channelRecord) { public void setRecord(Record record) {
setLastTime(channelRecord.getLastTime()); setLastTime(record.getLastTime());
setArticleIds(channelRecord.getArticleIds()); increaseArticleCount(record.getArticleIds().size());
} }
public ChannelIndex getChannelIndex() { public ChannelIndex getChannelIndex() {
return Tools.convertMap(this, ChannelIndex.class); return Tools.convertMap(this, ChannelIndex.class);
} }
public static Channel createFromChannelIndexRecord(ChannelIndex channelIndex, ChannelRecord channelRecord) { public static Channel createFromChannelIndexRecord(ChannelIndex channelIndex, Record record) {
Channel channel = new Channel(); Channel channel = new Channel();
channel.setCTime(new Date().getTime()); channel.setCTime(new Date().getTime());
channel.setProjectId(channelIndex.getProjectId()); channel.setProjectId(channelIndex.getProjectId());
...@@ -97,8 +96,8 @@ public class Channel extends ChannelIndex { ...@@ -97,8 +96,8 @@ public class Channel extends ChannelIndex {
// fid 用来便捷搜索 // fid 用来便捷搜索
channel.setFid(channelIndex.getFid()); channel.setFid(channelIndex.getFid());
channel.setLastTime(channelRecord.getLastTime()); channel.setLastTime(record.getLastTime());
channel.setArticleIds(channelRecord.getArticleIds()); channel.setRecord(record);
// TODO 调性随机分配 // TODO 调性随机分配
double random = Math.random(); double random = Math.random();
if (random < 0.3) { if (random < 0.3) {
......
...@@ -88,30 +88,51 @@ public class ChannelIndex extends AbstractBaseMongo { ...@@ -88,30 +88,51 @@ public class ChannelIndex extends AbstractBaseMongo {
return res; return res;
} }
/**
* 合并渠道记录
*
* @param channelIndexMaps
* @return
*/
public static Map<ChannelIndex, Record> mergeRecord(List<Map<ChannelIndex, Record>> channelIndexMaps) {
Map<ChannelIndex, Record> mergeRecord = new HashMap<>();
for (Map<ChannelIndex, Record> channelIndexMap : channelIndexMaps) {
channelIndexMap.forEach((channelIndex, record) -> {
mergeRecord.compute(channelIndex, (k, v) -> {
if (null == v) {
return record;
}
return v.mergeRecord(record);
});
});
}
return mergeRecord;
}
@Setter @Setter
@Getter @Getter
public static class ChannelRecord { public static class Record {
private Long lastTime; private Long lastTime;
private LinkedHashSet<String> articleIds = new LinkedHashSet<>(); private List<String> articleIds = new ArrayList<>();
public ChannelRecord() { public Record() {
} }
public ChannelRecord(Long lastTime, String articleId) { public Record(Long lastTime, String articleId) {
this.lastTime = lastTime; this.lastTime = lastTime;
this.articleIds.add(articleId); this.articleIds.add(articleId);
} }
private void setLastTime(Long lastTime) { public void setLastTime(Long lastTime) {
if (null == this.lastTime || this.lastTime < lastTime) { if (null == this.lastTime || this.lastTime < lastTime) {
this.lastTime = lastTime; this.lastTime = lastTime;
} }
} }
public ChannelRecord mergeRecord(ChannelRecord channelRecord) { public Record mergeRecord(Record record) {
this.articleIds.addAll(channelRecord.getArticleIds()); this.articleIds.addAll(record.getArticleIds());
setLastTime(channelRecord.getLastTime()); setLastTime(record.getLastTime());
return this; return this;
} }
...@@ -123,12 +144,13 @@ public class ChannelIndex extends AbstractBaseMongo { ...@@ -123,12 +144,13 @@ public class ChannelIndex extends AbstractBaseMongo {
if (o == null || getClass() != o.getClass()) return false; if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false; if (!super.equals(o)) return false;
ChannelIndex that = (ChannelIndex) o; ChannelIndex that = (ChannelIndex) o;
return Objects.equals(linkedGroupId, that.linkedGroupId) && Objects.equals(platform, that.platform) && Objects.equals(realSource, that.realSource) && Objects.equals(source, that.source); return Objects.equals(projectId, that.projectId) && Objects.equals(linkedGroupId, that.linkedGroupId) && Objects.equals(platform, that.platform) && Objects.equals(realSource,
that.realSource) && Objects.equals(source, that.source);
} }
@Override @Override
public int hashCode() { public int hashCode() {
return Objects.hash(linkedGroupId, platform, realSource, source); return Objects.hash(projectId, linkedGroupId, platform, realSource, source);
} }
} }
package com.zhiwei.brandkbs2.pojo;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @ClassName: ChannelRecord
* @Description 渠道记录实体
* @author: sjj
* @date: 2022-07-26 11:54
*/
@Setter
@Getter
public class ChannelRecord {
public static final String RANGE_START_TIME = "range_start_time";
public static final String RANGE_END_TIME = "range_end_time";
public static final String LINK_MAPS = "link_maps";
/**
* 区间内统计开始时间
*/
private long rangeStartTime;
/**
* 区间内统计结束时间
*/
private long rangeEndTime;
/**
* 关联表
*/
private List<LinkMap> linkMaps;
public ChannelRecord(long rangeStartTime, long rangeEndTime, @NonNull Map<ChannelIndex, ChannelIndex.Record> records) {
this.rangeStartTime = rangeStartTime;
this.rangeEndTime = rangeEndTime;
this.linkMaps = records.entrySet().stream().map(entry -> new LinkMap(entry.getKey(), entry.getValue())).collect(Collectors.toList());
}
public ChannelRecord(Map<String, Object> esMap) {
JSONObject json = new JSONObject(esMap);
this.rangeStartTime = json.getLongValue(RANGE_START_TIME);
this.rangeEndTime = json.getLongValue(RANGE_END_TIME);
List<LinkMap> list = new ArrayList<>();
List<Map<String, Object>> maps = (List<Map<String, Object>>) json.get(LINK_MAPS);
for (Map<String, Object> map : maps) {
// channelIndex
ChannelIndex channelIndex = new ChannelIndex();
channelIndex.setProjectId(map.get("project_id") + "");
channelIndex.setLinkedGroupId(map.get("linked_group_id") + "");
channelIndex.setPlatform(map.get("platform") + "");
channelIndex.setRealSource(map.get("real_source") + "");
channelIndex.setSource(map.get("source") + "");
Long lastTime = Long.valueOf(map.get("last_time") + "");
List<String> articleIds = (List<String>) map.get("article_ids");
// record
ChannelIndex.Record record = new ChannelIndex.Record();
record.setLastTime(lastTime);
record.setArticleIds(articleIds);
LinkMap linkMap = new LinkMap(channelIndex, record);
list.add(linkMap);
}
this.linkMaps = list;
}
public Map<String, Object> toEsMap() {
Map<String, Object> esMap = new HashMap<>();
esMap.put(RANGE_START_TIME, rangeStartTime);
esMap.put(RANGE_END_TIME, rangeEndTime);
esMap.put(LINK_MAPS, linkMaps.stream().map(LinkMap::toEsLinkMap).collect(Collectors.toList()));
return esMap;
}
@AllArgsConstructor
@Getter
public static class LinkMap {
private final ChannelIndex index;
private final ChannelIndex.Record record;
private Map<String, Object> toEsLinkMap() {
if (null == index || null == record) {
return null;
}
Map<String, Object> map = new HashMap<>();
map.put("project_id", index.getProjectId());
map.put("linked_group_id", index.getLinkedGroupId());
map.put("platform", index.getPlatform());
map.put("real_source", index.getRealSource());
map.put("source", index.getSource());
map.put("fid", index.getFid());
map.put("last_time", record.getLastTime());
map.put("article_ids", record.getArticleIds());
return map;
}
}
}
...@@ -23,6 +23,8 @@ public interface CommonService { ...@@ -23,6 +23,8 @@ public interface CommonService {
*/ */
List<MarkerTag> getQbjcTags(String linkedGroupId, TagSearch... tagSearches); List<MarkerTag> getQbjcTags(String linkedGroupId, TagSearch... tagSearches);
List<MarkerTag> getEmotionTagsWithSort(String projectId, String linkedGroupId, TagSearch... tagSearches);
/** /**
* 获取qbjcTags * 获取qbjcTags
* *
......
...@@ -10,6 +10,7 @@ import com.zhiwei.brandkbs2.easyexcel.dto.ExportAdminChannelEventDTO; ...@@ -10,6 +10,7 @@ import com.zhiwei.brandkbs2.easyexcel.dto.ExportAdminChannelEventDTO;
import com.zhiwei.brandkbs2.easyexcel.dto.ExportChannelDTO; import com.zhiwei.brandkbs2.easyexcel.dto.ExportChannelDTO;
import com.zhiwei.brandkbs2.enmus.EventTagEnum; import com.zhiwei.brandkbs2.enmus.EventTagEnum;
import com.zhiwei.brandkbs2.enmus.ExperienceEnum; import com.zhiwei.brandkbs2.enmus.ExperienceEnum;
import com.zhiwei.brandkbs2.es.ChannelEsDao;
import com.zhiwei.brandkbs2.es.EsClientDao; import com.zhiwei.brandkbs2.es.EsClientDao;
import com.zhiwei.brandkbs2.exception.ExceptionCast; import com.zhiwei.brandkbs2.exception.ExceptionCast;
import com.zhiwei.brandkbs2.model.CommonCodeEnum; import com.zhiwei.brandkbs2.model.CommonCodeEnum;
...@@ -62,6 +63,9 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -62,6 +63,9 @@ public class ChannelServiceImpl implements ChannelService {
@Resource(name = "esClientDao") @Resource(name = "esClientDao")
EsClientDao esClientDao; EsClientDao esClientDao;
@Resource(name = "channelEsDao")
ChannelEsDao channelEsDao;
@Resource(name = "channelTagDao") @Resource(name = "channelTagDao")
ChannelTagDao channelTagDao; ChannelTagDao channelTagDao;
...@@ -87,14 +91,14 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -87,14 +91,14 @@ public class ChannelServiceImpl implements ChannelService {
result.put("platform", channel.getPlatform()); result.put("platform", channel.getPlatform());
result.put("realSource", channel.getRealSource()); result.put("realSource", channel.getRealSource());
result.put("source", channel.getSource()); result.put("source", channel.getSource());
result.put("articleCount", channel.getArticleIds().size()); result.put("articleCount", channel.getArticleCount());
result.put("eventCount", null == channel.getEventIds() ? 0 : channel.getEventIds().size()); result.put("eventCount", channel.getEventCount());
result.put("emotion", channel.getEmotion()); result.put("emotion", channel.getEmotion());
result.put("emotionIndex", channel.getEmotionIndex()); result.put("emotionIndex", channel.getEmotionIndex());
result.put("experienceLevel", ExperienceEnum.getValueFromDataBaseName(channel.getExperienceLevel())); result.put("experienceLevel", ExperienceEnum.getValueFromDataBaseName(channel.getExperienceLevel()));
result.put("lastTime", channel.getLastTime()); result.put("lastTime", channel.getLastTime());
result.put("show", channel.isShow()); result.put("show", channel.isShow());
result.put("imgUrl", channel.getImgUrl()); result.put("imgUrl", channel.getAvatarUrl());
result.put("tag", channelTagDao.getTagByChannelName(channel.getSource())); result.put("tag", channelTagDao.getTagByChannelName(channel.getSource()));
return result; return result;
}).collect(Collectors.toList()); }).collect(Collectors.toList());
...@@ -106,7 +110,7 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -106,7 +110,7 @@ public class ChannelServiceImpl implements ChannelService {
public PageVO<JSONObject> findArticleList(int page, int size, String channelId) { public PageVO<JSONObject> findArticleList(int page, int size, String channelId) {
try { try {
Channel channel = channelDao.findOneById(channelId); Channel channel = channelDao.findOneById(channelId);
LinkedHashSet<String> articleIds = channel.getArticleIds(); List<String> articleIds = channelEsDao.getArticleIds(channel.getFid());
List<String> queryIds = Tools.pageList(page, size, articleIds); List<String> queryIds = Tools.pageList(page, size, articleIds);
List<JSONObject> resList = new ArrayList<>(); List<JSONObject> resList = new ArrayList<>();
Map<String, JSONObject> idMaps = esClientDao.searchByIds(queryIds); Map<String, JSONObject> idMaps = esClientDao.searchByIds(queryIds);
...@@ -132,7 +136,7 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -132,7 +136,7 @@ public class ChannelServiceImpl implements ChannelService {
@Override @Override
public PageVO<JSONObject> findEventList(int page, int size, String channelId) { public PageVO<JSONObject> findEventList(int page, int size, String channelId) {
Channel channel = channelDao.findOneById(channelId); Channel channel = channelDao.findOneById(channelId);
LinkedHashSet<String> eventIds = channel.getEventIds(); List<String> eventIds = eventDao.getEventCount(channel.getChannelIndex());
Query query = Query.query(Criteria.where("_id").in(eventIds)); Query query = Query.query(Criteria.where("_id").in(eventIds));
long total = eventDao.count(query); long total = eventDao.count(query);
mongoUtil.start(page, size, query); mongoUtil.start(page, size, query);
...@@ -184,7 +188,7 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -184,7 +188,7 @@ public class ChannelServiceImpl implements ChannelService {
jsonObject.put("platform", channel.getPlatform()); jsonObject.put("platform", channel.getPlatform());
jsonObject.put("realSource", channel.getRealSource()); jsonObject.put("realSource", channel.getRealSource());
jsonObject.put("source", channel.getSource()); jsonObject.put("source", channel.getSource());
jsonObject.put("imgUrl", channel.getImgUrl()); jsonObject.put("imgUrl", channel.getAvatarUrl());
jsonObject.put("tag", channelTagDao.getTagByChannelName(channel.getSource())); jsonObject.put("tag", channelTagDao.getTagByChannelName(channel.getSource()));
return jsonObject; return jsonObject;
} }
...@@ -210,7 +214,7 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -210,7 +214,7 @@ public class ChannelServiceImpl implements ChannelService {
List<ExportAdminChannelArticleDTO> resList = new ArrayList<>(); List<ExportAdminChannelArticleDTO> resList = new ArrayList<>();
try { try {
Channel channel = channelDao.findOneById(channelId); Channel channel = channelDao.findOneById(channelId);
LinkedHashSet<String> articleIds = channel.getArticleIds(); List<String> articleIds = channelEsDao.getArticleIds(channel.getFid());
Map<String, JSONObject> resultMap = esClientDao.searchByIds(articleIds); Map<String, JSONObject> resultMap = esClientDao.searchByIds(articleIds);
resultMap.values().forEach(json -> { resultMap.values().forEach(json -> {
// url,title,platform,forward,emotion 未配置 // url,title,platform,forward,emotion 未配置
...@@ -229,7 +233,7 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -229,7 +233,7 @@ public class ChannelServiceImpl implements ChannelService {
public List<ExportAdminChannelEventDTO> findDownloadChannelEventList(String channelId) { public List<ExportAdminChannelEventDTO> findDownloadChannelEventList(String channelId) {
List<ExportAdminChannelEventDTO> resList = new ArrayList<>(); List<ExportAdminChannelEventDTO> resList = new ArrayList<>();
Channel channel = channelDao.findOneById(channelId); Channel channel = channelDao.findOneById(channelId);
LinkedHashSet<String> eventIds = channel.getEventIds(); List<String> eventIds = eventDao.getEventCount(channel.getChannelIndex());
if (CollectionUtils.isEmpty(eventIds)) { if (CollectionUtils.isEmpty(eventIds)) {
return resList; return resList;
} }
...@@ -290,8 +294,6 @@ public class ChannelServiceImpl implements ChannelService { ...@@ -290,8 +294,6 @@ public class ChannelServiceImpl implements ChannelService {
TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("count").field("mark_cache_maps.name.keyword"); TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("count").field("mark_cache_maps.name.keyword");
return null; return null;
} }
......
package com.zhiwei.brandkbs2.service.impl; package com.zhiwei.brandkbs2.service.impl;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.zhiwei.brandkbs2.common.GenericAttribute;
import com.zhiwei.brandkbs2.config.Constant; import com.zhiwei.brandkbs2.config.Constant;
import com.zhiwei.brandkbs2.enmus.EmotionEnum;
import com.zhiwei.brandkbs2.service.CommonService; import com.zhiwei.brandkbs2.service.CommonService;
import com.zhiwei.brandkbs2.service.ProjectService; import com.zhiwei.brandkbs2.service.ProjectService;
import com.zhiwei.brandkbs2.util.Tools; import com.zhiwei.brandkbs2.util.Tools;
...@@ -57,6 +57,12 @@ public class CommonServiceImpl implements CommonService { ...@@ -57,6 +57,12 @@ public class CommonServiceImpl implements CommonService {
} }
@Override @Override
public List<MarkerTag> getEmotionTagsWithSort(String projectId, String linkedGroupId, TagSearch... tagSearches) {
List<MarkerTag> emotionList = getQbjcTags(linkedGroupId, TagField.GROUP_NAME.is("情感倾向"));
return emotionList.stream().filter(tag -> 0 != EmotionEnum.parseFromName(tag.getName())).sorted(Comparator.comparingInt(tag -> EmotionEnum.parseFromName(tag.getName()))).collect(Collectors.toList());
}
@Override
public List<MarkerTag> getQbjcTagsByGroupName(String groupName, TagSearch... tagSearches) { public List<MarkerTag> getQbjcTagsByGroupName(String groupName, TagSearch... tagSearches) {
Objects.requireNonNull(groupName); Objects.requireNonNull(groupName);
TagSearch defaultSearch = TagField.PROJECT.is(groupName); TagSearch defaultSearch = TagField.PROJECT.is(groupName);
......
...@@ -16,7 +16,9 @@ import com.zhiwei.brandkbs2.es.EsQueryTools; ...@@ -16,7 +16,9 @@ import com.zhiwei.brandkbs2.es.EsQueryTools;
import com.zhiwei.brandkbs2.exception.ExceptionCast; import com.zhiwei.brandkbs2.exception.ExceptionCast;
import com.zhiwei.brandkbs2.listener.ApplicationProjectListener; import com.zhiwei.brandkbs2.listener.ApplicationProjectListener;
import com.zhiwei.brandkbs2.model.CommonCodeEnum; import com.zhiwei.brandkbs2.model.CommonCodeEnum;
import com.zhiwei.brandkbs2.pojo.*; import com.zhiwei.brandkbs2.pojo.AggreeResult;
import com.zhiwei.brandkbs2.pojo.BaseMap;
import com.zhiwei.brandkbs2.pojo.MarkFlowEntity;
import com.zhiwei.brandkbs2.pojo.dto.ExportAppYuqingDTO; import com.zhiwei.brandkbs2.pojo.dto.ExportAppYuqingDTO;
import com.zhiwei.brandkbs2.pojo.dto.MarkSearchDTO; import com.zhiwei.brandkbs2.pojo.dto.MarkSearchDTO;
import com.zhiwei.brandkbs2.pojo.vo.CustomTagVo; import com.zhiwei.brandkbs2.pojo.vo.CustomTagVo;
...@@ -27,7 +29,6 @@ import com.zhiwei.brandkbs2.util.MongoUtil; ...@@ -27,7 +29,6 @@ import com.zhiwei.brandkbs2.util.MongoUtil;
import com.zhiwei.brandkbs2.util.RedisUtil; import com.zhiwei.brandkbs2.util.RedisUtil;
import com.zhiwei.brandkbs2.util.TextUtil; import com.zhiwei.brandkbs2.util.TextUtil;
import com.zhiwei.brandkbs2.util.Tools; import com.zhiwei.brandkbs2.util.Tools;
import com.zhiwei.middleware.mark.pojo.enums.TagField;
import com.zhiwei.qbjc.bean.pojo.common.MessagePlatform; import com.zhiwei.qbjc.bean.pojo.common.MessagePlatform;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
...@@ -109,7 +110,8 @@ public class MarkDataServiceImpl implements MarkDataService { ...@@ -109,7 +110,8 @@ public class MarkDataServiceImpl implements MarkDataService {
// 搜索es数据 // 搜索es数据
Pair<SearchHits[], Map<String, Long>> hitsAndCounts = searchMarkHitsAndCount(markSearchDTO, false); Pair<SearchHits[], Map<String, Long>> hitsAndCounts = searchMarkHitsAndCount(markSearchDTO, false);
// 总量 // 总量
long total = hitsAndCounts.getLeft()[0].getTotalHits().value > 10000 ? 10000 : hitsAndCounts.getLeft()[0].getTotalHits().value; // long total = hitsAndCounts.getLeft()[0].getTotalHits().value > 10000 ? 10000 : hitsAndCounts.getLeft()[0].getTotalHits().value;
long total = hitsAndCounts.getLeft()[0].getTotalHits().value;
// 消息列表 // 消息列表
List<MarkFlowEntity> flowEntityList = getMarkFlowEntity(markSearchDTO, hitsAndCounts.getLeft()[0]); List<MarkFlowEntity> flowEntityList = getMarkFlowEntity(markSearchDTO, hitsAndCounts.getLeft()[0]);
// 返回分页结果并设置平台count // 返回分页结果并设置平台count
...@@ -278,8 +280,9 @@ public class MarkDataServiceImpl implements MarkDataService { ...@@ -278,8 +280,9 @@ public class MarkDataServiceImpl implements MarkDataService {
@Override @Override
public JSONObject getYuqingMarkCriteria(String linkedGroupId) { public JSONObject getYuqingMarkCriteria(String linkedGroupId) {
String projectId = UserThreadLocal.getProjectId();
if (null == linkedGroupId) { if (null == linkedGroupId) {
linkedGroupId = projectService.getProjectVOById(UserThreadLocal.getProjectId()).getBrandLinkedGroupId(); linkedGroupId = projectService.getProjectVOById(projectId).getBrandLinkedGroupId();
} }
JSONObject result = new JSONObject(); JSONObject result = new JSONObject();
// 搜索时间 // 搜索时间
...@@ -287,7 +290,7 @@ public class MarkDataServiceImpl implements MarkDataService { ...@@ -287,7 +290,7 @@ public class MarkDataServiceImpl implements MarkDataService {
// 平台 // 平台
result.put("platformList", commonService.getQbjcPlatform("id", "name")); result.put("platformList", commonService.getQbjcPlatform("id", "name"));
// 情感标签 // 情感标签
result.put("emotionList", commonService.getQbjcTags(linkedGroupId, TagField.GROUP_NAME.is("情感倾向")).stream().map(markerTag -> { result.put("emotionList", commonService.getEmotionTagsWithSort(projectId, linkedGroupId).stream().map(markerTag -> {
JSONObject json = new JSONObject(); JSONObject json = new JSONObject();
json.put("uniqueId", markerTag.getUniqueId()); json.put("uniqueId", markerTag.getUniqueId());
json.put("name", markerTag.getName()); json.put("name", markerTag.getName());
...@@ -545,12 +548,15 @@ public class MarkDataServiceImpl implements MarkDataService { ...@@ -545,12 +548,15 @@ public class MarkDataServiceImpl implements MarkDataService {
// PostFilter 后置过滤器 // PostFilter 后置过滤器
BoolQueryBuilder postFilter = projectLinkedGroupQuery(projectId, linkedGroupId); BoolQueryBuilder postFilter = projectLinkedGroupQuery(projectId, linkedGroupId);
// time // time
postFilter.must(QueryBuilders.rangeQuery(dto.getTimeType()).lte(dto.getStartTime()).lt(dto.getEndTime())); postFilter.must(QueryBuilders.rangeQuery(dto.getTimeType()).gte(dto.getStartTime()).lt(dto.getEndTime()));
// platform // platform
if (CollectionUtils.isNotEmpty(dto.getPlatforms())) { if (CollectionUtils.isNotEmpty(dto.getPlatforms())) {
postFilter.must(EsQueryTools.assemblePlatformQuery(Tools.getPlatformByIds(dto.getPlatforms()))); postFilter.must(EsQueryTools.assemblePlatformQuery(Tools.getPlatformByIds(dto.getPlatforms())));
} }
// mediaTypes ??? // mediaTypes
if (CollectionUtils.isNotEmpty(dto.getMediaTypes())) {
postFilter.must(EsQueryTools.assembleMediaTypeQuery(dto.getMediaTypes()));
}
// tags // tags
if (CollectionUtils.isNotEmpty(dto.getTags())) { if (CollectionUtils.isNotEmpty(dto.getTags())) {
postFilter.must(EsQueryTools.assembleTagQuery(dto.getTags())); postFilter.must(EsQueryTools.assembleTagQuery(dto.getTags()));
...@@ -559,32 +565,34 @@ public class MarkDataServiceImpl implements MarkDataService { ...@@ -559,32 +565,34 @@ public class MarkDataServiceImpl implements MarkDataService {
if (CollectionUtils.isNotEmpty(dto.getCustomTags())) { if (CollectionUtils.isNotEmpty(dto.getCustomTags())) {
postFilter.must(EsQueryTools.assembleTagQuery(dto.getCustomTags())); postFilter.must(EsQueryTools.assembleTagQuery(dto.getCustomTags()));
} }
helper.setPostFilter(postFilter);
// Query 查询条件 // Query 查询条件
BoolQueryBuilder query = QueryBuilders.boolQuery(); // BoolQueryBuilder query = QueryBuilders.boolQuery();
// keyword // keyword
if (StringUtils.isNotEmpty(dto.getKeyword())) { if (StringUtils.isNotEmpty(dto.getKeyword())) {
// Query // Query
String[] fieldSearch = "标题".equals(dto.getSearchField()) ? new String[]{GenericAttribute.ES_IND_TITLE} : new String[]{GenericAttribute.ES_IND_FULL_TEXT}; String[] fieldSearch = "标题".equals(dto.getSearchField()) ? new String[]{GenericAttribute.ES_IND_TITLE} : new String[]{GenericAttribute.ES_IND_FULL_TEXT};
query.must(EsQueryTools.assembleNormalKeywordQuery(dto.getKeyword(), fieldSearch)); postFilter.must(EsQueryTools.assembleNormalKeywordQuery(dto.getKeyword(), fieldSearch));
} }
// sourceKeyword // sourceKeyword
if (StringUtils.isNotEmpty(dto.getSourceKeyword())) { if (StringUtils.isNotEmpty(dto.getSourceKeyword())) {
query.must(QueryBuilders.matchQuery(GenericAttribute.ES_SOURCE, dto.getSourceKeyword())); postFilter.must(EsQueryTools.assembleSourceQuery(dto.getSourceKeyword()));
} }
helper.setQuery(query); helper.setPostFilter(postFilter);
// helper.setQuery(query);
// sort // sort
FieldSortBuilder sort = null; FieldSortBuilder sort = null;
if (null != dto.getSorter()) { if (null != dto.getSorter()) {
for (Map.Entry<String, Object> entry : dto.getSorter().entrySet()) { for (Map.Entry<String, Object> entry : dto.getSorter().entrySet()) {
// TODO 暂不支持 String key = entry.getKey();
if (entry.getKey().contains("influence")) { if (key.equals("influence")) {
continue; key = "channel_influence";
} else if (key.equals("followers")) {
key = "channel_followers";
} }
if (entry.getValue().toString().contains("desc")) { if (entry.getValue().toString().contains("desc")) {
sort = SortBuilders.fieldSort(entry.getKey()).order(SortOrder.DESC); sort = SortBuilders.fieldSort(key).order(SortOrder.DESC);
} else { } else {
sort = SortBuilders.fieldSort(entry.getKey()).order(SortOrder.ASC); sort = SortBuilders.fieldSort(key).order(SortOrder.ASC);
} }
} }
} }
...@@ -611,8 +619,9 @@ public class MarkDataServiceImpl implements MarkDataService { ...@@ -611,8 +619,9 @@ public class MarkDataServiceImpl implements MarkDataService {
Map<String, Long> counts = new HashMap<>(); Map<String, Long> counts = new HashMap<>();
if (1 == dto.getPage() && CollectionUtils.isEmpty(dto.getPlatforms())) { if (1 == dto.getPage() && CollectionUtils.isEmpty(dto.getPlatforms())) {
for (MessagePlatform platform : GlobalPojo.PLATFORMS) { for (MessagePlatform platform : GlobalPojo.PLATFORMS) {
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery().must(helper.getQuery()).must(EsQueryTools.assemblePlatformQuery(Collections.singletonList(platform))); BoolQueryBuilder queryBuilder =
Long count = esClientDao.count(esClientDao.getIndexes(), helper.getPostFilter(), queryBuilder); QueryBuilders.boolQuery().must(helper.getPostFilter()).must(EsQueryTools.assemblePlatformQuery(Collections.singletonList(platform)));
Long count = esClientDao.count(esClientDao.getIndexes(), queryBuilder, null);
counts.put(platform.getName(), count); counts.put(platform.getName(), count);
} }
} }
......
...@@ -20,6 +20,8 @@ import com.zhiwei.brandkbs2.util.Tools; ...@@ -20,6 +20,8 @@ import com.zhiwei.brandkbs2.util.Tools;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -116,8 +118,10 @@ public class MarkFlowServiceImpl implements MarkFlowService { ...@@ -116,8 +118,10 @@ public class MarkFlowServiceImpl implements MarkFlowService {
// source // source
sourceDetails.put("source", source); sourceDetails.put("source", source);
// 粉丝量提取 // 粉丝量提取
Long followersNum = tJson.getLongValue(GenericAttribute.ES_FOLLOWERS_NUM); long followersNum = tJson.getLongValue(GenericAttribute.ES_FOLLOWERS_NUM);
if (followersNum > 0) {
sourceDetails.put("followersNum", followersNum); sourceDetails.put("followersNum", followersNum);
}
// 渠道标签 // 渠道标签
sourceDetails.put("channelTag", channelTagDao.getTagByChannelName(source)); sourceDetails.put("channelTag", channelTagDao.getTagByChannelName(source));
// 渠道倾向及id // 渠道倾向及id
...@@ -133,7 +137,10 @@ public class MarkFlowServiceImpl implements MarkFlowService { ...@@ -133,7 +137,10 @@ public class MarkFlowServiceImpl implements MarkFlowService {
sourceDetails.put("channelEmotion", ChannelEmotion.getNameFromState(hitMap.get("channel_emotion"))); sourceDetails.put("channelEmotion", ChannelEmotion.getNameFromState(hitMap.get("channel_emotion")));
} }
} }
sourceDetails.put("channelInfluence", tJson.getDoubleValue(GenericAttribute.ES_CHANNEL_INDEX)); double channelValue = tJson.getDoubleValue(GenericAttribute.ES_CHANNEL_INDEX);
if (channelValue > 0) {
sourceDetails.put("channelInfluence", new BigDecimal(channelValue).setScale(2, RoundingMode.UP));
}
return sourceDetails; return sourceDetails;
} }
......
...@@ -3,6 +3,7 @@ package com.zhiwei.brandkbs2.service.impl; ...@@ -3,6 +3,7 @@ package com.zhiwei.brandkbs2.service.impl;
import com.zhiwei.brandkbs2.config.Constant; import com.zhiwei.brandkbs2.config.Constant;
import com.zhiwei.brandkbs2.dao.*; import com.zhiwei.brandkbs2.dao.*;
import com.zhiwei.brandkbs2.enmus.ReportTypeEnum; import com.zhiwei.brandkbs2.enmus.ReportTypeEnum;
import com.zhiwei.brandkbs2.es.ChannelEsDao;
import com.zhiwei.brandkbs2.es.EsClientDao; import com.zhiwei.brandkbs2.es.EsClientDao;
import com.zhiwei.brandkbs2.listener.ApplicationProjectListener; import com.zhiwei.brandkbs2.listener.ApplicationProjectListener;
import com.zhiwei.brandkbs2.pojo.*; import com.zhiwei.brandkbs2.pojo.*;
...@@ -17,12 +18,12 @@ import org.springframework.stereotype.Service; ...@@ -17,12 +18,12 @@ import org.springframework.stereotype.Service;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/** /**
* @ClassName: TaskServiceImpl * @ClassName: TaskServiceImpl
...@@ -37,6 +38,9 @@ public class TaskServiceImpl implements TaskService { ...@@ -37,6 +38,9 @@ public class TaskServiceImpl implements TaskService {
@Resource(name = "esClientDao") @Resource(name = "esClientDao")
EsClientDao esClientDao; EsClientDao esClientDao;
@Resource(name = "channelEsDao")
ChannelEsDao channelEsDao;
@Resource(name = "channelDao") @Resource(name = "channelDao")
ChannelDao channelDao; ChannelDao channelDao;
...@@ -63,27 +67,32 @@ public class TaskServiceImpl implements TaskService { ...@@ -63,27 +67,32 @@ public class TaskServiceImpl implements TaskService {
@Override @Override
public void messageFlowCount(int day) { public void messageFlowCount(int day) {
Map<ChannelIndex, ChannelIndex.ChannelRecord> channelIndexChannelRecordMap = esClientDao.searchChannelRecordRecentDay(day); List<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>> rangeTimeRecords = esClientDao.searchRecordRecentDay(day);
log.info("渠道信息记录-搜索到近{}天的记录条数{}条", day, channelIndexChannelRecordMap.size()); int total = rangeTimeRecords.stream().mapToInt(pair -> pair.getRight().values().size()).sum();
log.info("渠道统计-搜索到近{}天的受影响渠道数{}条", day, total);
List<ChannelRecord> channelRecords = rangeTimeRecords.stream().map(pair -> new ChannelRecord(pair.getLeft()[0], pair.getLeft()[1], pair.getRight())).collect(Collectors.toList());
channelEsDao.upsertChannelRecord(channelRecords);
log.info("渠道统计-小时级渠道记录统计结束");
List<Channel> insertList = new ArrayList<>(); List<Channel> insertList = new ArrayList<>();
for (Map.Entry<ChannelIndex, ChannelIndex.ChannelRecord> entry : channelIndexChannelRecordMap.entrySet()) { List<Map<ChannelIndex, ChannelIndex.Record>> channelList = rangeTimeRecords.stream().map(Pair::getRight).collect(Collectors.toList());
// 合并渠道记录
Map<ChannelIndex, ChannelIndex.Record> channelIndexRecordMap = ChannelIndex.mergeRecord(channelList);
for (Map.Entry<ChannelIndex, ChannelIndex.Record> entry : channelIndexRecordMap.entrySet()) {
// 是否已存在 // 是否已存在
Channel channel = channelDao.queryUnique(entry.getKey()); Channel channel = channelDao.queryUnique(entry.getKey());
if (null == channel) { if (null == channel) {
channel = Channel.createFromChannelIndexRecord(entry.getKey(), entry.getValue()); channel = Channel.createFromChannelIndexRecord(entry.getKey(), entry.getValue());
channel.setEventIds(new LinkedHashSet<>(eventDataDao.getEventCount(entry.getKey())));
insertList.add(channel); insertList.add(channel);
channelDao.insertOne(channel); channelDao.insertOne(channel);
} else { } else {
channel.setEventIds(new LinkedHashSet<>(eventDataDao.getEventCount(entry.getKey()))); channel.setRecord(entry.getValue());
channel.setChannelRecord(entry.getValue());
channelDao.updateOne(channel); channelDao.updateOne(channel);
} }
} }
// ListUtils.partition(insertList, 1000).forEach(list -> { // ListUtils.partition(insertList, 1000).forEach(list -> {
// channelDao.insertMany(list); // channelDao.insertMany(list);
// }); // });
log.info("渠道信息记录-录入完毕,新增渠道{}条,更新渠道{}条", insertList.size(), channelIndexChannelRecordMap.size() - insertList.size()); log.info("渠道统计-录入完毕,新增渠道{}条,更新渠道{}条", insertList.size(), total - insertList.size());
} }
@Override @Override
......
package com.zhiwei.brandkbs2.util; package com.zhiwei.brandkbs2.util;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest; import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
...@@ -10,10 +14,61 @@ import java.security.NoSuchAlgorithmException; ...@@ -10,10 +14,61 @@ import java.security.NoSuchAlgorithmException;
* @date 2019-8-6 15:53 * @date 2019-8-6 15:53
**/ **/
public final class Md5Util { public final class Md5Util {
private static final Logger logger = LogManager.getLogger(Md5Util.class);
private Md5Util() { private Md5Util() {
} }
/** /**
* 输出MD5
*
* @param objs 需要加密的字符串
* @return esId
*/
public static String encode2EsId(Object... objs) {
StringBuilder sb = new StringBuilder();
for (Object obj : objs) {
sb.append(obj.toString());
}
return encode2EsId(sb.toString());
}
/**
* 输出MD5
*
* @param sourceStr 需要加密的字符串
* @return esId
*/
public static String encode2EsId(String sourceStr) {
String md5 = null;
try {
// 创建加密对象
MessageDigest md = MessageDigest.getInstance("MD5");
md5 = mdUpdate(sourceStr, md);
} catch (Exception e) {
logger.info("MD5加密出错", e);
}
return md5;
}
private static String mdUpdate(String sourceStr, MessageDigest md) {
// 计算MD5函数
md.update(sourceStr.getBytes(StandardCharsets.UTF_8));
byte[] b = md.digest();
int i;
StringBuilder buf = new StringBuilder();
for (byte value : b) {
i = value;
if (i < 0)
i += 256;
if (i < 16)
buf.append("0");
buf.append(Integer.toHexString(i));
}
return buf.toString();
}
/**
* 将明文密码转成MD5密码 * 将明文密码转成MD5密码
* *
* @param password 密码 * @param password 密码
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment