Commit af1345e0 by shenjunjie

新增渠道记录esDao

parent 544fd707
......@@ -16,6 +16,7 @@ public class GenericAttribute {
*/
public static final String ES_INDEX_PRE = "brandkbs2";
public static final String ES_INDEX_TEST = "brandkbs2_test";
public static final String ES_CHANNEL_INDEX_TEST = "brandkbs2_channel_record_test";
/**
* es ind_title
**/
......
......@@ -72,7 +72,7 @@ public class AppArticleController extends BaseController {
@ApiOperation("舆情列表-搜索条件")
@GetMapping("/mark/list/criteria")
public ResponseResult getYuqingMark(@RequestParam(required = false) String linkedGroupId) {
public ResponseResult getYuqingMarkCriteria(@RequestParam(required = false) String linkedGroupId) {
return ResponseResult.success(markDataService.getYuqingMarkCriteria(linkedGroupId));
}
......
package com.zhiwei.brandkbs2.dao;
import com.zhiwei.brandkbs2.pojo.ChannelIndex;
import com.zhiwei.brandkbs2.pojo.Event;
import java.util.List;
/**
* @ClassName: EventDao
* @Description EventDao
......@@ -30,4 +33,13 @@ public interface EventDao extends BaseMongoDao<Event>{
*/
Event getEventByUniqueIds(String yqEventId,String projectId,String linkedGroupId);
/**
* 获取参与事件数
*
* @param channelIndex 渠道标识
* @return 参与事件数
*/
List<String> getEventCount(ChannelIndex channelIndex);
}
......@@ -23,14 +23,6 @@ public interface EventDataDao extends BaseMongoDao<EventData>, ShardingMongo {
EventData findFirstData(String eventId, String collectionName);
/**
* 获取参与事件数
*
* @param channelIndex 渠道标识
* @return 参与事件数
*/
List<String> getEventCount(ChannelIndex channelIndex);
/**
* 获取传播量
*
* @param event 事件
......
package com.zhiwei.brandkbs2.dao.impl;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.brandkbs2.dao.EventDao;
import com.zhiwei.brandkbs2.pojo.ChannelIndex;
import com.zhiwei.brandkbs2.pojo.Event;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Component;
import java.util.Calendar;
import java.util.List;
import java.util.stream.Collectors;
import static com.zhiwei.brandkbs2.dao.impl.EventDataDaoImpl.COLLECTION_PREFIX;
/**
* @ClassName: EventDaoImpl
* @Description 事件业务实现类
......@@ -32,4 +42,22 @@ public class EventDaoImpl extends BaseMongoDaoImpl<Event> implements EventDao {
query.addCriteria(Criteria.where("yqEventId").is(yqEventId).and("projectId").is(projectId).and("linkedGroupId").is(linkedGroupId));
return mongoTemplate.findOne(query, Event.class);
}
@Override
public List<String> getEventCount(ChannelIndex channelIndex) {
// 添加渠道唯一标识
Criteria criteria = addChannelIndex(channelIndex);
// 分组
Aggregation agg = Aggregation.newAggregation(Aggregation.match(criteria), Aggregation.group("eventId").count().as("eventCount"));
AggregationResults<JSONObject> aggregate = mongoTemplate.aggregate(agg, getAggreeCollection(), JSONObject.class);
List<JSONObject> mappedResults = aggregate.getMappedResults();
return mappedResults.stream().map(json -> json.getString("_id")).collect(Collectors.toList());
}
private String getAggreeCollection() {
Calendar date = Calendar.getInstance();
int year = date.get(Calendar.YEAR);
return COLLECTION_PREFIX + "_" + year;
}
}
package com.zhiwei.brandkbs2.dao.impl;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.brandkbs2.dao.EventDataDao;
import com.zhiwei.brandkbs2.pojo.ChannelIndex;
import com.zhiwei.brandkbs2.pojo.Event;
import com.zhiwei.brandkbs2.pojo.EventData;
import org.springframework.data.domain.Sort;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Component;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
/**
* @ClassName: EventDataDaoImpl
......@@ -26,7 +21,7 @@ import java.util.stream.Collectors;
@Component("eventDataDao")
public class EventDataDaoImpl extends BaseMongoDaoImpl<EventData> implements EventDataDao {
private static final String COLLECTION_PREFIX = "brandkbs_event_data";
protected static final String COLLECTION_PREFIX = "brandkbs_event_data";
private static final String TIME_PATTERN = "yyyy";
public EventDataDaoImpl() {
......@@ -53,17 +48,6 @@ public class EventDataDaoImpl extends BaseMongoDaoImpl<EventData> implements Eve
}
@Override
public List<String> getEventCount(ChannelIndex channelIndex) {
// 添加渠道唯一标识
Criteria criteria = addChannelIndex(channelIndex);
// 分组
Aggregation agg = Aggregation.newAggregation(Aggregation.match(criteria), Aggregation.group("eventId").count().as("eventCount"));
AggregationResults<JSONObject> aggregate = mongoTemplate.aggregate(agg, getAggreeCollection(), JSONObject.class);
List<JSONObject> mappedResults = aggregate.getMappedResults();
return mappedResults.stream().map(json -> json.getString("_id")).collect(Collectors.toList());
}
@Override
public long getEventArticleCount(Event event) {
return count(Query.query(Criteria.where("eventId").is(event.getId())), event.getCollectionName());
}
......@@ -91,10 +75,4 @@ public class EventDataDaoImpl extends BaseMongoDaoImpl<EventData> implements Eve
return mongoTemplate.count(query, EventData.class, collectionName);
}
private String getAggreeCollection() {
Calendar date = Calendar.getInstance();
int year = date.get(Calendar.YEAR);
return COLLECTION_PREFIX + "_" + year;
}
}
package com.zhiwei.brandkbs2.es;
import com.zhiwei.brandkbs2.common.GenericAttribute;
import com.zhiwei.brandkbs2.pojo.ChannelRecord;
import com.zhiwei.brandkbs2.service.CommonService;
import com.zhiwei.brandkbs2.util.Md5Util;
import org.apache.commons.collections4.ListUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* @ClassName: ChannelEsClientDao
* @Description ChannelEsClientDao
* @author: sjj
* @date: 2022-07-26 17:25
*/
@Component("channelEsDao")
public class ChannelEsDao extends EsClientDao {
private static final Logger log = LogManager.getLogger(ChannelEsDao.class);
@Resource(name = "localEsClient")
RestHighLevelClient channelEsClient;
@Resource(name = "commonServiceImpl")
CommonService commonService;
public List<String> getArticleIds(String fid) {
Long[] timeRangeMonth = commonService.getTimeRangeMonth();
return getArticleIds(timeRangeMonth[0], timeRangeMonth[1], fid);
}
public List<String> getArticleIds(long startTime, long endTime, String fid) {
List<String> articleIds = new ArrayList<>();
try {
SearchHelper searchHelper = createSearchHelper();
BoolQueryBuilder postFilter = channelUniqueBool(fid);
postFilter.must(QueryBuilders.rangeQuery(ChannelRecord.RANGE_START_TIME).gte(startTime));
postFilter.must(QueryBuilders.rangeQuery(ChannelRecord.RANGE_END_TIME).lt(endTime));
searchHelper.setPostFilter(postFilter);
searchHelper.setSort(SortBuilders.fieldSort(ChannelRecord.RANGE_END_TIME).order(SortOrder.DESC));
SearchHits searchHits = retryTemplate.execute(context -> searchHits(searchHelper));
for (SearchHit hit : searchHits.getHits()) {
ChannelRecord channelRecord = new ChannelRecord(hit.getSourceAsMap());
channelRecord.getLinkMaps().stream().filter(linkMap -> fid.equals(linkMap.getIndex().getFid())).
forEach(linkMap -> articleIds.addAll(linkMap.getRecord().getArticleIds()));
}
} catch (IOException ignored) {
}
return articleIds;
}
public void upsertChannelRecord(List<ChannelRecord> channelRecords) {
String index = getChannelRecordIndex();
BulkRequest bulkRequest = new BulkRequest();
Long startTime = null;
Long endTime = null;
for (List<ChannelRecord> records : ListUtils.partition(channelRecords, 1000)) {
for (ChannelRecord record : records) {
startTime = null == startTime ? record.getRangeStartTime() : Math.min(startTime, record.getRangeStartTime());
endTime = null == endTime ? record.getRangeEndTime() : Math.max(endTime, record.getRangeEndTime());
bulkRequest.add(new IndexRequest(index).id(Md5Util.encode2EsId(record.getRangeStartTime(),
record.getRangeEndTime())).source(record.toEsMap()));
}
BulkResponse bulkResponse = retryTemplate.execute(context -> {
try {
return channelEsClient.bulk(bulkRequest, RequestOptions.DEFAULT);
} catch (IOException ignored) {
return null;
}
});
if (null == bulkResponse) {
log.error("upsertRecord批量操作重试后失败,index:{},rangeTime:{}", index, startTime + "-" + endTime);
} else if (bulkResponse.hasFailures()) {
log.error("upsertRecord批量操作失败,index:{},rangeTime:{},startTime[es-batchUpsert]异常:{}", index, startTime + "-" + endTime,
bulkResponse.buildFailureMessage());
} else {
log.info("upsertRecord批量操作成功,rangeTime:{},入库:{}条", startTime + "-" + endTime, records.size());
}
}
}
@Override
public String[] getIndexes() {
return new String[]{getChannelRecordIndex()};
}
public String getChannelRecordIndex() {
// if (test) {
return GenericAttribute.ES_CHANNEL_INDEX_TEST;
// }
// // 近1年数据库 TODO
// Calendar date = Calendar.getInstance();
// int year = date.get(Calendar.YEAR);
// res.add(GenericAttribute.ES_CHANNEL_INDEX_TEST);
// return res.toArray(new String[0]);
}
private BoolQueryBuilder channelUniqueBool(String fid) {
BoolQueryBuilder postFilter = QueryBuilders.boolQuery();
return postFilter.must(QueryBuilders.termQuery(ChannelRecord.LINK_MAPS + ".fid.keyword", fid));
}
}
......@@ -8,6 +8,7 @@ import com.zhiwei.brandkbs2.util.Tools;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.ClearScrollRequest;
......@@ -58,16 +59,16 @@ public class EsClientDao {
private static final TimeValue TIME_VALUE = TimeValue.timeValueMinutes(8);
@Value("${es.index.test}")
private boolean test;
boolean test;
@Resource(name = "esClient")
private RestHighLevelClient esClient;
RestHighLevelClient esClient;
@Resource(name = "esSearchExecutor")
private ThreadPoolTaskExecutor executor;
ThreadPoolTaskExecutor executor;
@Resource(name = "retryTemplate")
private RetryTemplate retryTemplate;
RetryTemplate retryTemplate;
public Map<String, JSONObject> searchByIds(Collection<String> queryIds) throws IOException {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
......@@ -83,26 +84,18 @@ public class EsClientDao {
* @param day 几天前天数
* @return 渠道记录结果
*/
public Map<ChannelIndex, ChannelIndex.ChannelRecord> searchChannelRecordRecentDay(int day) {
Map<ChannelIndex, ChannelIndex.ChannelRecord> res = new HashMap<>();
public List<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>> searchRecordRecentDay(int day) {
List<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>> res = new ArrayList<>();
Calendar calendar = Calendar.getInstance();
long endTime = calendar.getTime().getTime();
calendar.add(Calendar.DAY_OF_MONTH, -day);
long startTime = calendar.getTime().getTime();
List<Long[]> cutTimes = Tools.cutTimeRange(startTime, endTime, ONE_HOUR);
List<CompletableFuture<Map<ChannelIndex, ChannelIndex.ChannelRecord>>> futures = new ArrayList<>(cutTimes.size());
cutTimes.forEach(times -> futures.add(CompletableFuture.supplyAsync(() -> searchChannelRecord(times[0], times[1]), executor)));
List<CompletableFuture<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>>> futures = new ArrayList<>(cutTimes.size());
cutTimes.forEach(times -> futures.add(CompletableFuture.supplyAsync(() -> searchRecord(times[0], times[1]), executor)));
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).whenComplete((r, e) -> {
futures.forEach(f -> {
Map<ChannelIndex, ChannelIndex.ChannelRecord> channelIndexMap = f.join();
channelIndexMap.forEach((channelIndex, record) -> {
res.compute(channelIndex, (k, v) -> {
if (null == v) {
return record;
}
return v.mergeRecord(record);
});
});
res.add(f.join());
});
}).join();
return res;
......@@ -174,8 +167,8 @@ public class EsClientDao {
return retryTemplate.execute(context -> searchScroll(sourceBuilder));
}
private Map<ChannelIndex, ChannelIndex.ChannelRecord> searchChannelRecord(long startTime, long endTime) {
Map<ChannelIndex, ChannelIndex.ChannelRecord> res = new HashMap<>();
private Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>> searchRecord(long startTime, long endTime) {
Map<ChannelIndex, ChannelIndex.Record> res = new HashMap<>();
try {
QueryBuilder queryBuilder = QueryBuilders.rangeQuery("mtime").gte(startTime).lt(endTime);
List<Map<String, Object>> results = searchScroll(queryBuilder, 10000, CHANNEL_RECORD_FETCH_SOURCE);
......@@ -183,22 +176,22 @@ public class EsClientDao {
for (ChannelIndex channelIndex : ChannelIndex.createChannelIndexes(result)) {
res.compute(channelIndex, (k, v) -> {
if (null == v) {
v = new ChannelIndex.ChannelRecord();
v = new ChannelIndex.Record();
}
try {
return v.mergeRecord(new ChannelIndex.ChannelRecord((long) result.get(GenericAttribute.ES_TIME), String.valueOf(result.get("id"))));
return v.mergeRecord(new ChannelIndex.Record((long) result.get(GenericAttribute.ES_TIME), String.valueOf(result.get("id"))));
} catch (Exception e) {
log.error("searchChannelRecord-error-id:{}", result.get("id"), e);
log.error("searchRecord-error-id:{}", result.get("id"), e);
return null;
}
});
}
}
} catch (IOException e) {
log.error("searchChannelRecord-", e);
log.error("searchRecord-", e);
}
log.info("startTime:{},endTime:{},size:{}", DF.format(startTime), DF.format(endTime), res.size());
return res;
return Pair.of(new Long[]{startTime, endTime}, res);
}
public String[] getIndexes() {
......
......@@ -102,7 +102,7 @@ public class EsQueryTools {
for (String key : keys) {
String channelRegex = getAllRegex(key);
BoolQueryBuilder keyQueryBuilder = QueryBuilders.boolQuery();
keyQueryBuilder.must(QueryBuilders.regexpQuery("source",".*"+channelRegex+".*"));
keyQueryBuilder.must(QueryBuilders.regexpQuery("source.keyword",".*"+channelRegex+".*"));
channelBoolQueryBuilder.should(keyQueryBuilder);
}
return channelBoolQueryBuilder;
......
......@@ -84,4 +84,14 @@ public class EsRestClient {
return client;
}
@Bean(name = "localEsClient")
public RestHighLevelClient localEsClient() {
List<HttpHost> httpHostList = new ArrayList<>();
httpHostList.add(new HttpHost("192.168.0.250", 9200));
HttpHost[] httpHosts = httpHostList.toArray(new HttpHost[0]);
// 判断,如果未配置用户名,则进行无用户名密码连接,配置了用户名,则进行用户名密码连接
return new RestHighLevelClient(RestClient.builder(httpHosts));
}
}
......@@ -22,7 +22,7 @@ public class EsRetryTools {
SimpleRetryPolicy policy = new SimpleRetryPolicy();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
// 策略
policy.setMaxAttempts(2);// 重试次数
policy.setMaxAttempts(3);// 重试次数
template.setRetryPolicy(policy);
backOffPolicy.setInitialInterval(1000);// 初始休眠
backOffPolicy.setMultiplier(2);// 指定乘数 下次休眠 = multiplier * 当前休眠时间
......
......@@ -6,7 +6,6 @@ import lombok.Getter;
import lombok.Setter;
import java.util.Date;
import java.util.LinkedHashSet;
/**
* @ClassName: Channel
......@@ -29,19 +28,19 @@ public class Channel extends ChannelIndex {
private Long lastTime;
/**
* 图片地址
* 图片头像地址
*/
private String imgUrl;
private String avatarUrl;
/**
* 发布稿件id列表
* 发布稿件
*/
private LinkedHashSet<String> articleIds;
private long articleCount;
/**
* 参与事件id列表
* 参与事件
*/
private LinkedHashSet<String> eventIds;
private long eventCount;
/**
* 渠道倾向
......@@ -59,7 +58,7 @@ public class Channel extends ChannelIndex {
private String experienceLevel;
/**
* 是否屏蔽
* 是否展示
*/
private boolean show = true;
......@@ -69,24 +68,24 @@ public class Channel extends ChannelIndex {
}
}
public void setArticleIds(LinkedHashSet<String> articleIds) {
if (null == this.articleIds) {
this.articleIds = articleIds;
} else {
this.articleIds.addAll(articleIds);
public void increaseArticleCount() {
this.articleCount += 1;
}
public void increaseArticleCount(long articleCount) {
this.articleCount += articleCount;
}
public void setChannelRecord(ChannelRecord channelRecord) {
setLastTime(channelRecord.getLastTime());
setArticleIds(channelRecord.getArticleIds());
public void setRecord(Record record) {
setLastTime(record.getLastTime());
increaseArticleCount(record.getArticleIds().size());
}
public ChannelIndex getChannelIndex() {
return Tools.convertMap(this, ChannelIndex.class);
}
public static Channel createFromChannelIndexRecord(ChannelIndex channelIndex, ChannelRecord channelRecord) {
public static Channel createFromChannelIndexRecord(ChannelIndex channelIndex, Record record) {
Channel channel = new Channel();
channel.setCTime(new Date().getTime());
channel.setProjectId(channelIndex.getProjectId());
......@@ -97,8 +96,8 @@ public class Channel extends ChannelIndex {
// fid 用来便捷搜索
channel.setFid(channelIndex.getFid());
channel.setLastTime(channelRecord.getLastTime());
channel.setArticleIds(channelRecord.getArticleIds());
channel.setLastTime(record.getLastTime());
channel.setRecord(record);
// TODO 调性随机分配
double random = Math.random();
if (random < 0.3) {
......
......@@ -88,30 +88,51 @@ public class ChannelIndex extends AbstractBaseMongo {
return res;
}
/**
* 合并渠道记录
*
* @param channelIndexMaps
* @return
*/
public static Map<ChannelIndex, Record> mergeRecord(List<Map<ChannelIndex, Record>> channelIndexMaps) {
Map<ChannelIndex, Record> mergeRecord = new HashMap<>();
for (Map<ChannelIndex, Record> channelIndexMap : channelIndexMaps) {
channelIndexMap.forEach((channelIndex, record) -> {
mergeRecord.compute(channelIndex, (k, v) -> {
if (null == v) {
return record;
}
return v.mergeRecord(record);
});
});
}
return mergeRecord;
}
@Setter
@Getter
public static class ChannelRecord {
public static class Record {
private Long lastTime;
private LinkedHashSet<String> articleIds = new LinkedHashSet<>();
private List<String> articleIds = new ArrayList<>();
public ChannelRecord() {
public Record() {
}
public ChannelRecord(Long lastTime, String articleId) {
public Record(Long lastTime, String articleId) {
this.lastTime = lastTime;
this.articleIds.add(articleId);
}
private void setLastTime(Long lastTime) {
public void setLastTime(Long lastTime) {
if (null == this.lastTime || this.lastTime < lastTime) {
this.lastTime = lastTime;
}
}
public ChannelRecord mergeRecord(ChannelRecord channelRecord) {
this.articleIds.addAll(channelRecord.getArticleIds());
setLastTime(channelRecord.getLastTime());
public Record mergeRecord(Record record) {
this.articleIds.addAll(record.getArticleIds());
setLastTime(record.getLastTime());
return this;
}
......@@ -123,12 +144,13 @@ public class ChannelIndex extends AbstractBaseMongo {
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
ChannelIndex that = (ChannelIndex) o;
return Objects.equals(linkedGroupId, that.linkedGroupId) && Objects.equals(platform, that.platform) && Objects.equals(realSource, that.realSource) && Objects.equals(source, that.source);
return Objects.equals(projectId, that.projectId) && Objects.equals(linkedGroupId, that.linkedGroupId) && Objects.equals(platform, that.platform) && Objects.equals(realSource,
that.realSource) && Objects.equals(source, that.source);
}
@Override
public int hashCode() {
return Objects.hash(linkedGroupId, platform, realSource, source);
return Objects.hash(projectId, linkedGroupId, platform, realSource, source);
}
}
package com.zhiwei.brandkbs2.pojo;
import com.alibaba.fastjson.JSONObject;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @ClassName: ChannelRecord
* @Description 渠道记录实体
* @author: sjj
* @date: 2022-07-26 11:54
*/
@Setter
@Getter
public class ChannelRecord {
public static final String RANGE_START_TIME = "range_start_time";
public static final String RANGE_END_TIME = "range_end_time";
public static final String LINK_MAPS = "link_maps";
/**
* 区间内统计开始时间
*/
private long rangeStartTime;
/**
* 区间内统计结束时间
*/
private long rangeEndTime;
/**
* 关联表
*/
private List<LinkMap> linkMaps;
public ChannelRecord(long rangeStartTime, long rangeEndTime, @NonNull Map<ChannelIndex, ChannelIndex.Record> records) {
this.rangeStartTime = rangeStartTime;
this.rangeEndTime = rangeEndTime;
this.linkMaps = records.entrySet().stream().map(entry -> new LinkMap(entry.getKey(), entry.getValue())).collect(Collectors.toList());
}
public ChannelRecord(Map<String, Object> esMap) {
JSONObject json = new JSONObject(esMap);
this.rangeStartTime = json.getLongValue(RANGE_START_TIME);
this.rangeEndTime = json.getLongValue(RANGE_END_TIME);
List<LinkMap> list = new ArrayList<>();
List<Map<String, Object>> maps = (List<Map<String, Object>>) json.get(LINK_MAPS);
for (Map<String, Object> map : maps) {
// channelIndex
ChannelIndex channelIndex = new ChannelIndex();
channelIndex.setProjectId(map.get("project_id") + "");
channelIndex.setLinkedGroupId(map.get("linked_group_id") + "");
channelIndex.setPlatform(map.get("platform") + "");
channelIndex.setRealSource(map.get("real_source") + "");
channelIndex.setSource(map.get("source") + "");
Long lastTime = Long.valueOf(map.get("last_time") + "");
List<String> articleIds = (List<String>) map.get("article_ids");
// record
ChannelIndex.Record record = new ChannelIndex.Record();
record.setLastTime(lastTime);
record.setArticleIds(articleIds);
LinkMap linkMap = new LinkMap(channelIndex, record);
list.add(linkMap);
}
this.linkMaps = list;
}
public Map<String, Object> toEsMap() {
Map<String, Object> esMap = new HashMap<>();
esMap.put(RANGE_START_TIME, rangeStartTime);
esMap.put(RANGE_END_TIME, rangeEndTime);
esMap.put(LINK_MAPS, linkMaps.stream().map(LinkMap::toEsLinkMap).collect(Collectors.toList()));
return esMap;
}
@AllArgsConstructor
@Getter
public static class LinkMap {
private final ChannelIndex index;
private final ChannelIndex.Record record;
private Map<String, Object> toEsLinkMap() {
if (null == index || null == record) {
return null;
}
Map<String, Object> map = new HashMap<>();
map.put("project_id", index.getProjectId());
map.put("linked_group_id", index.getLinkedGroupId());
map.put("platform", index.getPlatform());
map.put("real_source", index.getRealSource());
map.put("source", index.getSource());
map.put("fid", index.getFid());
map.put("last_time", record.getLastTime());
map.put("article_ids", record.getArticleIds());
return map;
}
}
}
......@@ -10,6 +10,7 @@ import com.zhiwei.brandkbs2.easyexcel.dto.ExportAdminChannelEventDTO;
import com.zhiwei.brandkbs2.easyexcel.dto.ExportChannelDTO;
import com.zhiwei.brandkbs2.enmus.EventTagEnum;
import com.zhiwei.brandkbs2.enmus.ExperienceEnum;
import com.zhiwei.brandkbs2.es.ChannelEsDao;
import com.zhiwei.brandkbs2.es.EsClientDao;
import com.zhiwei.brandkbs2.exception.ExceptionCast;
import com.zhiwei.brandkbs2.model.CommonCodeEnum;
......@@ -62,6 +63,9 @@ public class ChannelServiceImpl implements ChannelService {
@Resource(name = "esClientDao")
EsClientDao esClientDao;
@Resource(name = "channelEsDao")
ChannelEsDao channelEsDao;
@Resource(name = "channelTagDao")
ChannelTagDao channelTagDao;
......@@ -87,14 +91,14 @@ public class ChannelServiceImpl implements ChannelService {
result.put("platform", channel.getPlatform());
result.put("realSource", channel.getRealSource());
result.put("source", channel.getSource());
result.put("articleCount", channel.getArticleIds().size());
result.put("eventCount", null == channel.getEventIds() ? 0 : channel.getEventIds().size());
result.put("articleCount", channel.getArticleCount());
result.put("eventCount", channel.getEventCount());
result.put("emotion", channel.getEmotion());
result.put("emotionIndex", channel.getEmotionIndex());
result.put("experienceLevel", ExperienceEnum.getValueFromDataBaseName(channel.getExperienceLevel()));
result.put("lastTime", channel.getLastTime());
result.put("show", channel.isShow());
result.put("imgUrl", channel.getImgUrl());
result.put("imgUrl", channel.getAvatarUrl());
result.put("tag", channelTagDao.getTagByChannelName(channel.getSource()));
return result;
}).collect(Collectors.toList());
......@@ -106,7 +110,7 @@ public class ChannelServiceImpl implements ChannelService {
public PageVO<JSONObject> findArticleList(int page, int size, String channelId) {
try {
Channel channel = channelDao.findOneById(channelId);
LinkedHashSet<String> articleIds = channel.getArticleIds();
List<String> articleIds = channelEsDao.getArticleIds(channel.getFid());
List<String> queryIds = Tools.pageList(page, size, articleIds);
List<JSONObject> resList = new ArrayList<>();
Map<String, JSONObject> idMaps = esClientDao.searchByIds(queryIds);
......@@ -132,7 +136,7 @@ public class ChannelServiceImpl implements ChannelService {
@Override
public PageVO<JSONObject> findEventList(int page, int size, String channelId) {
Channel channel = channelDao.findOneById(channelId);
LinkedHashSet<String> eventIds = channel.getEventIds();
List<String> eventIds = eventDao.getEventCount(channel.getChannelIndex());
Query query = Query.query(Criteria.where("_id").in(eventIds));
long total = eventDao.count(query);
mongoUtil.start(page, size, query);
......@@ -184,7 +188,7 @@ public class ChannelServiceImpl implements ChannelService {
jsonObject.put("platform", channel.getPlatform());
jsonObject.put("realSource", channel.getRealSource());
jsonObject.put("source", channel.getSource());
jsonObject.put("imgUrl", channel.getImgUrl());
jsonObject.put("imgUrl", channel.getAvatarUrl());
jsonObject.put("tag", channelTagDao.getTagByChannelName(channel.getSource()));
return jsonObject;
}
......@@ -210,7 +214,7 @@ public class ChannelServiceImpl implements ChannelService {
List<ExportAdminChannelArticleDTO> resList = new ArrayList<>();
try {
Channel channel = channelDao.findOneById(channelId);
LinkedHashSet<String> articleIds = channel.getArticleIds();
List<String> articleIds = channelEsDao.getArticleIds(channel.getFid());
Map<String, JSONObject> resultMap = esClientDao.searchByIds(articleIds);
resultMap.values().forEach(json -> {
// url,title,platform,forward,emotion 未配置
......@@ -229,7 +233,7 @@ public class ChannelServiceImpl implements ChannelService {
public List<ExportAdminChannelEventDTO> findDownloadChannelEventList(String channelId) {
List<ExportAdminChannelEventDTO> resList = new ArrayList<>();
Channel channel = channelDao.findOneById(channelId);
LinkedHashSet<String> eventIds = channel.getEventIds();
List<String> eventIds = eventDao.getEventCount(channel.getChannelIndex());
if (CollectionUtils.isEmpty(eventIds)) {
return resList;
}
......@@ -290,8 +294,6 @@ public class ChannelServiceImpl implements ChannelService {
TermsAggregationBuilder aggregationBuilder = AggregationBuilders.terms("count").field("mark_cache_maps.name.keyword");
return null;
}
......
......@@ -3,6 +3,7 @@ package com.zhiwei.brandkbs2.service.impl;
import com.zhiwei.brandkbs2.config.Constant;
import com.zhiwei.brandkbs2.dao.*;
import com.zhiwei.brandkbs2.enmus.ReportTypeEnum;
import com.zhiwei.brandkbs2.es.ChannelEsDao;
import com.zhiwei.brandkbs2.es.EsClientDao;
import com.zhiwei.brandkbs2.listener.ApplicationProjectListener;
import com.zhiwei.brandkbs2.pojo.*;
......@@ -17,12 +18,12 @@ import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
* @ClassName: TaskServiceImpl
......@@ -37,6 +38,9 @@ public class TaskServiceImpl implements TaskService {
@Resource(name = "esClientDao")
EsClientDao esClientDao;
@Resource(name = "channelEsDao")
ChannelEsDao channelEsDao;
@Resource(name = "channelDao")
ChannelDao channelDao;
......@@ -63,27 +67,32 @@ public class TaskServiceImpl implements TaskService {
@Override
public void messageFlowCount(int day) {
Map<ChannelIndex, ChannelIndex.ChannelRecord> channelIndexChannelRecordMap = esClientDao.searchChannelRecordRecentDay(day);
log.info("渠道信息记录-搜索到近{}天的记录条数{}条", day, channelIndexChannelRecordMap.size());
List<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>> rangeTimeRecords = esClientDao.searchRecordRecentDay(day);
int total = rangeTimeRecords.stream().mapToInt(pair -> pair.getRight().values().size()).sum();
log.info("渠道统计-搜索到近{}天的受影响渠道数{}条", day, total);
List<ChannelRecord> channelRecords = rangeTimeRecords.stream().map(pair -> new ChannelRecord(pair.getLeft()[0], pair.getLeft()[1], pair.getRight())).collect(Collectors.toList());
channelEsDao.upsertChannelRecord(channelRecords);
log.info("渠道统计-小时级渠道记录统计结束");
List<Channel> insertList = new ArrayList<>();
for (Map.Entry<ChannelIndex, ChannelIndex.ChannelRecord> entry : channelIndexChannelRecordMap.entrySet()) {
List<Map<ChannelIndex, ChannelIndex.Record>> channelList = rangeTimeRecords.stream().map(Pair::getRight).collect(Collectors.toList());
// 合并渠道记录
Map<ChannelIndex, ChannelIndex.Record> channelIndexRecordMap = ChannelIndex.mergeRecord(channelList);
for (Map.Entry<ChannelIndex, ChannelIndex.Record> entry : channelIndexRecordMap.entrySet()) {
// 是否已存在
Channel channel = channelDao.queryUnique(entry.getKey());
if (null == channel) {
channel = Channel.createFromChannelIndexRecord(entry.getKey(), entry.getValue());
channel.setEventIds(new LinkedHashSet<>(eventDataDao.getEventCount(entry.getKey())));
insertList.add(channel);
channelDao.insertOne(channel);
} else {
channel.setEventIds(new LinkedHashSet<>(eventDataDao.getEventCount(entry.getKey())));
channel.setChannelRecord(entry.getValue());
channel.setRecord(entry.getValue());
channelDao.updateOne(channel);
}
}
// ListUtils.partition(insertList, 1000).forEach(list -> {
// channelDao.insertMany(list);
// });
log.info("渠道信息记录-录入完毕,新增渠道{}条,更新渠道{}条", insertList.size(), channelIndexChannelRecordMap.size() - insertList.size());
log.info("渠道统计-录入完毕,新增渠道{}条,更新渠道{}条", insertList.size(), total - insertList.size());
}
@Override
......
package com.zhiwei.brandkbs2.util;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
......@@ -10,10 +14,61 @@ import java.security.NoSuchAlgorithmException;
* @date 2019-8-6 15:53
**/
public final class Md5Util {
private static final Logger logger = LogManager.getLogger(Md5Util.class);
private Md5Util() {
}
/**
* 输出MD5
*
* @param objs 需要加密的字符串
* @return esId
*/
public static String encode2EsId(Object... objs) {
StringBuilder sb = new StringBuilder();
for (Object obj : objs) {
sb.append(obj.toString());
}
return encode2EsId(sb.toString());
}
/**
* 输出MD5
*
* @param sourceStr 需要加密的字符串
* @return esId
*/
public static String encode2EsId(String sourceStr) {
String md5 = null;
try {
// 创建加密对象
MessageDigest md = MessageDigest.getInstance("MD5");
md5 = mdUpdate(sourceStr, md);
} catch (Exception e) {
logger.info("MD5加密出错", e);
}
return md5;
}
private static String mdUpdate(String sourceStr, MessageDigest md) {
// 计算MD5函数
md.update(sourceStr.getBytes(StandardCharsets.UTF_8));
byte[] b = md.digest();
int i;
StringBuilder buf = new StringBuilder();
for (byte value : b) {
i = value;
if (i < 0)
i += 256;
if (i < 16)
buf.append("0");
buf.append(Integer.toHexString(i));
}
return buf.toString();
}
/**
* 将明文密码转成MD5密码
*
* @param password 密码
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment