Commit 5dc403c4 by admin

数据入库数量该从es

parent 1d624ef2
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.zhiwei</groupId>
......@@ -11,6 +12,11 @@
<dependencies>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>5.2.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
......
......@@ -9,7 +9,7 @@ public interface ServerDao {
public void insert(ServerBean serverBean);
public List<ServerBean> findDb(int pageNo,int pageSize,String dbName);
public List<ServerBean> findDb(int pageNo, int pageSize, String dbName);
public boolean update(ServerBean serverBean);
......@@ -17,7 +17,9 @@ public interface ServerDao {
public void findPt(String pt);
public List<Channel> findAllChannels(int pageNo,int pageSize);
public List<Channel> findChannels();
public List<Channel> findAllChannels(int pageNo, int pageSize);
public void insertChannels(Channel channels);
......@@ -33,5 +35,4 @@ public interface ServerDao {
public List<String> findDisChan();
}
......@@ -98,6 +98,11 @@ public class ServerDaoImpl implements ServerDao {
List<Channel> list = mongo.find(query, Channel.class);
return list;
}
@Override
public List<Channel> findChannels() {
    // Load every Channel document. The previous implementation passed a null
    // Query to find(...), which only works on library versions that special-case
    // null and is rejected outright by newer ones; findAll states the intent
    // directly and has no such dependency.
    // NOTE(review): assumes `mongo` is a Spring Data MongoTemplate/MongoOperations,
    // as its find(query, Channel.class) usage in this class suggests - confirm.
    return mongo.findAll(Channel.class);
}
@Override
public void insertChannels(Channel channel) {
......
package com.zhiwei.manage.handle;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
......@@ -11,15 +10,12 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.stereotype.Component;
import com.zhiwei.manage.bean.Channel;
import com.zhiwei.manage.bean.CrawTemplate;
import com.zhiwei.manage.bean.Data;
import com.zhiwei.manage.bean.FieldIntegerity;
import com.zhiwei.manage.bean.Message;
......@@ -31,6 +27,7 @@ import com.zhiwei.manage.service.CrawTemplateServiceImpl;
import com.zhiwei.manage.service.DataServiceImpl;
import com.zhiwei.manage.service.MessageServiceImpl;
import com.zhiwei.manage.service.ServerServiceImpl;
import com.zhiwei.manage.util.ESUtil;
import com.zhiwei.manage.util.TimeUtil;
import com.zhiwei.manage.util.Tools;
......@@ -40,7 +37,8 @@ public class MainThread extends Thread {
private static final Log log = LogFactory.getLog(MainThread.class);
public static Map<String, Template> mainMap = new ConcurrentHashMap<String, Template>();
public static Map<String, Template> allTmp = new ConcurrentHashMap<String, Template>();
private static String today = "";
public static Map<String,String> map=new HashMap<>();
static ESUtil es=new ESUtil();
public MainThread() {
this.start();
......@@ -156,34 +154,21 @@ public class MainThread extends Thread {
ServerServiceImpl serverService = BeanFactory.getBean(ServerServiceImpl.class);
MessageServiceImpl msgService = BeanFactory.getBean(MessageServiceImpl.class);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd");
List<Channel> channel = serverService.findDisChannel();
while (true) {
if (sdf.format(new Date()).equals(today)) {
} else {
today = sdf.format(new Date());
}
Calendar start = Calendar.getInstance();
if (start.get(Calendar.HOUR_OF_DAY) < 1) {
Calendar end = Calendar.getInstance();
try {
// start.setTime(sdf2.parse(sdf2.format(start.getTime())));
// if (start.get(Calendar.HOUR_OF_DAY) > 1) {
// start.add(Calendar.HOUR_OF_DAY, -1);
// } else {
// d.setHours(d.getHours() - 1);
start.setTime(sdf.parse(today));
// }
start.add(Calendar.DATE, -1);
end.setTime(sdf2.parse(today));
} catch (ParseException e2) {
e2.printStackTrace();
}
try {
start.setTime(sdf.parse(sdf.format(start.getTime())));
end.setTime(sdf.parse(sdf.format(end.getTime())));
int count = 0;
List<Channel> channel = serverService.findDisChannel();
channel = serverService.findDisChannel();
for (Channel chan : channel) {
map.put(chan.getPt(), chan.getValue());
try {
count = (int) dataService.finCountByDayData(start.getTime(), end.getTime(), chan.getPt());
count = es.getCountByPt(start.getTime(), end.getTime(), chan.getPt(),chan.getValue());
System.out.println(count);
Data data = new Data();
data.setCount(count);
......@@ -202,7 +187,7 @@ public class MainThread extends Thread {
if (tmp.getSpyderInfoId() != null) {
int cwCount = 0;
String pt = crawService.findPt(tmp.getSpyderInfoId());
cwCount = (int) dataService.finCountByDayData(start.getTime(), end.getTime(), tmp);
cwCount = (int) es.getCounts(start.getTime(), end.getTime(), pt,map.get(pt),tmp.getSpyderInfoId());
try {
Data data = new Data();
data.setCount(cwCount);
......
......@@ -16,7 +16,6 @@ public interface ServerService {
public boolean delete(List<String> id);
public void insertChannels(Channel channels);
public void findPt(String pt);
......@@ -25,7 +24,9 @@ public interface ServerService {
public boolean deleteChannel(List<String> id);
public PageEty findChannel(int pageNo,int pageSize);
public List<Channel> findChannels();
public PageEty findChannel(int pageNo, int pageSize);
public List<Channel> findDisChannel();
......
......@@ -97,4 +97,10 @@ public class ServerServiceImpl implements ServerService {
return sd.findDisChan();
}
/**
 * Returns all channels by delegating to the DAO.
 *
 * The generated stub used to return {@code null}, so any caller iterating the
 * result would fail with a NullPointerException.
 *
 * @return whatever {@code ServerDao#findChannels()} returns
 */
@Override
public List<Channel> findChannels() {
    return sd.findChannels();
}
}
......@@ -7,7 +7,7 @@ import java.util.Properties;
public class Config {
public static Properties p=new Properties();
static {
String fileurl = Config.class.getClassLoader().getResource("data.properties").getPath();
String fileurl = Config.class.getClassLoader().getResource("mongo.properties").getPath();
p = new Properties();
try {
InputStreamReader in = new InputStreamReader(new FileInputStream(fileurl),"utf-8");
......
package com.zhiwei.manage.util;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
/**
* es连接类
*
* @author shenjinzhu
*
*/
public class ESClient {
private static String esIp;
private static int esPort;
private static String clusterName;
private static class ESClientHolder {
static TransportClient client = initESClient();
private static TransportClient initESClient() {
esIp=Config.getVal("ESIP");
clusterName=Config.getVal("clusterName");
esPort=Integer.valueOf(Config.getVal("ESPort"));
Settings esSettings = Settings.builder().put("cluster.name", clusterName) // 设置ES实例的名称
// 自动嗅探整个集群的状态,把集群中其他ES节点的ip添加到本地的客户端列表中
.put("client.transport.sniff", false).build();
TransportClient client = new PreBuiltTransportClient(esSettings);
try {
client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(esIp), esPort));
} catch (UnknownHostException e) {
e.printStackTrace();
}
return client;
}
}
public static TransportClient getInstance() {
return ESClientHolder.client;
}
}
package com.zhiwei.manage.util;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import com.zhiwei.manage.bean.Data;
import com.zhiwei.manage.handle.MainThread;
/**
 * Elasticsearch utility class: counts documents stored in a time window,
 * optionally narrowed by platform ({@code pt}) and one additional field.
 *
 * Every count method returns 0 when the search fails; the failure is logged.
 *
 * @author admin
 */
@Component
public class ESUtil {
    private static final Logger log = LoggerFactory.getLogger(ESUtil.class);

    /**
     * Kept only so that existing references to {@code ESUtil.sdf} keep compiling.
     * {@link SimpleDateFormat} is not thread-safe, so this class no longer uses
     * the shared instance and creates a formatter per call instead.
     */
    static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");

    /** Hours subtracted from local times before they are sent to ES. */
    private static final int HOUR_SHIFT = 8;
    private static final String DAY_PATTERN = "yyyy-MM-dd";
    private static final String MIDNIGHT_SUFFIX = "T00:00:00.000Z";
    private static final String SEARCH_ERROR = "es检索出错,错误信息【{}】";

    /**
     * Returns the upper time bound used for matching: today at {@code hour}:00
     * shifted back by 8 hours, formatted as {@code yyyy-MM-ddTHH:00:00.000Z}.
     *
     * For hours 8..17 the result is identical to the previous implementation.
     * Other hours used to yield malformed text such as {@code T0-3:00:00.000Z}
     * or {@code T010:00:00.000Z}; they now roll over to the correct day and a
     * two-digit hour.
     *
     * @param hour local hour of day
     * @return the formatted timestamp
     */
    public static String getTimeMax(int hour) {
        Calendar cal = Calendar.getInstance();
        cal.set(Calendar.HOUR_OF_DAY, hour);
        cal.set(Calendar.MINUTE, 0);
        cal.set(Calendar.SECOND, 0);
        cal.set(Calendar.MILLISECOND, 0);
        cal.add(Calendar.HOUR_OF_DAY, -HOUR_SHIFT);
        return new SimpleDateFormat("yyyy-MM-dd'T'HH':00:00.000Z'").format(cal.getTime());
    }

    /**
     * Counts documents in the window that match {@code type == value}.
     *
     * The index is looked up via {@code MainThread.map.get(pt) + ".indexName"}.
     *
     * @param start window start (inclusive)
     * @param end window end (exclusive)
     * @param pt platform name; selects the index and the timestamp field
     * @param type name of the field to phrase-match
     * @param value value the field must match
     * @param spyderInfoId not used by this method
     * @return number of hits, or 0 if the search fails
     */
    public int getDatas(Date start, Date end, String pt, String type, String value, String spyderInfoId) {
        Calendar startTime = shiftedCalendar(start);
        Calendar endTime = shiftedCalendar(end);
        try {
            Client esClient = ESClient.getInstance();
            SearchRequestBuilder srb = esClient.prepareSearch(Config.getVal(MainThread.map.get(pt) + ".indexName"));
            BoolQueryBuilder bb = new BoolQueryBuilder();
            srb.setQuery(bb);
            addQuerys(pt, startTime, endTime, srb, bb);
            // NOTE(review): when addQuerys also adds a "pt" should-clause the two
            // clauses are OR-ed; confirm whether AND (must) was intended.
            bb.should(QueryBuilders.matchPhraseQuery(type, value));
            return countHits(srb);
        } catch (Exception e) {
            log.error(SEARCH_ERROR, e.getMessage(), e);
        }
        return 0;
    }

    /**
     * Counts documents in the window whose {@code spyderInfoId} matches {@code spid}.
     *
     * The index is looked up via {@code MainThread.map.get(pt) + ".indexName"};
     * the {@code value} argument is not consulted.
     *
     * @param start window start (inclusive)
     * @param end window end (exclusive)
     * @param pt platform name; selects the index and the timestamp field
     * @param value not used by this method
     * @param spid crawler template id to phrase-match on {@code spyderInfoId}
     * @return number of hits, or 0 if the search fails
     */
    public int getCounts(Date start, Date end, String pt, String value, String spid) {
        Calendar startTime = shiftedCalendar(start);
        Calendar endTime = shiftedCalendar(end);
        try {
            Client esClient = ESClient.getInstance();
            String indexName = Config.getVal(MainThread.map.get(pt) + ".indexName");
            log.debug("getCounts pt={} index={}", pt, indexName);
            SearchRequestBuilder srb = esClient.prepareSearch(indexName);
            BoolQueryBuilder bb = new BoolQueryBuilder();
            srb.setQuery(bb);
            addQuerys(pt, startTime, endTime, srb, bb);
            // NOTE(review): when addQuerys also adds a "pt" should-clause the two
            // clauses are OR-ed; confirm whether AND (must) was intended.
            bb.should(QueryBuilders.matchPhraseQuery("spyderInfoId", spid));
            return countHits(srb);
        } catch (Exception e) {
            log.error(SEARCH_ERROR, e.getMessage(), e);
        }
        return 0;
    }

    /**
     * Counts documents of one platform in the window.
     *
     * @param start window start (inclusive)
     * @param end window end (exclusive)
     * @param pt platform name; selects the timestamp field
     * @param value prefix of the config key ({@code value + ".indexName"}) that names the index
     * @return number of hits, or 0 if the search fails
     */
    public int getCountByPt(Date start, Date end, String pt, String value) {
        log.debug("getCountByPt value={}", value);
        Calendar startTime = shiftedCalendar(start);
        Calendar endTime = shiftedCalendar(end);
        try {
            Client esClient = ESClient.getInstance();
            SearchRequestBuilder srb = esClient.prepareSearch(Config.getVal(value + ".indexName"));
            BoolQueryBuilder bb = new BoolQueryBuilder();
            srb.setQuery(bb);
            addQuerys(pt, startTime, endTime, srb, bb);
            return countHits(srb);
        } catch (Exception e) {
            log.error(SEARCH_ERROR, e.getMessage(), e);
        }
        return 0;
    }

    /** Returns a calendar set to {@code date} minus {@link #HOUR_SHIFT} hours. */
    private static Calendar shiftedCalendar(Date date) {
        Calendar cal = Calendar.getInstance();
        cal.setTime(date);
        cal.add(Calendar.HOUR, -HOUR_SHIFT);
        return cal;
    }

    /** Runs the search and returns its total hit count, capped at Integer.MAX_VALUE. */
    private static int countHits(SearchRequestBuilder srb) {
        srb.setSize(1);
        SearchResponse response = srb.execute().actionGet();
        SearchHits searchHits = response.getHits();
        // The total is a long; clamp instead of letting the cast wrap around.
        return (int) Math.min(searchHits.getTotalHits(), (long) Integer.MAX_VALUE);
    }

    /**
     * Adds the time-window post filter, and for platforms without a dedicated
     * timestamp field also a phrase match on {@code pt}.
     *
     * NOTE(review): only the date part of the shifted calendars is used and
     * midnight "Z" is appended, so the hour produced by the 8-hour shift is
     * discarded - confirm this matches how timestamps are stored.
     */
    private void addQuerys(String pt, Calendar startTime, Calendar endTime, SearchRequestBuilder srb,
            BoolQueryBuilder bb) {
        // Per-call formatter: SimpleDateFormat must not be shared between threads.
        SimpleDateFormat dayFormat = new SimpleDateFormat(DAY_PATTERN);
        String from = dayFormat.format(startTime.getTime()) + MIDNIGHT_SUFFIX;
        String to = dayFormat.format(endTime.getTime()) + MIDNIGHT_SUFFIX;

        String timeField;
        boolean matchPlatform = false;
        switch (pt) {
        case "微博":
            timeField = "ins";
            break;
        case "知乎":
            timeField = "insert_at";
            break;
        case "微信全量":
            timeField = "time";
            break;
        default:
            timeField = "time";
            matchPlatform = true;
            break;
        }

        srb.setPostFilter(QueryBuilders.rangeQuery(timeField).gte(from).lt(to));
        if (matchPlatform) {
            QueryBuilder matchQuery = QueryBuilders.matchPhraseQuery("pt", pt);
            bb.should(matchQuery);
        }
    }

    /**
     * Diagnostic search against the index configured as {@code weibo.indexName}.
     *
     * NOTE(review): this looks unfinished - both parameters are ignored, the
     * window is hard-coded to 2018-04-17, only the total hit count is logged and
     * the method always returns {@code null}.
     *
     * @param start ignored
     * @param end ignored
     * @return always {@code null}
     */
    public static List<String> getNewsForWechatWithMechanism(Date start, Date end) {
        try {
            log.info("检索开始");
            Client esClient = ESClient.getInstance();
            SearchRequestBuilder srb = esClient.prepareSearch(Config.getVal("weibo.indexName"));
            srb.setQuery(new BoolQueryBuilder());
            srb.setPostFilter(
                    QueryBuilders.rangeQuery("ins").gte("2018-04-17T00:00:00.000Z").lt("2018-04-18T00:00:00.000Z"));
            srb.setSize(1);
            SearchResponse response = srb.execute().actionGet();
            log.info("total hits: {}", response.getHits().getTotalHits());
            log.info("结束");
        } catch (Exception e) {
            log.error(SEARCH_ERROR, e.getMessage(), e);
        }
        return null;
    }
}
###生产环境
#权限认证
###生产环境
#权限认证
mongo.serverMongoIp=115.236.59.91
mongo.serverMongoPort=27017
mongo.serverMongoIp2=1.119.44.206
......@@ -15,3 +15,17 @@ mongo.spyPlatData.dbName1=dataMonitoring
mongo.spyPlatData.dbName2=mediaspider
##############################################
mongo.spyPlatData.dbName3=NetWork
##############################################
ESIP=1.119.44.201
ESPort=9300
clusterName=Media-University
##网媒、微信、贴吧、论坛
media.indexName=mediaspider*
weibo.indexName=network*
wx_tanglihua.indexName=wx_tanglihua*
zhihu.indexName=zhihu*
video.indexName=videos*
pingmei.indexName=mediaspider*
luntan.indexName=mediaspider*
tieba.indexName=mediaspider*
wechat.indexName=mediaspider*
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment