Commit 8d01723a by shenjunjie

渠道库部分完结

parent 34d25fdf
......@@ -16,7 +16,7 @@ public class GenericAttribute {
*/
public static final String ES_INDEX_PRE = "brandkbs2";
public static final String ES_INDEX_TEST = "brandkbs2_test";
public static final String ES_CHANNEL_INDEX_TEST = "brandkbs2_channel_record_test2";
public static final String ES_CHANNEL_INDEX_TEST = "brandkbs2_channel_record_test7";
/**
* es ind_title
**/
......
......@@ -11,9 +11,11 @@ import org.apache.commons.lang3.time.FastDateFormat;
public class Constant {
public static final Long ONE_DAY = 24 * 60 * 60 * 1000L;
public static final Long ONE_MONTH = 30 * 24 * 60 * 60 * 1000L;
public static final String HOUR_PATTERN = "yyyy-MM-dd HH";
public static final String DAY_PATTERN = "yyyy-MM-dd";
public static final String MONTH_PATTERN = "yyyy-MM";
public static final FastDateFormat HOUR_FORMAT = FastDateFormat.getInstance(HOUR_PATTERN);
public static final FastDateFormat DAY_FORMAT = FastDateFormat.getInstance(DAY_PATTERN);
......@@ -28,7 +30,7 @@ public class Constant {
*/
public static final String EMOTION_LABEL_KEY = "情感倾向";
public static final String EMOTION_SENSITIVE = "敏感";
public static final String EMOTION_NEGATIVE= "负面";
public static final String EMOTION_NEGATIVE = "负面";
public static final String BRAND_LABEL_KEY = "品牌归属";
/**
......
......@@ -61,4 +61,22 @@ public class TaskPoolConfig {
return executor;
}
@Bean
public ThreadPoolTaskExecutor mongoQueryExecutor() {
log.info("start mongoQueryExecutor");
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 配置核心线程数
executor.setCorePoolSize(8);
// 配置最大线程数
executor.setMaxPoolSize(16);
// 配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("mongoQuery-");
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 执行初始化
executor.initialize();
return executor;
}
}
......@@ -3,16 +3,21 @@ package com.zhiwei.brandkbs2.dao;
import com.zhiwei.brandkbs2.pojo.Channel;
import com.zhiwei.brandkbs2.pojo.ChannelIndex;
import java.util.Collection;
import java.util.Map;
/**
* @ClassName: ChannelDao
* @Description ChannelDao
* @author: sjj
* @date: 2022-06-16 15:30
*/
public interface ChannelDao extends BaseMongoDao<Channel>{
public interface ChannelDao extends BaseMongoDao<Channel> {
Channel queryUnique(String channelFid);
Channel queryUnique(ChannelIndex channelIndex);
Map<String, Channel> queryUniqueAsync(Collection<String> channelFids);
}
......@@ -4,6 +4,7 @@ import com.zhiwei.brandkbs2.pojo.ChannelIndex;
import com.zhiwei.brandkbs2.pojo.Event;
import java.util.List;
import java.util.Map;
/**
* @ClassName: EventDao
......@@ -11,13 +12,13 @@ import java.util.List;
* @author: sjj
* @date: 2022-05-18 14:35
*/
public interface EventDao extends BaseMongoDao<Event>{
public interface EventDao extends BaseMongoDao<Event> {
/**
* 是否已存在事件
*
* @param yqEventId 舆情事件id
* @param projectId 项目id
* @param yqEventId 舆情事件id
* @param projectId 项目id
* @param linkedGroupId 关联项目组id
* @return 是否存在
*/
......@@ -26,12 +27,12 @@ public interface EventDao extends BaseMongoDao<Event>{
/**
* 根据联合id查询事件
*
* @param yqEventId 舆情事件id
* @param projectId 项目id
* @param yqEventId 舆情事件id
* @param projectId 项目id
* @param linkedGroupId 关联项目id
* @return 事件
*/
Event getEventByUniqueIds(String yqEventId,String projectId,String linkedGroupId);
Event getEventByUniqueIds(String yqEventId, String projectId, String linkedGroupId);
/**
......@@ -42,4 +43,21 @@ public interface EventDao extends BaseMongoDao<Event>{
*/
List<String> getEventCount(ChannelIndex channelIndex);
/**
* 获取参与事件数
*
* @param channelIndex 渠道标识
* @return 参与事件数
*/
Map<Long, List<Event>> getEventDay(ChannelIndex channelIndex, Long startTime, Long endTime);
/**
* 获取参与事件数
*
* @param channelIndex 渠道标识
* @return 参与事件数
*/
Map<Long, List<Event>> getEventMonth(ChannelIndex channelIndex, Long startTime, Long endTime);
}
......@@ -3,10 +3,18 @@ package com.zhiwei.brandkbs2.dao.impl;
import com.zhiwei.brandkbs2.dao.ChannelDao;
import com.zhiwei.brandkbs2.pojo.Channel;
import com.zhiwei.brandkbs2.pojo.ChannelIndex;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.util.*;
import java.util.concurrent.CompletableFuture;
/**
* @ClassName: ChannelDaoImpl
* @Description ChannelDaoImpl
......@@ -15,11 +23,14 @@ import org.springframework.stereotype.Component;
*/
@Component("channelDao")
public class ChannelDaoImpl extends BaseMongoDaoImpl<Channel> implements ChannelDao {
private static final Logger log = LogManager.getLogger(ChannelDaoImpl.class);
private static final String COLLECTION_PREFIX = "brandkbs_channel";
private final ThreadPoolTaskExecutor executor;
public ChannelDaoImpl() {
public ChannelDaoImpl(@Qualifier("mongoQueryExecutor") ThreadPoolTaskExecutor executor) {
super(COLLECTION_PREFIX);
this.executor = executor;
}
@Override
......@@ -41,4 +52,24 @@ public class ChannelDaoImpl extends BaseMongoDaoImpl<Channel> implements Channel
return queryUnique(channelIndex.getFid());
}
@Override
public Map<String, Channel> queryUniqueAsync(Collection<String> channelFids) {
Map<String, Channel> res = new HashMap<>();
List<CompletableFuture<Pair<String, Channel>>> futureList = new ArrayList<>();
for (String fid : channelFids) {
futureList.add(CompletableFuture.supplyAsync(() -> Pair.of(fid, queryUnique(fid)), executor));
}
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).whenComplete((r, e) -> {
for (CompletableFuture<Pair<String, Channel>> future : futureList) {
try {
Pair<String, Channel> channelPair = future.get();
res.put(channelPair.getLeft(), channelPair.getRight());
} catch (Exception e1) {
log.error("queryUniqueAsync", e1);
}
}
}).join();
return res;
}
}
......@@ -11,7 +11,9 @@ import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Component;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static com.zhiwei.brandkbs2.dao.impl.EventDataDaoImpl.COLLECTION_PREFIX;
......@@ -54,6 +56,29 @@ public class EventDaoImpl extends BaseMongoDaoImpl<Event> implements EventDao {
return mappedResults.stream().map(json -> json.getString("_id")).collect(Collectors.toList());
}
@Override
public Map<Long, List<Event>> getEventDay(ChannelIndex channelIndex, Long startTime, Long endTime) {
return getEventTimePattern(channelIndex, startTime, endTime, 8);
}
@Override
public Map<Long, List<Event>> getEventMonth(ChannelIndex channelIndex, Long startTime, Long endTime) {
return getEventTimePattern(channelIndex, startTime, endTime, 6);
}
private Map<Long, List<Event>> getEventTimePattern(ChannelIndex channelIndex, Long startTime, Long endTime, int nrOfChars) {
// 添加渠道唯一标识
Criteria criteria = addChannelIndex(channelIndex);
// 分组
Aggregation agg = Aggregation.newAggregation(Aggregation.match(criteria),
// 截取前8位,按日分组
Aggregation.project("startTime", "eventId", "emotion").andExpression("add(new java.util.Date(0l),startTime)").substring(0, nrOfChars).as("patternDate"),
Aggregation.group("patternDate").count().as("eventCount"));
AggregationResults<JSONObject> aggregate = mongoTemplate.aggregate(agg, getAggreeCollection(), JSONObject.class);
List<JSONObject> mappedResults = aggregate.getMappedResults();
return new HashMap<>();
}
private String getAggreeCollection() {
Calendar date = Calendar.getInstance();
int year = date.get(Calendar.YEAR);
......
package com.zhiwei.brandkbs2.easyexcel.dto;
import com.alibaba.excel.annotation.ExcelProperty;
import com.zhiwei.brandkbs2.enmus.EmotionEnum;
import com.zhiwei.brandkbs2.pojo.ChannelIndex;
import lombok.Data;
import lombok.ToString;
import java.util.Date;
/**
* @author lxj
* @version 1.0
* @description 前台导出渠道稿件实体类
* @date 2020/3/30 16:31
*/
@Data
@ToString
public class ExportAppChannelArticleDTO {
@ExcelProperty("时间")
private Date time;
@ExcelProperty("链接")
private String url;
@ExcelProperty("标题")
private String title;
@ExcelProperty("正文")
private String content;
@ExcelProperty("情感倾向")
private String emotion;
public static ExportAppChannelArticleDTO createFromArticle(ChannelIndex.Article article, String url, String title) {
ExportAppChannelArticleDTO dto = new ExportAppChannelArticleDTO();
dto.setTime(new Date(article.getTime()));
dto.setUrl(url);
dto.setTitle(title);
dto.setEmotion(EmotionEnum.state2Name(article.getEmotion()));
return dto;
}
}
......@@ -121,4 +121,13 @@ public enum EmotionEnum {
return 0;
}
public static String state2Name(int state) {
for (EmotionEnum value : EmotionEnum.values()) {
if (value.getState() == state) {
return value.getName();
}
}
return EmotionEnum.UNDEFINED.getName();
}
}
......@@ -2,7 +2,6 @@ package com.zhiwei.brandkbs2.es;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.brandkbs2.common.GenericAttribute;
import com.zhiwei.brandkbs2.enmus.EmotionEnum;
import com.zhiwei.brandkbs2.pojo.ChannelIndex;
import com.zhiwei.brandkbs2.pojo.Event;
import com.zhiwei.brandkbs2.util.Tools;
......@@ -52,7 +51,8 @@ import java.util.stream.Collectors;
public class EsClientDao {
private static final Logger log = LogManager.getLogger(EsClientDao.class);
protected static final FastDateFormat DF = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");
private static final String[] CHANNEL_RECORD_FETCH_SOURCE = new String[]{"id", "c5", "foreign", "real_source", "source", "mtime", "time", "brandkbs_cache_maps"};
private static final String[] CHANNEL_RECORD_FETCH_SOURCE = new String[]{"id", "c5", "foreign", "real_source", "source", "mtime", "time",
"brandkbs_cache_maps", "brandkbs_mark_cache_maps"};
private static final String[] EVENT_FETCH_SOURCE = new String[]{"ind_full_text", "c5", "real_source", "source", "mtime", "time", "url", "mtag"};
private static final Long ONE_HOUR = 60 * 60 * 1000L;
......@@ -71,6 +71,10 @@ public class EsClientDao {
@Resource(name = "retryTemplate")
RetryTemplate retryTemplate;
public JSONObject searchById(String queryId) throws IOException {
return searchByIds(Collections.singleton(queryId)).get(queryId);
}
public Map<String, JSONObject> searchByIds(Collection<String> queryIds) throws IOException {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
QueryBuilder queryBuilder = QueryBuilders.idsQuery().addIds(queryIds.toArray(new String[0]));
......@@ -180,8 +184,7 @@ public class EsClientDao {
v = new ChannelIndex.Record();
}
try {
return v.mergeRecord(new ChannelIndex.Record((long) result.get(GenericAttribute.ES_TIME), (long) result.get(GenericAttribute.ES_MTIME),
String.valueOf(result.get("id")), EmotionEnum.parseFromName(Tools.getEmotion(result))));
return v.mergeRecord(new ChannelIndex.Record(result));
} catch (Exception e) {
log.error("searchRecord-error-id:{}", result.get("id"), e);
return null;
......
......@@ -124,12 +124,12 @@ public class EsQueryTools {
public static void assembleContendsQuery(BoolQueryBuilder query, Collection<String> contends) {
BoolQueryBuilder contendQuery = QueryBuilders.boolQuery();
// 主品牌一定参与
contendQuery.should(QueryBuilders.termQuery("contend_id", "0"));
contendQuery.should(QueryBuilders.termQuery("contend_id.keyword", "0"));
if (null == contends) {
return;
}
for (String contendId : contends) {
contendQuery.should((QueryBuilders.termQuery("contend_id", contendId)));
contendQuery.should((QueryBuilders.termQuery("contend_id.keyword", contendId)));
}
query.must(contendQuery);
}
......
......@@ -4,6 +4,7 @@ import com.zhiwei.brandkbs2.enmus.ChannelEmotion;
import com.zhiwei.brandkbs2.util.Tools;
import lombok.Getter;
import lombok.Setter;
import org.bson.types.ObjectId;
import java.util.Date;
......@@ -43,16 +44,6 @@ public class Channel extends ChannelIndex {
private long eventCount;
/**
* 渠道倾向
*/
private int emotion;
/**
* 渠道指数
*/
private Double emotionIndex = 0d;
/**
* 经验评级
*/
private String experienceLevel;
......@@ -97,6 +88,7 @@ public class Channel extends ChannelIndex {
public static Channel createFromChannelIndexRecord(ChannelIndex channelIndex, Record record) {
Channel channel = new Channel();
channel.setId(ObjectId.get().toString());
channel.setCTime(new Date().getTime());
channel.setProjectId(channelIndex.getProjectId());
channel.setLinkedGroupId(channelIndex.getLinkedGroupId());
......@@ -117,6 +109,8 @@ public class Channel extends ChannelIndex {
} else {
channel.setEmotion(ChannelEmotion.NEUTRAL.getState());
}
// TODO 情感指数随机分配
channel.setEmotionIndex(Math.random() * 100);
return channel;
}
......
......@@ -2,6 +2,7 @@ package com.zhiwei.brandkbs2.pojo;
import com.zhiwei.brandkbs2.common.GenericAttribute;
import com.zhiwei.brandkbs2.common.GlobalPojo;
import com.zhiwei.brandkbs2.enmus.EmotionEnum;
import com.zhiwei.brandkbs2.util.Tools;
import com.zhiwei.qbjc.bean.pojo.common.MessagePlatform;
import com.zhiwei.qbjc.bean.tools.BeanTools;
......@@ -54,6 +55,17 @@ public class ChannelIndex extends AbstractBaseMongo {
private String fid;
private int emotion;
/**
* 情感指数
*/
private Double emotionIndex;
public ChannelIndex(String projectId, String linkedGroupId, Channel channel) {
this(projectId, linkedGroupId, channel.getPlatform(), channel.getRealSource(), channel.getSource());
}
public ChannelIndex(String projectId, String linkedGroupId, String platform, String realSource, String source) {
this.projectId = projectId;
this.linkedGroupId = linkedGroupId;
......@@ -117,6 +129,12 @@ public class ChannelIndex extends AbstractBaseMongo {
return mergeRecord;
}
public void setChannelInfo(Channel channel) {
this.id = channel.getId();
this.emotion = channel.getEmotion();
this.emotionIndex = channel.getEmotionIndex();
}
@Setter
@Getter
@AllArgsConstructor
......@@ -128,9 +146,9 @@ public class ChannelIndex extends AbstractBaseMongo {
public Record() {
}
public Record(Long lastTime, Long mtime, String articleId, int emotion) {
this.lastTime = lastTime;
this.articles.add(new Article(lastTime, mtime, articleId, emotion));
public Record(Map<String, Object> esMap) {
this.lastTime = Long.parseLong(esMap.get("time") + "");
this.articles.add(Article.fromEsMap(esMap));
}
public void setLastTime(Long lastTime) {
......@@ -145,6 +163,33 @@ public class ChannelIndex extends AbstractBaseMongo {
return this;
}
public static List<Article> sortArticles(List<Article> articles) {
articles.sort(Comparator.comparingLong(ChannelIndex.Article::getTime));
return articles;
}
public static List<Article> filterArticles(Long startTime, Long endTime, List<Article> articles) {
// 去除不符合时间段数据
articles = articles.stream().filter(article -> Tools.hitTimeRange(startTime, endTime, article.getTime())).collect(Collectors.toList());
// 去重并保留最近标注时间
Map<String, ChannelIndex.Article> setMap = new HashMap<>();
for (ChannelIndex.Article article : articles) {
setMap.compute(article.getId(), (k, v) -> {
// 旧值为null或标注时间更新
if (null == v || article.getMtime() > v.getMtime()) {
return article;
}
return v;
});
}
return new ArrayList<>(setMap.values());
}
public static List<Article> filterSortArticles(Long startTime, Long endTime, List<Article> articles) {
return sortArticles(filterArticles(startTime, endTime, articles));
}
public Map<String, Object> toEsMap() {
Map<String, Object> map = new HashMap<>();
map.put("last_time", lastTime);
......@@ -160,11 +205,29 @@ public class ChannelIndex extends AbstractBaseMongo {
Long time;
Long mtime;
String id;
// String url;
// String title;
int emotion;
public static Article fromRecordMap(Map<String, Object> recordMap) {
Long time = Long.parseLong(recordMap.get("time") + "");
Long mtime = Long.parseLong(recordMap.get("mtime") + "");
String id = String.valueOf(recordMap.get("id"));
// String url = recordMap.get("url") + "";
// String title = recordMap.get("title") + "";
int emotion = Integer.parseInt(recordMap.get("emotion") + "");
return new Article(time, mtime, id, emotion);
}
public static Article fromEsMap(Map<String, Object> esMap) {
return new Article(Long.parseLong(esMap.get("time") + ""), Long.parseLong(esMap.get("mtime") + ""), String.valueOf(esMap.get("id")), Integer.parseInt(
esMap.get("emotion") + ""));
BaseMap baseMap = Tools.getBaseFromEsMap(esMap);
Long time = baseMap.getTime();
Long mtime = Long.parseLong(esMap.get("mtime") + "");
String id = String.valueOf(esMap.get("id"));
// String url = baseMap.getUrl();
// String title = baseMap.getTitle();
String emotionStr = baseMap.getEmotion();
return new Article(time, mtime, id, EmotionEnum.parseFromName(emotionStr));
}
public Map<String, Object> toEsMap() {
......@@ -172,6 +235,8 @@ public class ChannelIndex extends AbstractBaseMongo {
map.put("time", time);
map.put("mtime", mtime);
map.put("id", id);
// map.put("url", url);
// map.put("title", title);
map.put("emotion", emotion);
return map;
}
......
......@@ -34,7 +34,10 @@ public class ChannelRecord {
public static final String CHANNEL_FID = "channel_fid";
public static final String RECORD = "record";
public static final String ARTICLE_COUNT = "article_count";
public static final String EMOTION = "emotion";
public static final String EMOTION_INDEX = "emotion_index";
public static final String SHARDS = "shards";
private static final Integer SHARDS_LIMIT = 50;
/**
......@@ -82,6 +85,21 @@ public class ChannelRecord {
private String channelFid;
/**
* 渠道id
*/
private String channelId;
/**
* 情感倾向
*/
private int emotion;
/**
* 情感指数
*/
private Double emotionIndex;
/**
* 时间段内唯一id
*/
private String key;
......@@ -89,7 +107,7 @@ public class ChannelRecord {
/**
* articleCount
*/
private int articleCount;
private long articleCount;
/**
* 记录信息
*/
......@@ -110,6 +128,9 @@ public class ChannelRecord {
this.realSource = channelIndex.getRealSource();
this.source = channelIndex.getSource();
this.channelFid = channelIndex.getFid();
this.emotion = channelIndex.getEmotion();
this.emotionIndex = channelIndex.getEmotionIndex();
this.channelId = channelIndex.getId();
this.key = Tools.concat(channelFid, contendId);
this.articleCount = record.getArticles().size();
this.record = record;
......@@ -125,7 +146,7 @@ public class ChannelRecord {
List<ChannelIndex.Article> articles = templateRecord.getArticles();
int shards = 0;
// 防止数据量过大无法存储,故按100 拆分
for (List<ChannelIndex.Article> partList : Lists.partition(articles, 100)) {
for (List<ChannelIndex.Article> partList : Lists.partition(articles, SHARDS_LIMIT)) {
ChannelRecord channelRecord = new ChannelRecord(rangeStartTime, rangeEndTime, entry.getKey(), entry.getValue());
// 保留最近发文时间,更新partList,articleCount和shards
channelRecord.setRecord(new ChannelIndex.Record(lastTime, partList));
......@@ -149,6 +170,9 @@ public class ChannelRecord {
this.contendId = json.getString("contend_id");
this.channelFid = json.getString("channel_fid");
this.key = json.getString("key");
this.emotion = json.getIntValue("emotion");
this.emotionIndex = json.getDoubleValue("emotion_index");
this.channelId = json.getString("channel_id");
this.articleCount = json.getIntValue("article_count");
this.shards = json.getIntValue("shards");
......@@ -157,7 +181,7 @@ public class ChannelRecord {
ChannelIndex.Record record = new ChannelIndex.Record();
record.setLastTime(Long.valueOf(map.get("last_time") + ""));
List<Map<String, Object>> list = (List<Map<String, Object>>) map.get("articles");
record.setArticles(list.stream().map(ChannelIndex.Article::fromEsMap).collect(Collectors.toList()));
record.setArticles(list.stream().map(ChannelIndex.Article::fromRecordMap).collect(Collectors.toList()));
this.record = record;
}
......@@ -180,10 +204,19 @@ public class ChannelRecord {
esMap.put(CHANNEL_FID, channelFid);
esMap.put(RECORD, record.toEsMap());
esMap.put(ARTICLE_COUNT, articleCount);
esMap.put(EMOTION, emotion);
esMap.put(EMOTION_INDEX, emotionIndex);
esMap.put("channel_id", channelId);
esMap.put(SHARDS, shards);
return esMap;
}
public void setChannelInfo(ChannelRecord channelRecord) {
this.channelId = channelRecord.getChannelId();
this.emotion = channelRecord.getEmotion();
this.emotionIndex = channelRecord.getEmotionIndex();
}
@Deprecated
public static List<ChannelRecord> createChannelRecords(long rangeStartTime, long rangeEndTime, @NonNull Map<ChannelIndex, ChannelIndex.Record> records) {
// 通过platform+realSource+source 合并数据
......
......@@ -2,6 +2,7 @@ package com.zhiwei.brandkbs2.pojo.vo;
import com.zhiwei.brandkbs2.enmus.ChannelEmotion;
import com.zhiwei.brandkbs2.pojo.Channel;
import com.zhiwei.brandkbs2.pojo.ChannelRecord;
import com.zhiwei.brandkbs2.util.Tools;
import lombok.Data;
import lombok.ToString;
......@@ -36,6 +37,11 @@ public class ChannelListVO {
* 发文数
*/
private long articleCount;
/**
* 参与事件数
*/
private long eventCount;
/**
* 情感倾向
*/
......@@ -45,6 +51,8 @@ public class ChannelListVO {
*/
private String avatarUrl;
private Boolean isCollect;
public static ChannelListVO createFromChannel(String channelFid, long articleCount) {
ChannelListVO channelListVO = new ChannelListVO();
// projectId, linkedGroupId, platform, realSource, source
......@@ -64,4 +72,13 @@ public class ChannelListVO {
channelListVO.setArticleCount(articleCount);
return channelListVO;
}
public static ChannelListVO createFromChannel(ChannelRecord channelRecord, long articleCount) {
ChannelListVO channelListVO = Tools.convertMap(channelRecord, ChannelListVO.class);
String name = ChannelEmotion.getNameFromState(channelRecord.getEmotion());
channelListVO.setShowEmotion(name);
channelListVO.setArticleCount(articleCount);
return channelListVO;
}
}
......@@ -3,6 +3,7 @@ package com.zhiwei.brandkbs2.service;
import com.alibaba.fastjson.JSONObject;
import com.zhiwei.brandkbs2.easyexcel.dto.ExportAdminChannelArticleDTO;
import com.zhiwei.brandkbs2.easyexcel.dto.ExportAdminChannelEventDTO;
import com.zhiwei.brandkbs2.easyexcel.dto.ExportAppChannelArticleDTO;
import com.zhiwei.brandkbs2.easyexcel.dto.ExportChannelDTO;
import com.zhiwei.brandkbs2.pojo.dto.ChannelDTO;
import com.zhiwei.brandkbs2.pojo.vo.ChannelListVO;
......@@ -131,7 +132,7 @@ public interface ChannelService {
* @param size
* @return
*/
List<ChannelListVO> getPositiveList(String contendId, String platform, String keyword, String sorter, int size);
List<ChannelListVO> getPositiveList(String contendId, String platform, String keyword, String sorter, Long startTime, Long endTime, int size);
/**
* 获取敏感渠道榜
......@@ -143,7 +144,7 @@ public interface ChannelService {
* @param size
* @return
*/
List<ChannelListVO> getNegativeList(String contendId, String platform, String keyword, String sorter, int size);
List<ChannelListVO> getNegativeList(String contendId, String platform, String keyword, String sorter, Long startTime, Long endTime, int size);
/**
* 收藏渠道
......@@ -183,4 +184,42 @@ public interface ChannelService {
* @return 渠道动向
*/
JSONObject getSpreadingTend(String channelId, String type, Set<String> contends, Long startTime, Long endTime);
/**
* 获取渠道动向-摘要
*
* @param channelId 渠道ID
* @param contends 品牌ID集合
* @param startTime 开始时间
* @param endTime 结束时间
* @return 渠道动向-摘要
*/
JSONObject getSpreadingTendSummary(String channelId, Set<String> contends, Long startTime, Long endTime);
/**
* 获取时间段稿件信息
*
* @param startTime 开始时间时间戳
* @param endTime 结束时间
* @param page 页码
* @param pageSize 页码大小
* @param channelId 渠道ID
* @param contendId 竞品ID
* @return 稿件信息
*/
JSONObject getArticlesByTime(Long startTime, Long endTime, int page, int pageSize, String channelId, String contendId);
/**
* 下载时间段稿件信息
*
* @param startTime 开始时间时间戳
* @param endTime 结束时间
* @param page 页码
* @param pageSize 页码大小
* @param channelId 渠道ID
* @param contendId 竞品ID
* @return 稿件信息
*/
List<ExportAppChannelArticleDTO> downloadArticlesByTime(Long startTime, Long endTime,String channelId, String contendId);
}
......@@ -20,6 +20,7 @@ import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
......@@ -70,6 +71,64 @@ public class TaskServiceImpl implements TaskService {
@Override
public void messageFlowCount(int day) {
List<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>> rangeTimeRecords = esClientDao.searchRecordRecentDay(day);
// 结果合并
List<Map<ChannelIndex, ChannelIndex.Record>> channelList = rangeTimeRecords.stream().map(Pair::getRight).collect(Collectors.toList());
// 合并渠道记录
Map<ChannelIndex, ChannelIndex.Record> channelIndexRecordMap = ChannelIndex.mergeRecord(channelList);
log.info("渠道统计-搜索到近{}天的受影响渠道数{}条", day, channelIndexRecordMap.size());
long handleSize = 0;
List<Channel> insertList = new ArrayList<>();
List<Map.Entry<ChannelIndex, ChannelIndex.Record>> batchList = new ArrayList<>();
// 新recordMap
Map<ChannelIndex, ChannelIndex.Record> newRecordMap = new HashMap<>();
for (Map.Entry<ChannelIndex, ChannelIndex.Record> entry : channelIndexRecordMap.entrySet()) {
batchList.add(entry);
// 每100条做一次清算
if (++handleSize % 100 == 0 || handleSize == channelIndexRecordMap.size()) {
insertList.addAll(batchHandle(batchList, newRecordMap));
batchList = new ArrayList<>();
}
if (handleSize % 10000 == 0) {
log.info("渠道统计-渠道总计-查询更新已完成{}/{}", handleSize, channelIndexRecordMap.size());
}
}
// 替换成新的记录map
channelIndexRecordMap = newRecordMap;
ListUtils.partition(insertList, 1000).forEach(list -> {
channelDao.insertMany(list);
});
log.info("渠道统计-渠道总计-录入完毕,新增渠道{}条,更新渠道{}条", insertList.size(), channelIndexRecordMap.size() - insertList.size());
// 获得单位时间内最小最大时间戳
Long[] timeMinMax = Tools.timeMinMax(rangeTimeRecords.stream().map(Pair::getLeft).collect(Collectors.toList()));
List<ChannelRecord> channelRecords = ChannelRecord.createChannelRecords(timeMinMax[0], timeMinMax[1], channelIndexRecordMap);
channelEsDao.upsertChannelRecord(channelRecords);
log.info("渠道统计-渠道记录-统计结束");
}
private List<Channel> batchHandle(List<Map.Entry<ChannelIndex, ChannelIndex.Record>> batchList, Map<ChannelIndex, ChannelIndex.Record> newRecordMap) {
List<Channel> insertList = new ArrayList<>();
List<String> fids = batchList.stream().map(channelIndexRecordEntry -> channelIndexRecordEntry.getKey().getFid()).collect(Collectors.toList());
Map<String, Channel> fidChannel = channelDao.queryUniqueAsync(fids);
for (Map.Entry<ChannelIndex, ChannelIndex.Record> entry : batchList) {
String fid = entry.getKey().getFid();
Channel channel = fidChannel.get(fid);
if (null == channel) {
channel = Channel.createFromChannelIndexRecord(entry.getKey(), entry.getValue());
insertList.add(channel);
} else {
channel.setRecord(entry.getValue());
channelDao.updateOne(channel);
}
// 设置查询数值
entry.getKey().setChannelInfo(channel);
newRecordMap.put(entry.getKey(), entry.getValue());
}
return insertList;
}
@Deprecated
public void messageFlowCount2(int day) {
List<Pair<Long[], Map<ChannelIndex, ChannelIndex.Record>>> rangeTimeRecords = esClientDao.searchRecordRecentDay(day);
int total = rangeTimeRecords.stream().mapToInt(pair -> pair.getRight().values().size()).sum();
log.info("渠道统计-搜索到近{}天的受影响渠道数{}条", day, total);
// 结果合并
......@@ -79,9 +138,11 @@ public class TaskServiceImpl implements TaskService {
// 获得单位时间内最小最大时间戳
Long[] timeMinMax = Tools.timeMinMax(rangeTimeRecords.stream().map(Pair::getLeft).collect(Collectors.toList()));
List<ChannelRecord> channelRecords = ChannelRecord.createChannelRecords(timeMinMax[0], timeMinMax[1], channelIndexRecordMap);
// List<ChannelRecord> channelRecords = rangeTimeRecords.stream().map(pair -> ChannelRecord.createChannelRecords(pair.getLeft()[0], pair.getLeft()[1], pair.getRight())).flatMap(Collection::stream).collect(Collectors.toList());
channelEsDao.upsertChannelRecord(channelRecords);
log.info("渠道统计-小时级渠道记录-统计结束");
log.info("渠道统计-渠道记录-统计结束");
// List<ChannelRecord> channelRecords = rangeTimeRecords.stream().map(pair -> ChannelRecord.createChannelRecords(pair.getLeft()[0], pair.getLeft()[1], pair.getRight())).flatMap(Collection::stream).collect(Collectors.toList());
long handleSize = 0;
List<Channel> insertList = new ArrayList<>();
for (Map.Entry<ChannelIndex, ChannelIndex.Record> entry : channelIndexRecordMap.entrySet()) {
// 是否已存在
......@@ -93,8 +154,13 @@ public class TaskServiceImpl implements TaskService {
channel.setRecord(entry.getValue());
channelDao.updateOne(channel);
}
// 设置查询数值
entry.getKey().setChannelInfo(channel);
if (++handleSize % 10000 == 0) {
log.info("渠道统计-渠道总计-查询更新已完成{}/{}", handleSize, channelIndexRecordMap.size());
}
}
log.info("渠道统计-渠道总计-查询更新结束,开始批量入库");
log.info("渠道统计-渠道总计-查询更新结束,开始批量入库");
ListUtils.partition(insertList, 1000).forEach(list -> {
channelDao.insertMany(list);
});
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- @version $Id: applicationContext.xml 561608 2007-08-01 00:33:12Z vgritsenko $ -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:configurator="http://cocoon.apache.org/schema/configurator"
xmlns:avalon="http://cocoon.apache.org/schema/avalon"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-2.0.xsd
http://cocoon.apache.org/schema/configurator http://cocoon.apache.org/schema/configurator/cocoon-configurator-1.0.1.xsd
http://cocoon.apache.org/schema/avalon http://cocoon.apache.org/schema/avalon/cocoon-avalon-1.0.xsd">
<!-- Activate Cocoon Spring Configurator -->
<configurator:settings/>
<!-- Configure Log4j -->
<bean name="org.apache.cocoon.spring.configurator.log4j"
class="org.apache.cocoon.spring.configurator.log4j.Log4JConfigurator"
scope="singleton">
<property name="settings" ref="org.apache.cocoon.configuration.Settings"/>
<property name="resource" value="/WEB-INF/log4j.xml"/>
</bean>
<!-- Activate Avalon Bridge -->
<avalon:bridge/>
</beans>
<?xml version="1.0" encoding="UTF-8" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
<!--
- This is a sample configuration for log4j.
- It simply just logs everything into a single log file.
- Note, that you can use properties for value substitution.
-->
<appender name="CORE" class="org.apache.log4j.FileAppender">
<param name="File" value="${org.apache.cocoon.work.directory}/cocoon-logs/log4j.log" />
<param name="Append" value="false" />
<layout class="org.apache.log4j.PatternLayout">
<param name="ConversionPattern" value="%d %-5p %t %c - %m%n"/>
</layout>
</appender>
<root>
<priority value="${org.apache.cocoon.log4j.loglevel}"/>
<appender-ref ref="CORE"/>
</root>
</log4j:configuration>
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!--
- This is the Cocoon web-app configurations file
-
- $Id$
-->
<web-app version="2.4"
xmlns="http://java.sun.com/xml/ns/j2ee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee http://java.sun.com/xml/ns/j2ee/web-app_2_4.xsd">
<!-- Servlet Filters ================================================ -->
<!--
- Declare a filter for multipart MIME handling
-->
<filter>
<description>Multipart MIME handling filter for Cocoon</description>
<display-name>Cocoon multipart filter</display-name>
<filter-name>CocoonMultipartFilter</filter-name>
<filter-class>org.apache.cocoon.servlet.multipart.MultipartFilter</filter-class>
</filter>
<!--
- Declare a filter for debugging incoming request
-->
<filter>
<description>Log debug information about each request</description>
<display-name>Cocoon debug filter</display-name>
<filter-name>CocoonDebugFilter</filter-name>
<filter-class>org.apache.cocoon.servlet.DebugFilter</filter-class>
</filter>
<!-- Filter mappings ================================================ -->
<!--
- Use the Cocoon multipart filter together with the Cocoon demo webapp
-->
<filter-mapping>
<filter-name>CocoonMultipartFilter</filter-name>
<servlet-name>Cocoon</servlet-name>
</filter-mapping>
<filter-mapping>
<filter-name>CocoonMultipartFilter</filter-name>
<servlet-name>DispatcherServlet</servlet-name>
</filter-mapping>
<!--
- Use the Cocoon debug filter together with the Cocoon demo webapp
<filter-mapping>
<filter-name>CocoonDebugFilter</filter-name>
<servlet-name>Cocoon</servlet-name>
</filter-mapping>
-->
<!-- Servlet Context Listener ======================================= -->
<!--
- Declare Spring context listener which sets up the Spring Application Context
- containing all Cocoon components (and user defined beans as well).
-->
<listener>
<listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
</listener>
<!--
- Declare Spring request listener which sets up the required RequestAttributes
- to support Springs and Cocoon custom bean scopes like the request scope or the
- session scope.
-->
<listener>
<listener-class>org.springframework.web.context.request.RequestContextListener</listener-class>
</listener>
<!-- Servlet Configuration ========================================== -->
<!--
- Servlet that dispatches requests to the Spring managed block servlets
-->
<servlet>
<description>Cocoon blocks dispatcher</description>
<display-name>DispatcherServlet</display-name>
<servlet-name>DispatcherServlet</servlet-name>
<servlet-class>org.apache.cocoon.servletservice.DispatcherServlet</servlet-class>
<load-on-startup>1</load-on-startup>
</servlet>
<!-- URL space mappings ============================================= -->
<!--
- Cocoon handles all the URL space assigned to the webapp using its sitemap.
- It is recommended to leave it unchanged. Under some circumstances though
- (like integration with proprietary webapps or servlets) you might have
- to change this parameter.
-->
<servlet-mapping>
<servlet-name>DispatcherServlet</servlet-name>
<url-pattern>/*</url-pattern>
</servlet-mapping>
</web-app>
\ No newline at end of file
package com.zhiwei.brandkbs2;
import com.hankcs.hanlp.HanLP;
import com.alibaba.fastjson.JSONObject;
import com.hankcs.hanlp.dictionary.CustomDictionary;
import com.hankcs.hanlp.dictionary.stopword.CoreStopWordDictionary;
import com.hankcs.hanlp.seg.common.Term;
import com.zhiwei.brandkbs2.util.Tools;
import java.io.UnsupportedEncodingException;
import java.util.List;
/**
* @ClassName: Test
......@@ -22,15 +21,19 @@ public class Test {
}
public static void main(String[] args) throws UnsupportedEncodingException {
List<Term> results= HanLP.segment("国家发改委:猪肉供应有保障 猪肉价格不具备大幅上涨基础");
for(Term term: CoreStopWordDictionary.apply(results)){
// if(CoreStopWordDictionary.shouldInclude(term)){
// System.out.println(term);
// }else{
System.err.println(term);
// }
}
System.out.println(JSONObject.toJSONString(Tools.parseToDays(1658512800000L,1659031200000L)));
// List<Term> results= HanLP.segment("国家发改委:猪肉供应有保障 猪肉价格不具备大幅上涨基础");
// for(Term term: CoreStopWordDictionary.apply(results)){
//// if(CoreStopWordDictionary.shouldInclude(term)){
//// System.out.println(term);
//// }else{
// System.err.println(term);
//// }
//
// }
// String token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJwYXlsb2FkIjoie1widXNlcklkXCI6XCIyMFwiLFwia2V5XCI6XCIyMFwiLFwibmlja05hbWVcIjpcIuayiOWQm-adsFwiLFwic2VydmljZVwiOlwiXCJ9IiwiZXhwIjoxNjUzMDExNjcwLCJpYXQiOjE2NTI0MDY4MTB9.jcVXxeZkayc6-Aiq8cyYc1uyq4ugji6FdWQXCCp4M2o";
// String token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJwYXlsb2FkIjoiXCLlk4Hop4FcIiIsImV4cCI6NDc2MjgyMjEzMiwiaWF0IjoxNjUyNDIyMDcyfQ.DXQ8yKgfsCMjhT0xniZeWCMv4syqIoDvztU4QWsd-Fg";
......
......@@ -62,14 +62,15 @@ public class TestRunWith {
// UserThreadLocal.set(userInfo);
// ResponseResult result = appArticleController.getMarkSpread(1657468800000L, 1657555200000L);
// System.out.println(JSONObject.toJSONString(result));
// taskService.messageFlowCount(1);
taskService.messageFlowCount(2);
reportService.getReportsAggCount();
// reportService.getReportsAggCount();
}
@Test
public void test2() {
taskService.generateReportAndSend();
taskService.messageFlowCount(1);
// taskService.generateReportAndSend();
// ChannelIndex channelIndex = new ChannelIndex();
// channelIndex.setPlatform("微信");
// channelIndex.setRealSource("微信公众号");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment