Commit a6a94c6b by shentao

2018/8/13 定向监测消息流终版

parent 98983736
...@@ -15,8 +15,9 @@ import com.zhiwei.messageflow.mongo.dao.PlatformDao; ...@@ -15,8 +15,9 @@ import com.zhiwei.messageflow.mongo.dao.PlatformDao;
import com.zhiwei.messageflow.mongo.dao.ProjectDao; import com.zhiwei.messageflow.mongo.dao.ProjectDao;
@Component @Component
public class DirectES4RedisStart { public class DES4RedisStart{
private final static Logger log = LoggerFactory.getLogger(DirectES4RedisStart.class);
private final static Logger log = LoggerFactory.getLogger(DES4RedisStart.class);
@Autowired @Autowired
private ProjectDao projectDao; private ProjectDao projectDao;
...@@ -37,6 +38,9 @@ public class DirectES4RedisStart { ...@@ -37,6 +38,9 @@ public class DirectES4RedisStart {
/** /**
* 项目是否开启定向监测,并获取定向渠道组 * 项目是否开启定向监测,并获取定向渠道组
*/ */
if(!project.getDiyMonitor()) {
continue;
}
List<DirectGroup> dgList = directGroupDao.getDirectGroupsByProject(project.getProjectName()); List<DirectGroup> dgList = directGroupDao.getDirectGroupsByProject(project.getProjectName());
String threadName = "定向监测-"+project.getProjectName(); String threadName = "定向监测-"+project.getProjectName();
......
...@@ -86,9 +86,11 @@ public class DirectES4RedisTask { ...@@ -86,9 +86,11 @@ public class DirectES4RedisTask {
List<KeywordNew> keywordNews = keywordNewDao.getKeywordNewByProject(project.getProjectName()); List<KeywordNew> keywordNews = keywordNewDao.getKeywordNewByProject(project.getProjectName());
for (DirectGroup directGroup : dgList) { for (DirectGroup directGroup : dgList) {
if (directGroup.getMemberList().isEmpty()) { //判断渠道组是否需要读取
if(!directGroup.getIsUsed()||(null==directGroup.getMemberList()))
continue;
if (directGroup.getMemberList().isEmpty())
continue; continue;
}
// 获取渠道组所包含渠道平台 // 获取渠道组所包含渠道平台
List<String> ptlist = directGroupDao.getOneDirectGroupPts(directGroup); List<String> ptlist = directGroupDao.getOneDirectGroupPts(directGroup);
// 遍历全平台获取对应渠道组id // 遍历全平台获取对应渠道组id
...@@ -107,9 +109,9 @@ public class DirectES4RedisTask { ...@@ -107,9 +109,9 @@ public class DirectES4RedisTask {
// 获取该渠道组对应关键词组的关键词,如没有则按全量 // 获取该渠道组对应关键词组的关键词,如没有则按全量
List<String> allkeywords = new ArrayList<>(); List<String> allkeywords = new ArrayList<>();
for (KeywordNew keywordNew : keywordNews) { for (KeywordNew keywordNew : keywordNews) {
if (keywordNew.getDxIsUsed() && null != keywordNew.getQdList() if (null != keywordNew.getDxIsUsed() && keywordNew.getDxIsUsed()
&& !keywordNew.getQdList().isEmpty()) {// 判断是否定向监测启用 && null != keywordNew.getQdList() && !keywordNew.getQdList().isEmpty()) {// 判断是否定向监测启用
if (keywordNew.getQdList().contains(directGroup.getId())) {// qdList中有渠道的id if (keywordNew.getQdList().contains(directGroup.getName())) {// qdList中有渠道的name
allkeywords.addAll(keywordNew.getKeyWords()); allkeywords.addAll(keywordNew.getKeyWords());
} }
} }
...@@ -136,18 +138,18 @@ public class DirectES4RedisTask { ...@@ -136,18 +138,18 @@ public class DirectES4RedisTask {
// 向redis写入数据 // 向redis写入数据
redisService.setMessage2Redis(directRedisKey, messageskey, allkeywordcount); redisService.setMessage2Redis(directRedisKey, messageskey, allkeywordcount);
newRsidMap.put(directRedisKey, Integer.valueOf(startrsid.toString())); newRsidMap.put(directRedisKey, Integer.valueOf(keyrsid.toString()));
} }
// 向redis库中存储新的rsid Map,覆盖原有数据 // 向redis库中存储新的rsid Map,覆盖原有数据
String directRsidMap = redisService.getDirectRsidMapKey(project.getProjectName()); redisService.setRsid(newRsidMap, directRsidMapKey);
redisService.setRsid(newRsidMap, project.getProjectName());
} }
log.info("{}项目定向监测本次获取消息数:{}", project.getProjectName(), num);
} catch (Exception e) { } catch (Exception e) {
// TODO Auto-generated catch block log.error("项目定向监测本次获取获取出错或超时{}{}", e.getMessage(), e.getStackTrace());
e.printStackTrace(); return false;
} }
return false; return true;
} }
} }
...@@ -11,7 +11,6 @@ import org.slf4j.LoggerFactory; ...@@ -11,7 +11,6 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.JsonMappingException;
import com.zhiwei.messageflow.mongo.bean.DirectGroup; import com.zhiwei.messageflow.mongo.bean.DirectGroup;
import com.zhiwei.messageflow.mongo.bean.PlatformNew;
import com.zhiwei.messageflow.mongo.bean.Project; import com.zhiwei.messageflow.mongo.bean.Project;
public class DirectES4RedisThread extends Thread { public class DirectES4RedisThread extends Thread {
......
...@@ -41,7 +41,7 @@ public class ES4RedisRunner implements ApplicationRunner { ...@@ -41,7 +41,7 @@ public class ES4RedisRunner implements ApplicationRunner {
*/ */
// 手动注入bean ES4RedisStart // 手动注入bean ES4RedisStart
ES4RedisStart start = ApplicationContextProvider.getBean("ES4RedisStart", ES4RedisStart.class); ES4RedisStart start = ApplicationContextProvider.getBean("ES4RedisStart", ES4RedisStart.class);
DirectES4RedisStart directstart = ApplicationContextProvider.getBean("DirectES4RedisStart", DirectES4RedisStart.class); DES4RedisStart directstart = ApplicationContextProvider.getBean("DES4RedisStart", DES4RedisStart.class);
// 定时器 // 定时器
Timer timer = new Timer(); Timer timer = new Timer();
......
...@@ -233,7 +233,7 @@ public class ES4RedisTask { ...@@ -233,7 +233,7 @@ public class ES4RedisTask {
// 向redis写入数据 // 向redis写入数据
redisService.setMessage2Redis(redisKey, messageskey, keywordscount); redisService.setMessage2Redis(redisKey, messageskey, keywordscount);
newRsidMap.put(redisKey, Integer.valueOf(startrsid.toString())); newRsidMap.put(redisKey, Integer.valueOf(keyrsid.toString()));
} // 遍历关键词组 } // 遍历关键词组
// 预警 // 预警
...@@ -245,9 +245,9 @@ public class ES4RedisTask { ...@@ -245,9 +245,9 @@ public class ES4RedisTask {
} // 遍历平台 } // 遍历平台
log.info("{}项目本次定向监测获取消息数:{}", project.getProjectName(), num); log.info("{}项目本次获取消息数:{}", project.getProjectName(), num);
} catch (Exception e) { } catch (Exception e) {
log.error("项目本次定向监测获取获取出错或超时{}{}", e.getMessage(), e.getStackTrace()); log.error("项目本次获取获取出错或超时{}{}", e.getMessage(), e.getStackTrace());
return false; return false;
} }
return true; return true;
......
...@@ -30,7 +30,7 @@ public class ES4RedisThreadNew extends Thread { ...@@ -30,7 +30,7 @@ public class ES4RedisThreadNew extends Thread {
// 单个平台单个关键词组每次查询数量 // 单个平台单个关键词组每次查询数量
private static final int count = 300; private static final int count = 300;
// private static final int count = 50; // private static final int count = 10;
// private static final int max_Thread_num = 40; // private static final int max_Thread_num = 40;
// private static int Thread_num = 0; // private static int Thread_num = 0;
......
...@@ -24,7 +24,7 @@ public class RedisConfig { ...@@ -24,7 +24,7 @@ public class RedisConfig {
private String ip; private String ip;
private int port; private int port;
private String password; private String password;
public static String DIRECTKEY; public static String DIRECTKEY = "Direct:";
private int keyMaxSize; private int keyMaxSize;
......
...@@ -868,11 +868,13 @@ public class ESDaoImpl implements ESDao { ...@@ -868,11 +868,13 @@ public class ESDaoImpl implements ESDao {
fieldlist); fieldlist);
queryBuilder.must(keywordQueryBuilder); queryBuilder.must(keywordQueryBuilder);
} }
// 二阶段组装渠道 // 二阶段组装大类渠道
// 组装type // 组装type
List<String> typeList = ESQueryUtil.getDirectInsideTypeListbyPt(pt); List<String> typeList = ESQueryUtil.getDirectInsideTypeListbyPt(pt);
BoolQueryBuilder tsuQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(QueryBuilders.boolQuery(), typeList, BoolQueryBuilder tsuQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(QueryBuilders.boolQuery(), typeList,
null, "type"); null, "type");
queryBuilder.must(tsuQueryBuilder);
// 三阶段组装细类渠道
// 组装source和特征属性 // 组装source和特征属性
List<String> sourceList = new ArrayList<>(); List<String> sourceList = new ArrayList<>();
List<String> specTypeList = new ArrayList<>(); List<String> specTypeList = new ArrayList<>();
...@@ -887,22 +889,23 @@ public class ESDaoImpl implements ESDao { ...@@ -887,22 +889,23 @@ public class ESDaoImpl implements ESDao {
specTypeList.add(memberSpec); specTypeList.add(memberSpec);
} }
} }
BoolQueryBuilder sonQueryBuilder =QueryBuilders.boolQuery();
if (pt.equals("微博")) { if (pt.equals("微博")) {
tsuQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(tsuQueryBuilder, sourceList, null, "username"); sonQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(sonQueryBuilder, sourceList, null, "username");
tsuQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(tsuQueryBuilder, specTypeList, null, "user_id"); sonQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(sonQueryBuilder, specTypeList, null, "user_id");
} else { } else {
tsuQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(tsuQueryBuilder, sourceList, null, "source"); sonQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(sonQueryBuilder, sourceList, null, "source");
if (pt.equals("微信")) { if (pt.equals("微信")) {
// tsuQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(tsuQueryBuilder, // tsuQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(tsuQueryBuilder,
// specTypeList, null, "source"); // specTypeList, null, "source");
} else if (pt.equals("今日头条")) { } else if (pt.equals("今日头条")) {
tsuQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(tsuQueryBuilder, specTypeList, null, sonQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(sonQueryBuilder, specTypeList, null,
"user_id"); "user_id");
} else { } else {
tsuQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(tsuQueryBuilder, specTypeList, null, "url"); sonQueryBuilder = ESQueryUtil.assembleShouldNotFieldsQuery(sonQueryBuilder, specTypeList, null, "url");
} }
} }
queryBuilder.must(tsuQueryBuilder); queryBuilder.must(sonQueryBuilder);
// // 四阶段插件过滤 // // 四阶段插件过滤
// BoolQueryBuilder pluginQueryBuilder = ESQueryUtil.assemblePluginQurey(QueryBuilders.boolQuery()); // BoolQueryBuilder pluginQueryBuilder = ESQueryUtil.assemblePluginQurey(QueryBuilders.boolQuery());
// queryBuilder.must(pluginQueryBuilder); // queryBuilder.must(pluginQueryBuilder);
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
#spring.data.mongodb.uri=115.236.59.91:27017 #spring.data.mongodb.uri=115.236.59.91:27017
#\u5185\u7F6Etomcat\u7AEF\u53E3\u53F7 #\u5185\u7F6Etomcat\u7AEF\u53E3\u53F7
server.port=8092 server.port=8091
#\uFFFD\u009C\u008D\uFFFD\u008A\uFFFD\u7AEF\uFFFD\u0095\uFFFD\uFFFD\u008D\uFFFD\uFFFD\u0093 #\uFFFD\u009C\u008D\uFFFD\u008A\uFFFD\u7AEF\uFFFD\u0095\uFFFD\uFFFD\u008D\uFFFD\uFFFD\u0093
spring.data.mongodb.primary.database=qbjcPhoenix spring.data.mongodb.primary.database=qbjcPhoenix
......
...@@ -12,6 +12,7 @@ redis.port=6380 ...@@ -12,6 +12,7 @@ redis.port=6380
#redis.password=fjouero&^%^%^$*()*)))*^$$KDFJDKJF9ruorudlfdljfldjf #redis.password=fjouero&^%^%^$*()*)))*^$$KDFJDKJF9ruorudlfdljfldjf
redis.keyMaxSize=5000 redis.keyMaxSize=5000
redis.DIRECTKEY=Direct:
redis.selectDB=13 redis.selectDB=13
#redis.selectDB=2 #redis.selectDB=2
......
...@@ -12,6 +12,7 @@ redis.port=6380 ...@@ -12,6 +12,7 @@ redis.port=6380
#redis.password=fjouero&^%^%^$*()*)))*^$$KDFJDKJF9ruorudlfdljfldjf #redis.password=fjouero&^%^%^$*()*)))*^$$KDFJDKJF9ruorudlfdljfldjf
redis.keyMaxSize=5000 redis.keyMaxSize=5000
redis.DIRECTKEY=Direct:
redis.selectDB=12 redis.selectDB=12
#redis.selectDB=2 #redis.selectDB=2
......
...@@ -12,6 +12,7 @@ redis.port=6379 ...@@ -12,6 +12,7 @@ redis.port=6379
#redis.password=fjouero&^%^%^$*()*)))*^$$KDFJDKJF9ruorudlfdljfldjf #redis.password=fjouero&^%^%^$*()*)))*^$$KDFJDKJF9ruorudlfdljfldjf
redis.keyMaxSize=5000 redis.keyMaxSize=5000
redis.DIRECTKEY=Direct:
redis.selectDB=12 redis.selectDB=12
#redis.selectDB=2 #redis.selectDB=2
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment