Commit 98983736 by shentao

2018/8/6 消息流读取定向监测读取完毕

parent cfaf3818
package com.zhiwei.messageflow;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.zhiwei.messageflow.mongo.bean.DirectGroup;
import com.zhiwei.messageflow.mongo.bean.PlatformNew;
import com.zhiwei.messageflow.mongo.bean.Project;
import com.zhiwei.messageflow.mongo.dao.DirectGroupDao;
import com.zhiwei.messageflow.mongo.dao.PlatformDao;
import com.zhiwei.messageflow.mongo.dao.ProjectDao;
@Component
public class DirectES4RedisStart {
private final static Logger log = LoggerFactory.getLogger(DirectES4RedisStart.class);
@Autowired
private ProjectDao projectDao;
@Autowired
private DirectGroupDao directGroupDao;
@Autowired
private DirectES4RedisTask directES4RedisTask;
public void startThread() {
// 项目列表
List<Project> projects = projectDao.getAllProjects();
// 遍历项目
for (Project project : projects) {
/**
* 项目是否开启定向监测,并获取定向渠道组
*/
List<DirectGroup> dgList = directGroupDao.getDirectGroupsByProject(project.getProjectName());
String threadName = "定向监测-"+project.getProjectName();
// 获取线程
DirectES4RedisThread directES4RedisThread = DirectES4RedisThread.getThread(threadName, project,
dgList, directES4RedisTask);
// ES4RedisThread es4RedisThread =
// ES4RedisThread.getThread(project.getProjectName(), project,
// allplatformNames, es4RedisTask);
// 获取线程失败
if (directES4RedisThread == null) {
log.warn("{}项目获取线程失败", threadName);
continue;
}
// 线程启动
directES4RedisThread.start();
}
}
}
package com.zhiwei.messageflow;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zhiwei.messageflow.config.RedisConfig;
import com.zhiwei.messageflow.mongo.bean.DirectGroup;
import com.zhiwei.messageflow.mongo.bean.KeywordNew;
import com.zhiwei.messageflow.mongo.bean.Project;
import com.zhiwei.messageflow.mongo.dao.DirectGroupDao;
import com.zhiwei.messageflow.mongo.dao.KeywordNewDao;
import com.zhiwei.messageflow.redis.bean.RsidAndMessages;
import com.zhiwei.messageflow.redis.service.RedisService;
import com.zhiwei.messageflow.service.DisposeMessageService;
@Component
public class DirectES4RedisTask {
private final static Logger log = LoggerFactory.getLogger(DirectES4RedisTask.class);
@Autowired
private DirectGroupDao directGroupDao;
@Autowired
private KeywordNewDao keywordNewDao;
@Autowired
private RedisService redisService;
@Autowired
private DisposeMessageService disposeMessageService;
private ObjectMapper mapper = new ObjectMapper();
private static final int allkeywordcount = 5000;
/**
* 定向监测任务
*
* @Title: directTask
* @Description: TODO(定向监测任务)
* @param @param
* project
* @param @param
* dgList
* @param @param
* count
* @param @return
* 设定文件
* @return boolean 返回类型
*/
public boolean directTask(Project project, List<DirectGroup> dgList, int count)
throws JsonParseException, JsonMappingException, IOException {
try {
/**
* 获取项目对应的rsid Map
*/
String directRsidMapKey = RedisConfig.DIRECTKEY + project.getProjectName();
String rsidjson = redisService.getRsid(directRsidMapKey);
Map<String, Integer> rsidMap = new HashMap<>();
if (rsidjson != null) {
rsidMap = mapper.readValue(rsidjson, Map.class);
} else {
log.info("{}项目RSID列表过期", project.getProjectName());
}
// 统计项目消息总数
int num = 0;
// 更新rsid用的rsid Map
Map<String, Integer> newRsidMap = new HashMap<>();
newRsidMap.putAll(rsidMap);
// 项目关键词组列表
List<KeywordNew> keywordNews = keywordNewDao.getKeywordNewByProject(project.getProjectName());
for (DirectGroup directGroup : dgList) {
if (directGroup.getMemberList().isEmpty()) {
continue;
}
// 获取渠道组所包含渠道平台
List<String> ptlist = directGroupDao.getOneDirectGroupPts(directGroup);
// 遍历全平台获取对应渠道组id
for (String pt : ptlist) {
// 全关键词redis库中的key
String directRedisKey = redisService.getDirectRedisKey(project.getProjectName(),
directGroup.getName(), pt);
// 获取 平台-项目-关键词组 对应的rsid作为查询范围的起始点
Long startrsid = rsidMap.get(directRedisKey) == null ? -1L
: Long.valueOf(rsidMap.get(directRedisKey));
// 用于存储数据获取后新的rsid
Long keyrsid = -1L;
// 获取该渠道组对应关键词组的关键词,如没有则按全量
List<String> allkeywords = new ArrayList<>();
for (KeywordNew keywordNew : keywordNews) {
if (keywordNew.getDxIsUsed() && null != keywordNew.getQdList()
&& !keywordNew.getQdList().isEmpty()) {// 判断是否定向监测启用
if (keywordNew.getQdList().contains(directGroup.getId())) {// qdList中有渠道的id
allkeywords.addAll(keywordNew.getKeyWords());
}
}
}
// 根据不同平台获取数据(同一方法,统一封装为消息流实体
RsidAndMessages ram = disposeMessageService.getDirectMessage(allkeywords, count, startrsid, -1L, pt,
project.getProjectName(), directGroup);
// 获取查询到的信息
List<JSONObject> messageskey = ram.getJlist();
// 查询到数据量为0
if (messageskey == null) {
newRsidMap.put(directRedisKey, Integer.valueOf(startrsid.toString()));
// log.info("{}平台{}关键字词组无消息", platformName, kwn.getKeyTitle());
continue;
}
// log.info("{}平台{}关键词数据获取{}条", platformName, kwn.getKeyTitle(),
// messages.size());
num += messageskey.size();
// 记录新的rsid
keyrsid = ram.getRsid();
// 向redis写入数据
redisService.setMessage2Redis(directRedisKey, messageskey, allkeywordcount);
newRsidMap.put(directRedisKey, Integer.valueOf(startrsid.toString()));
}
// 向redis库中存储新的rsid Map,覆盖原有数据
String directRsidMap = redisService.getDirectRsidMapKey(project.getProjectName());
redisService.setRsid(newRsidMap, project.getProjectName());
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return false;
}
}
package com.zhiwei.messageflow;
import java.io.IOException;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.zhiwei.messageflow.mongo.bean.DirectGroup;
import com.zhiwei.messageflow.mongo.bean.PlatformNew;
import com.zhiwei.messageflow.mongo.bean.Project;
public class DirectES4RedisThread extends Thread {
private final static Logger log = LoggerFactory.getLogger(DirectES4RedisThread.class);
// 线程
private Thread t;
// 线程名
private String threadName;
// 项目
private Project project;
// 定向渠道组list
private List<DirectGroup> dgList;
// 单个平台单个关键词组每次查询数量
private static final int count = 300;
private DirectES4RedisTask directES4RedisTask;
public DirectES4RedisThread(String name, Project project, List<DirectGroup> dgList,
DirectES4RedisTask directES4RedisTask) {
threadName = name;
this.project = project;
this.dgList = dgList;
this.directES4RedisTask = directES4RedisTask;
}
public DirectES4RedisThread() {
}
public static DirectES4RedisThread getThread(String name, Project project, List<DirectGroup> dgList,
DirectES4RedisTask directES4RedisTask) {
DirectES4RedisThread directES4RedisThread = new DirectES4RedisThread(name, project, dgList,
directES4RedisTask);
return directES4RedisThread;
}
public void start() {
// 线程开始
log.info("Starting 定向监测:{}", threadName);
if (t == null) {
t = new Thread(this, threadName);
// 通知执行run方法
t.start();
}
// 超时控制器
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
// 超时则线程中止
if (t.isAlive()) {
t.interrupt();
log.warn("{}项目超时定向监测线程状态:{}", project.getProjectName(),t.isInterrupted());
}
}
}, 49 * 1000L);
}
@SuppressWarnings("static-access")
@Override
public void run() {
try {
Thread.sleep(10L);
// 超时控制器
// Timer timer = new Timer();
// timer.schedule(new TimerTask() {
// @Override
// public void run() {
// // 超时则线程中止
// Thread.currentThread().interrupted();
// log.warn("{}项目超时", project.getProjectName());
// }
// }, 49 * 1000L);
// 程序运行
log.info("Running 定向监测:{}", threadName);
// 该项目执行消息流获取
boolean flag = directES4RedisTask.directTask(project, dgList, count);
if (!flag) {
// 程序执行出现异常则线程中止
// timer.cancel();
Thread.currentThread().interrupted();
log.error("{}项目出现异常,定向监测线程状态:{}", project.getProjectName(),Thread.currentThread().isInterrupted());
}
// else
// // 程序正常执行完毕,关闭超时控制器
// timer.cancel();
} catch (JsonParseException e) {
e.printStackTrace();
} catch (JsonMappingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 线程退出
log.info("Thread {} exiting.", threadName);
}
}
}
...@@ -41,6 +41,7 @@ public class ES4RedisRunner implements ApplicationRunner { ...@@ -41,6 +41,7 @@ public class ES4RedisRunner implements ApplicationRunner {
*/ */
// 手动注入bean ES4RedisStart // 手动注入bean ES4RedisStart
ES4RedisStart start = ApplicationContextProvider.getBean("ES4RedisStart", ES4RedisStart.class); ES4RedisStart start = ApplicationContextProvider.getBean("ES4RedisStart", ES4RedisStart.class);
DirectES4RedisStart directstart = ApplicationContextProvider.getBean("DirectES4RedisStart", DirectES4RedisStart.class);
// 定时器 // 定时器
Timer timer = new Timer(); Timer timer = new Timer();
...@@ -50,6 +51,7 @@ public class ES4RedisRunner implements ApplicationRunner { ...@@ -50,6 +51,7 @@ public class ES4RedisRunner implements ApplicationRunner {
public void run() { public void run() {
try { try {
start.startThread(); start.startThread();
directstart.startThread();
} catch (Exception e) { } catch (Exception e) {
log.error("主定时器异常{}{}",e.getMessage(),e.getStackTrace()); log.error("主定时器异常{}{}",e.getMessage(),e.getStackTrace());
} }
......
...@@ -233,6 +233,7 @@ public class ES4RedisTask { ...@@ -233,6 +233,7 @@ public class ES4RedisTask {
// 向redis写入数据 // 向redis写入数据
redisService.setMessage2Redis(redisKey, messageskey, keywordscount); redisService.setMessage2Redis(redisKey, messageskey, keywordscount);
newRsidMap.put(redisKey, Integer.valueOf(startrsid.toString()));
} // 遍历关键词组 } // 遍历关键词组
// 预警 // 预警
...@@ -244,9 +245,9 @@ public class ES4RedisTask { ...@@ -244,9 +245,9 @@ public class ES4RedisTask {
} // 遍历平台 } // 遍历平台
log.info("{}项目本次获取消息数:{}", project.getProjectName(), num); log.info("{}项目本次定向监测获取消息数:{}", project.getProjectName(), num);
} catch (Exception e) { } catch (Exception e) {
log.error("项目本次获取获取出错或超时{}{}", e.getMessage(), e.getStackTrace()); log.error("项目本次定向监测获取获取出错或超时{}{}", e.getMessage(), e.getStackTrace());
return false; return false;
} }
return true; return true;
......
...@@ -24,6 +24,7 @@ public class RedisConfig { ...@@ -24,6 +24,7 @@ public class RedisConfig {
private String ip; private String ip;
private int port; private int port;
private String password; private String password;
public static String DIRECTKEY;
private int keyMaxSize; private int keyMaxSize;
......
...@@ -4,6 +4,7 @@ import java.util.List; ...@@ -4,6 +4,7 @@ import java.util.List;
import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.SearchHits;
import com.zhiwei.messageflow.mongo.bean.DirectGroup;
import com.zhiwei.messageflow.mongo.bean.PlatformNew; import com.zhiwei.messageflow.mongo.bean.PlatformNew;
//第一层 从ES获取原始数据 //第一层 从ES获取原始数据
...@@ -103,5 +104,21 @@ public interface ESDao { ...@@ -103,5 +104,21 @@ public interface ESDao {
public int get24hoursFirstCommonid(String tableName); public int get24hoursFirstCommonid(String tableName);
public int getCommonidByTime(String tableName, long endtime); public int getCommonidByTime(String tableName, long endtime);
/**
* 获取定向监测ESmessage
* @Title: getDirectDataFromEs
* @Description: TODO(获取定向监测ESmessage)
* @param @param keywords
* @param @param count
* @param @param startid
* @param @param endid
* @param @param pt
* @param @param projectName
* @param @param directGroup
* @param @return 设定文件
* @return SearchHits 返回类型
*/
public SearchHits getDirectDataFromEs(List<String> keywords, int count, long startid, long endid, String pt,
String projectName, DirectGroup directGroup);
} }
package com.zhiwei.messageflow.es.dao.impl; package com.zhiwei.messageflow.es.dao.impl;
import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
...@@ -17,6 +18,7 @@ import com.zhiwei.es.util.ESIndexesUtil; ...@@ -17,6 +18,7 @@ import com.zhiwei.es.util.ESIndexesUtil;
import com.zhiwei.messageflow.ESGetCommonId; import com.zhiwei.messageflow.ESGetCommonId;
import com.zhiwei.messageflow.es.ESClient; import com.zhiwei.messageflow.es.ESClient;
import com.zhiwei.messageflow.es.dao.ESDao; import com.zhiwei.messageflow.es.dao.ESDao;
import com.zhiwei.messageflow.mongo.bean.DirectGroup;
import com.zhiwei.messageflow.mongo.bean.PlatformNew; import com.zhiwei.messageflow.mongo.bean.PlatformNew;
import com.zhiwei.messageflow.util.ESQueryUtil; import com.zhiwei.messageflow.util.ESQueryUtil;
import com.zhiwei.messageflow.util.TimeUtil; import com.zhiwei.messageflow.util.TimeUtil;
...@@ -32,7 +34,7 @@ public class ESDaoImpl implements ESDao { ...@@ -32,7 +34,7 @@ public class ESDaoImpl implements ESDao {
@Override @Override
public SearchHits getDataFromEs(List<String> keywords, int count, long startid, long endid, PlatformNew platform, public SearchHits getDataFromEs(List<String> keywords, int count, long startid, long endid, PlatformNew platform,
String project ,String matchFields) { String project, String matchFields) {
// 按ptlist获取indexlist // 按ptlist获取indexlist
List<String> ptList = platform.getPt(); List<String> ptList = platform.getPt();
...@@ -46,7 +48,7 @@ public class ESDaoImpl implements ESDao { ...@@ -46,7 +48,7 @@ public class ESDaoImpl implements ESDao {
// 一阶段组装关键词 // 一阶段组装关键词
// 取field // 取field
String[] fieldlist = ESQueryUtil.getFieldListbyPtList(ptList, noPlatformType,matchFields); String[] fieldlist = ESQueryUtil.getFieldListbyPtList(ptList, noPlatformType, matchFields);
BoolQueryBuilder keywordQueryBuilder = ESQueryUtil.assembleKeywordQuery(QueryBuilders.boolQuery(), keywords, BoolQueryBuilder keywordQueryBuilder = ESQueryUtil.assembleKeywordQuery(QueryBuilders.boolQuery(), keywords,
fieldlist); fieldlist);
queryBuilder.must(keywordQueryBuilder); queryBuilder.must(keywordQueryBuilder);
...@@ -77,7 +79,7 @@ public class ESDaoImpl implements ESDao { ...@@ -77,7 +79,7 @@ public class ESDaoImpl implements ESDao {
queryBuilder.must(pluginQueryBuilder); queryBuilder.must(pluginQueryBuilder);
// 组装高亮字段 // 组装高亮字段
String[] allfieldlist = ESQueryUtil.getFieldListbyPtList(ptList, noPlatformType,""); String[] allfieldlist = ESQueryUtil.getFieldListbyPtList(ptList, noPlatformType, "");
HighlightBuilder highlightBuilder = ESQueryUtil.getHighlightBuilder(allfieldlist); HighlightBuilder highlightBuilder = ESQueryUtil.getHighlightBuilder(allfieldlist);
// 组配查询 // 组配查询
...@@ -847,4 +849,89 @@ public class ESDaoImpl implements ESDao { ...@@ -847,4 +849,89 @@ public class ESDaoImpl implements ESDao {
} }
return commonid; return commonid;
} }
@Override
public SearchHits getDirectDataFromEs(List<String> keywords, int count, long startid, long endid, String pt,
String projectName, DirectGroup directGroup) {
// 按pt获取indexList
String[] indexlist = ESQueryUtil.getDirectIndexListbyPt(pt);
// 按pt获取typelist
String[] typelist = ESQueryUtil.getDirectTypeListbyPt(pt);
// 组装查询语句
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
if (!keywords.isEmpty()) {// 定向监测配有关键词范围
// 一阶段组装关键词
// 取field
String[] fieldlist = ESQueryUtil.getDirectFieldListbyPt(pt, "");
BoolQueryBuilder keywordQueryBuilder = ESQueryUtil.assembleKeywordQuery(QueryBuilders.boolQuery(), keywords,
fieldlist);
queryBuilder.must(keywordQueryBuilder);
}
// 二阶段组装渠道
// 组装type
List<String> typeList = ESQueryUtil.getDirectInsideTypeListbyPt(pt);
BoolQueryBuilder tsuQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(QueryBuilders.boolQuery(), typeList,
null, "type");
// 组装source和特征属性
List<String> sourceList = new ArrayList<>();
List<String> specTypeList = new ArrayList<>();
// 获取source和特征属性
List<String> directMemberIds = directGroup.getMemberList();
for (String directMemberId : directMemberIds) {
String memberPt = directMemberId.split("_")[0];
String memberSource = directMemberId.split("_")[1];
String memberSpec = directMemberId.split("_")[2];
if (pt.equals(memberPt)) {
sourceList.add(memberSource);
specTypeList.add(memberSpec);
}
}
if (pt.equals("微博")) {
tsuQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(tsuQueryBuilder, sourceList, null, "username");
tsuQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(tsuQueryBuilder, specTypeList, null, "user_id");
} else {
tsuQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(tsuQueryBuilder, sourceList, null, "source");
if (pt.equals("微信")) {
// tsuQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(tsuQueryBuilder,
// specTypeList, null, "source");
} else if (pt.equals("今日头条")) {
tsuQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(tsuQueryBuilder, specTypeList, null,
"user_id");
} else {
tsuQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(tsuQueryBuilder, specTypeList, null, "url");
}
}
queryBuilder.must(tsuQueryBuilder);
// // 四阶段插件过滤
// BoolQueryBuilder pluginQueryBuilder = ESQueryUtil.assemblePluginQurey(QueryBuilders.boolQuery());
// queryBuilder.must(pluginQueryBuilder);
// 组装高亮字段
String[] allfieldlist = ESQueryUtil.getDirectFieldListbyPt(pt, "");
HighlightBuilder highlightBuilder = ESQueryUtil.getHighlightBuilder(allfieldlist);
// 组配查询
SearchRequestBuilder searchRequestBuilder = transportClient.prepareSearch(indexlist).setTypes(typelist)
.highlighter(highlightBuilder).setQuery(queryBuilder).addSort("commonid", SortOrder.DESC).setFrom(0)
.setSize(count);
if (startid > 0 && endid <= 0) {
searchRequestBuilder.setPostFilter(QueryBuilders.rangeQuery("commonid").from(startid));
} else if (endid > 0 && startid <= 0) {
searchRequestBuilder.setPostFilter(QueryBuilders.rangeQuery("commonid").to(endid));
} else if (endid > 0 && startid > 0) {
searchRequestBuilder.setPostFilter(QueryBuilders.rangeQuery("commonid").from(startid).to(endid));
} else {
searchRequestBuilder.setPostFilter(QueryBuilders.rangeQuery("commonid").from(ESGetCommonId.START_COMMONID));
}
SearchResponse response = searchRequestBuilder.execute().actionGet();
SearchHits searchHits = response.getHits();
// System.err.println("totalHit"+searchHits.totalHits+"hitsnum"+searchHits.getHits().length);
return searchHits;
}
} }
\ No newline at end of file
...@@ -7,6 +7,7 @@ import com.zhiwei.messageflow.bean.MediaMessage; ...@@ -7,6 +7,7 @@ import com.zhiwei.messageflow.bean.MediaMessage;
import com.zhiwei.messageflow.bean.VideoMessage; import com.zhiwei.messageflow.bean.VideoMessage;
import com.zhiwei.messageflow.bean.WeiboMessage; import com.zhiwei.messageflow.bean.WeiboMessage;
import com.zhiwei.messageflow.bean.ZhihuMessage; import com.zhiwei.messageflow.bean.ZhihuMessage;
import com.zhiwei.messageflow.mongo.bean.DirectGroup;
import com.zhiwei.messageflow.mongo.bean.NoiseRule; import com.zhiwei.messageflow.mongo.bean.NoiseRule;
import com.zhiwei.messageflow.mongo.bean.PlatformNew; import com.zhiwei.messageflow.mongo.bean.PlatformNew;
...@@ -30,8 +31,8 @@ public interface ES4BeanService { ...@@ -30,8 +31,8 @@ public interface ES4BeanService {
* 项目 * 项目
* @return * @return
*/ */
List<WeiboMessage> getWeiboMessage(List<NoiseRule> noiseRules,List<String> keywords, int count, long startid, long endid, String platform, List<WeiboMessage> getWeiboMessage(List<NoiseRule> noiseRules, List<String> keywords, int count, long startid,
String project); long endid, String platform, String project);
/** /**
* 获取知乎消息 * 获取知乎消息
...@@ -50,8 +51,8 @@ public interface ES4BeanService { ...@@ -50,8 +51,8 @@ public interface ES4BeanService {
* 项目 * 项目
* @return * @return
*/ */
List<ZhihuMessage> getZhihuMessage(List<NoiseRule> noiseRules,List<String> keywords, int count, long startid, long endid, String platform, List<ZhihuMessage> getZhihuMessage(List<NoiseRule> noiseRules, List<String> keywords, int count, long startid,
String project); long endid, String platform, String project);
/** /**
* 获取视频消息 * 获取视频消息
...@@ -70,8 +71,8 @@ public interface ES4BeanService { ...@@ -70,8 +71,8 @@ public interface ES4BeanService {
* 项目 * 项目
* @return * @return
*/ */
List<VideoMessage> getVideoMessage(List<NoiseRule> noiseRules,List<String> keywords, int count, long startid, long endid, String platform, List<VideoMessage> getVideoMessage(List<NoiseRule> noiseRules, List<String> keywords, int count, long startid,
String project); long endid, String platform, String project);
/** /**
* 获取网媒消息 * 获取网媒消息
...@@ -90,8 +91,9 @@ public interface ES4BeanService { ...@@ -90,8 +91,9 @@ public interface ES4BeanService {
* 项目 * 项目
* @return * @return
*/ */
List<MediaMessage> getMediaMessage(List<NoiseRule> noiseRules,List<String> keywords, int count, long startid, long endid, String platform, List<MediaMessage> getMediaMessage(List<NoiseRule> noiseRules, List<String> keywords, int count, long startid,
String project); long endid, String platform, String project);
/** /**
* 获取各平台消息 * 获取各平台消息
* *
...@@ -112,4 +114,30 @@ public interface ES4BeanService { ...@@ -112,4 +114,30 @@ public interface ES4BeanService {
*/ */
List<JSONObject> getMessage(List<NoiseRule> noiseRules, List<String> keywords, int count, long startid, long endid, List<JSONObject> getMessage(List<NoiseRule> noiseRules, List<String> keywords, int count, long startid, long endid,
PlatformNew platform, String project, String matchFields); PlatformNew platform, String project, String matchFields);
/**
* 获取各平台定向监测消息
*
* @Title: getDirectMessage
* @Description: TODO(获取各平台定向监测消息)
* @param @param
* keywords
* @param @param
* count
* @param @param
* l
* @param @param
* m
* @param @param
* pt
* @param @param
* projectName
* @param @param
* directGroup
* @param @return
* 设定文件
* @return List<JSONObject> 返回类型
*/
List<JSONObject> getDirectMessage(List<String> keywords, int count, long startid, long endid, String pt, String projectName,
DirectGroup directGroup);
} }
...@@ -21,7 +21,28 @@ public interface NoiseProcessingService { ...@@ -21,7 +21,28 @@ public interface NoiseProcessingService {
List<VideoMessage> videoDenoising(List<NoiseRule> noiseRules,SearchHits searchHits, String platform, String project); List<VideoMessage> videoDenoising(List<NoiseRule> noiseRules,SearchHits searchHits, String platform, String project);
List<MediaMessage> mediaDenoising(List<NoiseRule> noiseRules,SearchHits searchHits, String platform, String project); List<MediaMessage> mediaDenoising(List<NoiseRule> noiseRules,SearchHits searchHits, String platform, String project);
/**
* 读取消息流去噪
* @Title: allDenoising
* @Description: TODO(读取消息流去噪)
* @param @param noiseRules
* @param @param searchHits
* @param @param platform
* @param @param project
* @param @return 设定文件
* @return List<JSONObject> 返回类型
*/
List<JSONObject> allDenoising(List<NoiseRule> noiseRules, SearchHits searchHits, PlatformNew platform, List<JSONObject> allDenoising(List<NoiseRule> noiseRules, SearchHits searchHits, PlatformNew platform,
String project); String project);
/**
* 读取定向监测消息流去噪
* @Title: directDenoising
* @Description: TODO(这里用一句话描述这个方法的作用)
* @param @param searchHits
* @param @param pt
* @param @param projectName
* @param @return 设定文件
* @return List<JSONObject> 返回类型
*/
List<JSONObject> directDenoising(SearchHits searchHits, String pt, String projectName);
} }
...@@ -16,6 +16,7 @@ import com.zhiwei.messageflow.bean.ZhihuMessage; ...@@ -16,6 +16,7 @@ import com.zhiwei.messageflow.bean.ZhihuMessage;
import com.zhiwei.messageflow.es.dao.ESDao; import com.zhiwei.messageflow.es.dao.ESDao;
import com.zhiwei.messageflow.es.service.ES4BeanService; import com.zhiwei.messageflow.es.service.ES4BeanService;
import com.zhiwei.messageflow.es.service.NoiseProcessingService; import com.zhiwei.messageflow.es.service.NoiseProcessingService;
import com.zhiwei.messageflow.mongo.bean.DirectGroup;
import com.zhiwei.messageflow.mongo.bean.NoiseRule; import com.zhiwei.messageflow.mongo.bean.NoiseRule;
import com.zhiwei.messageflow.mongo.bean.PlatformNew; import com.zhiwei.messageflow.mongo.bean.PlatformNew;
...@@ -136,4 +137,28 @@ public class ES4BeanServiceImpl implements ES4BeanService { ...@@ -136,4 +137,28 @@ public class ES4BeanServiceImpl implements ES4BeanService {
return messages; return messages;
} }
@Override
public List<JSONObject> getDirectMessage(List<String> keywords, int count, long startid, long endid, String pt,
String projectName, DirectGroup directGroup) {
List<JSONObject> messages = null;
try {
// 查询数据库 获得searchHits
SearchHits searchHits = esDao.getDirectDataFromEs(keywords, count, startid, endid, pt, projectName,directGroup);
if (searchHits == null) {
return null;
}
// 去噪并封装
messages = noiseProcessingService.directDenoising( searchHits, pt, projectName);
} catch (Exception e) {
log.error("error:",e);
}
return messages;
}
} }
...@@ -1110,4 +1110,33 @@ public class NoiseProcessingServiceImpl implements NoiseProcessingService { ...@@ -1110,4 +1110,33 @@ public class NoiseProcessingServiceImpl implements NoiseProcessingService {
return isnoise; return isnoise;
} }
@Override
public List<JSONObject> directDenoising(SearchHits searchHits, String pt, String projectName) {
List<JSONObject> jmessages = new ArrayList<>();
SearchHit[] sh = searchHits.getHits();
for (SearchHit searchHit : sh) {
// System.err.println(searchHit.getSourceAsString());
Map<String, Object> map = searchHit.getSource();
if (map == null) {
continue;
}
//source为空判断
String type = searchHit.getType();
if (!type.equals("status")) {
// 除微博
if(map.get("source") == null){
continue;
}
}
// 数据转换并高亮处理
JSONObject jo = highLightFillingService.getBean(searchHit);
if (jo != null) {
jmessages.add(jo);
}
}
return jmessages;
}
} }
package com.zhiwei.messageflow.mongo.bean;
import java.util.HashMap;
import java.util.List;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
@Document(collection = "qbjc_directGroup")
public class DirectGroup {
/**
* "id"
*/
@Id
private String id;
/**
* "name": 渠道组名
*/
private String name;
/**
* project 所属项目
*/
private String project;
/**
* submitter提交人员
*/
private String submitter;
/**
* 创建时间createAt
*/
private long createAt;
/**
* 是否使用isUsed
*/
private Boolean isUsed;
/**
* 是否标注isTag
*/
private Boolean isTag;
/**
* memberList member使用中的list
*/
private List<String> memberList;
/**
* memberMaps member全信息list
*/
private List<HashMap<String,Object>> memberMaps;
}
...@@ -21,5 +21,7 @@ public class KeywordNew { ...@@ -21,5 +21,7 @@ public class KeywordNew {
private String submitter; private String submitter;
private long createAt; private long createAt;
private Boolean isUsed; private Boolean isUsed;
private Boolean dxIsUsed;
private List<String> ptList;//平台列表 private List<String> ptList;//平台列表
private List<String> qdList;
} }
package com.zhiwei.messageflow.mongo.dao;
import java.util.List;
import com.zhiwei.messageflow.mongo.bean.DirectGroup;
/**
* 定向监测组Dao
* @ClassName: DirectGroupDao
* @Description: TODO(定向监测组Dao)
* @author shentao
* @date 2018年8月3日 上午10:52:22
*/
public interface DirectGroupDao {
/**
* 按项目名获取全部渠道组
* @Title: getDirectGroupsByProject
* @Description: TODO(按项目名获取全部渠道组)
* @param @param projectName
* @param @return 设定文件
* @return List<DirectGroup> 返回类型
*/
List<DirectGroup> getDirectGroupsByProject(String projectName);
/**
* 按渠道组获取该渠道组含有pts
* @Title: getOneDirectGroupPts
* @Description: TODO(按渠道组获取该渠道组含有pts)
* @param @param directGroup
* @param @return 设定文件
* @return List<String> 返回类型
* @throws Exception
*/
List<String> getOneDirectGroupPts(DirectGroup directGroup);
}
package com.zhiwei.messageflow.mongo.dao.impl;
import java.util.ArrayList;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.stereotype.Component;
import com.zhiwei.messageflow.mongo.bean.DirectGroup;
import com.zhiwei.messageflow.mongo.dao.DirectGroupDao;
@Component
public class DirectGroupDaoImpl implements DirectGroupDao {
@Autowired
@Qualifier(value = "primaryMongoTemplate")
protected MongoTemplate primaryMongoTemplate;
@Override
public List<DirectGroup> getDirectGroupsByProject(String projectName) {
Query query = new Query().addCriteria(Criteria.where("project").is(projectName));
List<DirectGroup> res = primaryMongoTemplate.find(query, DirectGroup.class);
return res;
}
@Override
public List<String> getOneDirectGroupPts(DirectGroup directGroup) {
List<String> res = new ArrayList<>();
try {
List<String> idlist = directGroup.getMemberList();
for (String id : idlist) {
String pt = id.split("_")[0];
if(res.isEmpty()) {
res.add(pt);
}else if(!res.contains(pt)){
res.add(pt);
}
}
} catch (Exception e) {
return null;
}
return res;
}
}
...@@ -92,4 +92,24 @@ public interface RedisService { ...@@ -92,4 +92,24 @@ public interface RedisService {
* @return void 返回类型 * @return void 返回类型
*/ */
void setMessage2Redis(String allRedisKey, List<JSONObject> messages, int allkeywordcount); void setMessage2Redis(String allRedisKey, List<JSONObject> messages, int allkeywordcount);
/**
* 获取定向监测缓存Key
* @Title: getDirectRedisKey
* @Description: TODO(这里用一句话描述这个方法的作用)
* @param @param projectName
* @param @param name
* @param @param pt
* @param @return 设定文件
* @return String 返回类型
*/
String getDirectRedisKey(String projectName, String groupName, String pt);
/**
* 获取定向监测缓存rsidmapKey
* @Title: getDirectRsidMapKey
* @Description: TODO(这里用一句话描述这个方法的作用)
* @param @param projectName
* @param @return 设定文件
* @return String 返回类型
*/
String getDirectRsidMapKey(String projectName);
} }
...@@ -14,6 +14,7 @@ import com.zhiwei.messageflow.bean.MediaMessage; ...@@ -14,6 +14,7 @@ import com.zhiwei.messageflow.bean.MediaMessage;
import com.zhiwei.messageflow.bean.VideoMessage; import com.zhiwei.messageflow.bean.VideoMessage;
import com.zhiwei.messageflow.bean.WeiboMessage; import com.zhiwei.messageflow.bean.WeiboMessage;
import com.zhiwei.messageflow.bean.ZhihuMessage; import com.zhiwei.messageflow.bean.ZhihuMessage;
import com.zhiwei.messageflow.config.RedisConfig;
import com.zhiwei.messageflow.redis.RedisPoolAndTools; import com.zhiwei.messageflow.redis.RedisPoolAndTools;
import com.zhiwei.messageflow.redis.service.RedisService; import com.zhiwei.messageflow.redis.service.RedisService;
...@@ -155,4 +156,14 @@ public class RedisServiceImpl implements RedisService { ...@@ -155,4 +156,14 @@ public class RedisServiceImpl implements RedisService {
} }
@Override
public String getDirectRedisKey(String projectName, String groupName, String pt) {
return RedisConfig.DIRECTKEY+projectName+"-"+groupName+"-"+pt;
}
@Override
public String getDirectRsidMapKey(String projectName) {
return RedisConfig.DIRECTKEY+projectName;
}
} }
...@@ -2,6 +2,7 @@ package com.zhiwei.messageflow.service; ...@@ -2,6 +2,7 @@ package com.zhiwei.messageflow.service;
import java.util.List; import java.util.List;
import com.zhiwei.messageflow.mongo.bean.DirectGroup;
import com.zhiwei.messageflow.mongo.bean.NoiseRule; import com.zhiwei.messageflow.mongo.bean.NoiseRule;
import com.zhiwei.messageflow.mongo.bean.PlatformNew; import com.zhiwei.messageflow.mongo.bean.PlatformNew;
import com.zhiwei.messageflow.redis.bean.RsidAndMessages; import com.zhiwei.messageflow.redis.bean.RsidAndMessages;
...@@ -118,4 +119,20 @@ public interface DisposeMessageService { ...@@ -118,4 +119,20 @@ public interface DisposeMessageService {
*/ */
RsidAndMessages getFilteredMessage(List<NoiseRule> noiseRules, List<String> keywords, int count, RsidAndMessages getFilteredMessage(List<NoiseRule> noiseRules, List<String> keywords, int count,
Long startid, Long endid, PlatformNew platform, String projectName, String matchFields); Long startid, Long endid, PlatformNew platform, String projectName, String matchFields);
/**
* 获取定向监测渠道RsidAndMessages
* @Title: getDirectMessage
* @Description: TODO(获取定向监测渠道RsidAndMessages)
* @param @param allkeywords
* @param @param count
* @param @param allstartrsid
* @param @param l
* @param @param pt
* @param @param projectName
* @param @param directGroup
* @param @return 设定文件
* @return RsidAndMessages 返回类型
*/
RsidAndMessages getDirectMessage(List<String> keywords, int count, long startid, long endid, String pt,
String projectName, DirectGroup directGroup);
} }
...@@ -15,6 +15,7 @@ import com.zhiwei.messageflow.bean.VideoMessage; ...@@ -15,6 +15,7 @@ import com.zhiwei.messageflow.bean.VideoMessage;
import com.zhiwei.messageflow.bean.WeiboMessage; import com.zhiwei.messageflow.bean.WeiboMessage;
import com.zhiwei.messageflow.bean.ZhihuMessage; import com.zhiwei.messageflow.bean.ZhihuMessage;
import com.zhiwei.messageflow.es.service.ES4BeanService; import com.zhiwei.messageflow.es.service.ES4BeanService;
import com.zhiwei.messageflow.mongo.bean.DirectGroup;
import com.zhiwei.messageflow.mongo.bean.NoiseRule; import com.zhiwei.messageflow.mongo.bean.NoiseRule;
import com.zhiwei.messageflow.mongo.bean.PlatformNew; import com.zhiwei.messageflow.mongo.bean.PlatformNew;
import com.zhiwei.messageflow.redis.bean.RsidAndMessages; import com.zhiwei.messageflow.redis.bean.RsidAndMessages;
...@@ -143,5 +144,30 @@ public class DisposeMessageServiceImpl implements DisposeMessageService { ...@@ -143,5 +144,30 @@ public class DisposeMessageServiceImpl implements DisposeMessageService {
return ram; return ram;
} }
@Override
public RsidAndMessages getDirectMessage(List<String> keywords, int count, long startid, long endid, String pt,
String projectName, DirectGroup directGroup) {
RsidAndMessages ram = new RsidAndMessages();
// 消息列表
List<JSONObject> messages = new ArrayList<JSONObject>();
messages = es4BeanDao.getDirectMessage(keywords, count, startid + 1L, -1L, pt, projectName,directGroup);
/**
* 获取最后消息的rsid
*/
if (messages != null && !messages.isEmpty()) {
JSONObject mmf = messages.get(0);
JSONObject mml = messages.get(messages.size() - 1);
ram.setJlist(messages);
long fcomid = mmf.getLongValue("commonid");
long lcomid = mml.getLongValue("commonid");
ram.setRsid(fcomid > lcomid ? fcomid : lcomid);
}
return ram;
}
} }
...@@ -785,5 +785,122 @@ public class ESQueryUtil { ...@@ -785,5 +785,122 @@ public class ESQueryUtil {
} }
return boolQuery; return boolQuery;
} }
/**
* 获取定向ESindex
* @Title: getDirectIndexListbyPt
* @Description: TODO(获取定向ESindex)
* @param @param pt
* @param @return 设定文件
* @return String[] 返回类型
*/
public static String[] getDirectIndexListbyPt(String pt) {
String[] indexlist = null;
List<String> list = new ArrayList<>();
String index = "";
switch (pt) {
case "微博":
index = "network";
break;
default:
index = "mediaspider";
break;
}
if (!index.contains("*")) {
list.addAll(indexes.getLastIndexes(index, 2));
} else {
list.add(index);
}
indexlist = list.toArray(new String[list.size()]);
return indexlist;
}
/**
* 获取定向EStypes
* @Title: getDirectTypeListbyPt
* @Description: TODO(这里用一句话描述这个方法的作用)
* @param @param pt
* @param @return 设定文件
* @return String[] 返回类型
*/
public static String[] getDirectTypeListbyPt(String pt) {
String[] typelist = null;
List<String> list = new ArrayList<>();
switch (pt) {
case "微博":
list.add("status");
break;
default:
list.add("media");//网媒
list.add("wechat");//微信
list.add("print_media");//平媒
break;
}
typelist = list.toArray(new String[list.size()]);
return typelist;
}
/**
* 获取定向关键词fieldslist
* @Title: getDirectFieldListbyPt
* @Description: TODO(获取定向关键词fieldslist)
* @param @param pt
* @param @param matchFields
* @param @return 设定文件
* @return String[] 返回类型
*/
public static String[] getDirectFieldListbyPt(String pt, String matchFields) {
String[] fieldlist = null;
List<String> list = new ArrayList<>();
switch (pt) {
case "微博":
list.add("text");
list.add("roottext");
break;
default:
if (matchFields.equals("标题") || matchFields.equals("标题+全文") || matchFields.equals("")) {
if (!list.contains("title")) {
list.add("title");
}
}
if (matchFields.equals("全文") || matchFields.equals("标题+全文") || matchFields.equals("")) {
// if ((matchFields.equals("全文") || matchFields.equals("标题+全文") || matchFields.equals(""))&&!ptList.contains("video")) {
if (!list.contains("content")) {
list.add("content");
}
}
break;
}
fieldlist = list.toArray(new String[list.size()]);
return fieldlist;
}
/**
* 获取定向查询语句中type的list
* @Title: getDirectInsideTypeListbyPt
* @Description: TODO(获取定向查询语句中type的list)
* @param @param pt
* @param @return 设定文件
* @return List<String> 返回类型
*/
public static List<String> getDirectInsideTypeListbyPt(String pt) {
List<String> list = new ArrayList<>();
switch (pt) {
case "微博":
list.add("新浪微博");
break;
case "今日头条":
list.add("今日头条");
list.add("*头条");
break;
case "微信":
list.add("微信");
list.add("微信*");
break;
default:
list.add("网媒");
list.add("*新闻");
list.add("*自媒体");
list.add("一点资讯");
break;
}
return list;
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment