Commit a27ef8a3 by admin

添加了渠道低数据量预警

parent 5dc403c4
2018-04-19 10:51:42,449 [main] [com.zhiwei.manage.util.ESUtil] [ERROR] - es检索出错,错误信息【{}】
java.lang.NullPointerException: index must not be null
at java.util.Objects.requireNonNull(Unknown Source)
at org.elasticsearch.action.search.SearchRequest.indices(SearchRequest.java:114)
at org.elasticsearch.action.search.SearchRequestBuilder.setIndices(SearchRequestBuilder.java:56)
at org.elasticsearch.client.support.AbstractClient.prepareSearch(AbstractClient.java:535)
at com.zhiwei.manage.util.ESUtil.getCountByPt(ESUtil.java:138)
at com.zhiwei.manage.util.ESUtil.main(ESUtil.java:88)
2018-04-19 10:51:56,692 [main] [com.zhiwei.manage.util.ESUtil] [ERROR] - es检索出错,错误信息【{}】
java.lang.NullPointerException: index must not be null
at java.util.Objects.requireNonNull(Unknown Source)
at org.elasticsearch.action.search.SearchRequest.indices(SearchRequest.java:114)
at org.elasticsearch.action.search.SearchRequestBuilder.setIndices(SearchRequestBuilder.java:56)
at org.elasticsearch.client.support.AbstractClient.prepareSearch(AbstractClient.java:535)
at com.zhiwei.manage.util.ESUtil.getCountByPt(ESUtil.java:138)
at com.zhiwei.manage.util.ESUtil.main(ESUtil.java:88)
......@@ -103,7 +103,7 @@ public class DataDaoImpl implements DataDao {
if (d == null) {
mongo.insert(data, "Data");
} else {
d.setCount(data.getCount());
d.setCount(data.getCount()+d.getCount());
Update update = new Update();
Field[] fields = d.getClass().getDeclaredFields();
for (int j = 0; j < fields.length; j++) {
......
......@@ -19,6 +19,7 @@ import com.zhiwei.manage.bean.Channel;
import com.zhiwei.manage.bean.Data;
import com.zhiwei.manage.bean.FieldIntegerity;
import com.zhiwei.manage.bean.Message;
import com.zhiwei.manage.bean.Messages;
import com.zhiwei.manage.bean.NewsDelayed;
import com.zhiwei.manage.bean.PingUrl;
import com.zhiwei.manage.bean.Template;
......@@ -27,9 +28,10 @@ import com.zhiwei.manage.service.CrawTemplateServiceImpl;
import com.zhiwei.manage.service.DataServiceImpl;
import com.zhiwei.manage.service.MessageServiceImpl;
import com.zhiwei.manage.service.ServerServiceImpl;
import com.zhiwei.manage.util.Config;
import com.zhiwei.manage.util.ESUtil;
import com.zhiwei.manage.util.SendMailUtil;
import com.zhiwei.manage.util.TimeUtil;
import com.zhiwei.manage.util.Tools;
@Component
public class MainThread extends Thread {
......@@ -37,8 +39,16 @@ public class MainThread extends Thread {
private static final Log log = LogFactory.getLog(MainThread.class);
public static Map<String, Template> mainMap = new ConcurrentHashMap<String, Template>();
public static Map<String, Template> allTmp = new ConcurrentHashMap<String, Template>();
public static Map<String,String> map=new HashMap<>();
static ESUtil es=new ESUtil();
public static Map<String, String> map = new HashMap<>();
static ESUtil es = new ESUtil();
static Map<String,Integer> counts=new HashMap<>();
static {
counts.put("网媒", 1200);
counts.put("微博", 2000);
counts.put("微信", 3500);
counts.put("知乎", 300);
// counts.put("平媒", 300);
}
public MainThread() {
this.start();
......@@ -153,70 +163,67 @@ public class MainThread extends Thread {
DataServiceImpl dataService = BeanFactory.getBean(DataServiceImpl.class);
ServerServiceImpl serverService = BeanFactory.getBean(ServerServiceImpl.class);
MessageServiceImpl msgService = BeanFactory.getBean(MessageServiceImpl.class);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH");
List<Channel> channel = serverService.findDisChannel();
while (true) {
Calendar start = Calendar.getInstance();
if (start.get(Calendar.HOUR_OF_DAY) < 1) {
Calendar end = Calendar.getInstance();
try {
start.add(Calendar.DATE, -1);
start.setTime(sdf.parse(sdf.format(start.getTime())));
end.setTime(sdf.parse(sdf.format(end.getTime())));
int count = 0;
channel = serverService.findDisChannel();
for (Channel chan : channel) {
map.put(chan.getPt(), chan.getValue());
Calendar end = Calendar.getInstance();
try {
start.add(Calendar.HOUR_OF_DAY, -1);
start.setTime(sdf.parse(sdf.format(start.getTime())));
end.setTime(sdf.parse(sdf.format(end.getTime())));
int count = 0;
channel = serverService.findDisChannel();
for (Channel chan : channel) {
map.put(chan.getPt(), chan.getValue());
try {
count = es.getCountByPt(start.getTime(), end.getTime(), chan.getPt(), chan.getValue());
System.out.println(count);
Data data = new Data();
data.setCount(count);
data.setPt(chan.getPt());
data.setTime(start.getTime());
dataService.insert(data);
log.info(sdf.format(start.getTime()) + "|" + chan.getPt() + "|入库");
if(count<counts.get(chan.getPt())) {
Messages m=new Messages();
m.setTitle("渠道低数据量预警{"+chan.getPt()+"}");
m.setContent("1小时内的数据量为:"+count);
SendMailUtil.sendMessage(Config.getVal("count_mail"), m);
}
} catch (Exception e) {
log.error(chan.getPt() + "入库出错,错误信息{}", e);
}
}
Set<String> keys = allTmp.keySet();
for (Iterator<String> it = keys.iterator(); it.hasNext();) {
String key = it.next();
Template tmp = allTmp.get(key);
if (tmp.getSpyderInfoId() != null) {
int cwCount = 0;
String pt = crawService.findPt(tmp.getSpyderInfoId());
cwCount = (int) es.getCounts(start.getTime(), end.getTime(), pt, map.get(pt),
tmp.getSpyderInfoId());
try {
count = es.getCountByPt(start.getTime(), end.getTime(), chan.getPt(),chan.getValue());
System.out.println(count);
Data data = new Data();
data.setCount(count);
data.setPt(chan.getPt());
data.setCount(cwCount);
data.setTime(start.getTime());
data.setTempName(tmp.getTempName());
dataService.insert(data);
log.info(sdf.format(start.getTime()) + "|" + chan.getPt() + "|入库");
log.info(TimeUtil.yearToDay(start.getTime()) + "|" + tmp.getTempName() + "|入库");
} catch (Exception e) {
log.error(chan.getPt() + "入库出错,错误信息{}", e);
log.error(tmp.getTempName() + "入库出错,错误信息{}", e);
}
}
Set<String> keys = allTmp.keySet();
for (Iterator<String> it = keys.iterator(); it.hasNext();) {
String key = it.next();
Template tmp = allTmp.get(key);
if (tmp.getSpyderInfoId() != null) {
int cwCount = 0;
String pt = crawService.findPt(tmp.getSpyderInfoId());
cwCount = (int) es.getCounts(start.getTime(), end.getTime(), pt,map.get(pt),tmp.getSpyderInfoId());
try {
Data data = new Data();
data.setCount(cwCount);
data.setTime(start.getTime());
data.setTempName(tmp.getTempName());
dataService.insert(data);
log.info(TimeUtil.yearToDay(start.getTime()) + "|" + tmp.getTempName() + "|入库");
} catch (Exception e) {
log.error(tmp.getTempName() + "入库出错,错误信息{}", e);
}
if (cwCount < tmp.getNewsNumber()) {
Message m = new Message();
m.setHandle(false);
m.setCreateDate(start.getTime());
m.setTemplateLv(tmp.getTemplateLv());
m.setTempName(tmp.getTempName());
m.setErrorType("count");
m.setPt(pt);
m.setErrorMsg("数据量检测:" + tmp.getTempName() + "的数据为小于" + tmp.getNewsNumber());
msgService.insert(m);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
Tools.sleep(1000 * 60 * 60);
} catch (Exception e) {
e.printStackTrace();
}
try {
Thread.sleep(1000 * 3600);
} catch (InterruptedException e) {
e.printStackTrace();
}
Tools.sleep(1000 * 5 * 60);
}
}
......
package com.zhiwei.manage.util;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
......@@ -11,6 +12,7 @@ import java.util.Map;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.recycler.Recycler.C;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
......@@ -32,6 +34,7 @@ import com.zhiwei.manage.handle.MainThread;
public class ESUtil {
private static final Logger log = LoggerFactory.getLogger(ESUtil.class);
static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
/**
* 获取匹配时终止时间
*
......@@ -45,9 +48,10 @@ public class ESUtil {
time = smf.format(date) + "T0" + (hour - 8) + ":00:00.000Z";
return time;
}
/**
* 数据查询。获取数量
*
* @param start
* @param end
* @param pt
......@@ -56,7 +60,7 @@ public class ESUtil {
* @param spyderInfoId
* @return
*/
public int getDatas(Date start, Date end, String pt,String type, String value,String spyderInfoId){
public int getDatas(Date start, Date end, String pt, String type, String value, String spyderInfoId) {
Calendar startTime = Calendar.getInstance();
startTime.setTime(start);
startTime.add(Calendar.HOUR, -8);
......@@ -81,8 +85,13 @@ public class ESUtil {
}
return 0;
}
public int getCounts(Date start, Date end, String pt, String value,String spid){
public static void main(String[] args) throws ParseException {
System.out.println(new ESUtil().getCountByPt(sdf.parse("2018-04-19"), new Date(), "网媒", "media"));
}
public int getCounts(Date start, Date end, String pt, String value, String spid) {
Calendar startTime = Calendar.getInstance();
startTime.setTime(start);
startTime.add(Calendar.HOUR, -8);
......@@ -91,7 +100,6 @@ public class ESUtil {
endTime.add(Calendar.HOUR, -8);
try {
Client esClient = ESClient.getInstance();
// 搜索数据
System.out.println(pt);
System.out.println(Config.getVal(MainThread.map.get(pt) + ".indexName"));
......@@ -113,6 +121,7 @@ public class ESUtil {
/**
* 获取数量
*
* @param start
* @param end
* @param pt
......@@ -120,7 +129,6 @@ public class ESUtil {
* @return
*/
public int getCountByPt(Date start, Date end, String pt, String value) {
System.out.println(value);
Calendar startTime = Calendar.getInstance();
startTime.setTime(start);
startTime.add(Calendar.HOUR, -8);
......@@ -147,26 +155,25 @@ public class ESUtil {
private void addQuerys(String pt, Calendar startTime, Calendar endTime, SearchRequestBuilder srb,
BoolQueryBuilder bb) {
if (pt.equals("微博")) {
srb.setPostFilter(
QueryBuilders.rangeQuery("ins").gte(sdf.format(startTime.getTime()) + "T00:00:00.000Z")
.lt(sdf.format(endTime.getTime()) + "T00:00:00.000Z"));
srb.setPostFilter(QueryBuilders.rangeQuery("ins").gte(sendTime(startTime)).lt(sendTime(endTime)));
} else if (pt.equals("知乎")) {
srb.setPostFilter(
QueryBuilders.rangeQuery("insert_at").gte(sdf.format(startTime.getTime()) + "T00:00:00.000Z")
.lt(sdf.format(endTime.getTime()) + "T00:00:00.000Z"));
srb.setPostFilter(QueryBuilders.rangeQuery("insert_at").gte(sendTime(startTime)).lt(sendTime(endTime)));
} else if (pt.equals("微信全量")) {
srb.setPostFilter(
QueryBuilders.rangeQuery("time").gte(sdf.format(startTime.getTime()) + "T00:00:00.000Z")
.lt(sdf.format(endTime.getTime()) + "T00:00:00.000Z"));
srb.setPostFilter(QueryBuilders.rangeQuery("time").gte(sendTime(startTime)).lt(sendTime(endTime)));
} else {
srb.setPostFilter(
QueryBuilders.rangeQuery("time").gte(sdf.format(startTime.getTime()) + "T00:00:00.000Z")
.lt(sdf.format(endTime.getTime()) + "T00:00:00.000Z"));
srb.setPostFilter(QueryBuilders.rangeQuery("time").gte(sendTime(startTime)).lt(sendTime(endTime)));
QueryBuilder matchQuery = QueryBuilders.matchPhraseQuery("pt", pt);
bb.should(matchQuery);
}
}
public static String sendTime(Calendar time) {
return sdf.format(time.getTime()) + "T"
+ (time.get(Calendar.HOUR_OF_DAY) >= 10 ? time.get(Calendar.HOUR_OF_DAY) + ""
: "0" + time.get(Calendar.HOUR_OF_DAY))
+ ":00:00.000Z";
}
public static List<String> getNewsForWechatWithMechanism(Date start, Date end) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd日HH:mm:ss");
Calendar startTime = Calendar.getInstance();
......
......@@ -29,3 +29,5 @@ pingmei.indexName=mediaspider*
luntan.indexName=mediaspider*
tieba.indexName=mediaspider*
wechat.indexName=mediaspider*
###########################################
count_mail=shenjinzhu999@163.com
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment