Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
M
messageflow
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
虞诚毅
messageflow
Commits
22a3d9e4
Commit
22a3d9e4
authored
Oct 30, 2018
by
shentao
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
2018/10/30 消息流事件采集系统1.0版
parent
0044078e
Hide whitespace changes
Inline
Side-by-side
Showing
17 changed files
with
386 additions
and
107 deletions
+386
-107
src/main/java/com/zhiwei/messageflow/ES4RedisRunner.java
+1
-1
src/main/java/com/zhiwei/messageflow/ES4RedisStart.java
+1
-1
src/main/java/com/zhiwei/messageflow/EVENTAutoMarkStart.java
+5
-3
src/main/java/com/zhiwei/messageflow/EVENTAutoMarkTask.java
+3
-5
src/main/java/com/zhiwei/messageflow/EVENTAutoMarkThread.java
+6
-12
src/main/java/com/zhiwei/messageflow/MessageflowApplication.java
+2
-0
src/main/java/com/zhiwei/messageflow/config/RedisConfig.java
+3
-0
src/main/java/com/zhiwei/messageflow/kafka/KafkaConsumer.java
+2
-2
src/main/java/com/zhiwei/messageflow/redis/RedisPoolAndTools.java
+78
-0
src/main/java/com/zhiwei/messageflow/redis/service/RedisService.java
+19
-0
src/main/java/com/zhiwei/messageflow/redis/service/impl/RedisServiceImpl.java
+24
-0
src/main/java/com/zhiwei/messageflow/service/EventService.java
+2
-1
src/main/java/com/zhiwei/messageflow/service/impl/EventServiceImpl.java
+215
-53
src/main/resources/application.properties
+4
-6
src/main/resources/application.properties.local1
+14
-15
src/main/resources/application.properties.local2
+3
-2
src/main/resources/application.properties.test10001
+4
-6
No files found.
src/main/java/com/zhiwei/messageflow/ES4RedisRunner.java
View file @
22a3d9e4
...
...
@@ -45,7 +45,7 @@ public class ES4RedisRunner implements ApplicationRunner {
// 手动注入bean ES4RedisStart
ES4RedisStart
start
=
ApplicationContextProvider
.
getBean
(
"ES4RedisStart"
,
ES4RedisStart
.
class
);
DES4RedisStart
directstart
=
ApplicationContextProvider
.
getBean
(
"DES4RedisStart"
,
DES4RedisStart
.
class
);
EVENT
4RedisStart
eventStart
=
ApplicationContextProvider
.
getBean
(
"EVENT4RedisStart"
,
EVENT4Redis
Start
.
class
);
EVENT
AutoMarkStart
eventStart
=
ApplicationContextProvider
.
getBean
(
"EVENTAutoMarkStart"
,
EVENTAutoMark
Start
.
class
);
// 定时器
Timer
timer
=
new
Timer
();
...
...
src/main/java/com/zhiwei/messageflow/ES4RedisStart.java
View file @
22a3d9e4
...
...
@@ -44,7 +44,7 @@ public class ES4RedisStart {
// 遍历项目
for
(
Project
project
:
projects
)
{
// if(!project.getProjectName().equals("
证监会
")) {
// if(!project.getProjectName().equals("
今日头条
")) {
// continue;
// }
...
...
src/main/java/com/zhiwei/messageflow/EVENT
4Redis
Start.java
→
src/main/java/com/zhiwei/messageflow/EVENT
AutoMark
Start.java
View file @
22a3d9e4
...
...
@@ -7,16 +7,17 @@ import java.util.List;
import
org.apache.logging.log4j.LogManager
;
import
org.apache.logging.log4j.Logger
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Component
;
import
com.fasterxml.jackson.core.JsonParseException
;
import
com.fasterxml.jackson.databind.JsonMappingException
;
import
com.zhiwei.messageflow.bean.Event
;
import
com.zhiwei.messageflow.mongo.bean.Project
;
import
com.zhiwei.messageflow.redis.service.RedisService
;
import
com.zhiwei.messageflow.service.EventService
;
public
class
EVENT4RedisStart
{
private
final
static
Logger
log
=
LogManager
.
getLogger
(
EVENT4RedisStart
.
class
);
@Component
public
class
EVENTAutoMarkStart
{
private
final
static
Logger
log
=
LogManager
.
getLogger
(
EVENTAutoMarkStart
.
class
);
@Autowired
protected
RedisService
redisService
;
...
...
@@ -24,6 +25,7 @@ public class EVENT4RedisStart {
@Autowired
private
EventService
eventService
;
@Autowired
private
EVENTAutoMarkTask
eventAutoMark
;
public
void
startThread
()
throws
JsonParseException
,
JsonMappingException
,
IOException
,
InterruptedException
{
...
...
src/main/java/com/zhiwei/messageflow/EVENTAutoMarkTask.java
View file @
22a3d9e4
package
com
.
zhiwei
.
messageflow
;
import
java.io.IOException
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Component
;
import
com.fasterxml.jackson.core.JsonParseException
;
import
com.fasterxml.jackson.databind.JsonMappingException
;
import
com.zhiwei.messageflow.bean.Event
;
import
com.zhiwei.messageflow.service.EventService
;
@Component
public
class
EVENTAutoMarkTask
{
@Autowired
private
EventService
eventService
;
public
boolean
eventCollectionAutoMark
(
Event
eventCollection
)
throws
JsonParseException
,
JsonMappingException
,
IO
Exception
{
public
boolean
eventCollectionAutoMark
(
Event
eventCollection
)
throws
Exception
{
return
eventService
.
autoMark
(
eventCollection
);
}
...
...
src/main/java/com/zhiwei/messageflow/EVENTAutoMarkThread.java
View file @
22a3d9e4
...
...
@@ -22,7 +22,7 @@ public class EVENTAutoMarkThread extends Thread{
private
EVENTAutoMarkTask
eventAutoMark
;
public
EVENTAutoMarkThread
(
Event
eventCollection
,
EVENTAutoMarkTask
eventAutoMark
2
)
{
public
EVENTAutoMarkThread
(
Event
eventCollection
,
EVENTAutoMarkTask
eventAutoMark
)
{
this
.
threadName
=
eventCollection
.
getId
();
this
.
eventCollection
=
eventCollection
;
this
.
eventAutoMark
=
eventAutoMark
;
...
...
@@ -53,27 +53,21 @@ public class EVENTAutoMarkThread extends Thread{
Thread
.
sleep
(
10L
);
// 程序运行
log
.
info
(
"Running 事件采集
进入自动标注
{}"
,
threadName
);
log
.
info
(
"Running 事件采集
自动标注中
{}"
,
threadName
);
// 该项目执行事件采集存储
boolean
flag
=
eventAutoMark
.
eventCollectionAutoMark
(
eventCollection
);
if
(!
flag
)
{
Thread
.
currentThread
().
interrupted
();
log
.
error
(
"{}事件采集
进入
自动标注出现异常,线程状态:{}"
,
threadName
,
Thread
.
currentThread
().
isInterrupted
());
log
.
error
(
"{}事件采集自动标注出现异常,线程状态:{}"
,
threadName
,
Thread
.
currentThread
().
isInterrupted
());
}
}
catch
(
JsonParseException
e
)
{
e
.
printStackTrace
();
}
catch
(
JsonMappingException
e
)
{
e
.
printStackTrace
();
}
catch
(
IOException
e
)
{
e
.
printStackTrace
();
}
catch
(
InterruptedException
e
)
{
e
.
printStackTrace
();
}
catch
(
Exception
e
)
{
log
.
error
(
"Thread {}事件采集有误:{} \n 位置{}."
,
threadName
,
e
,
e
.
getStackTrace
());
}
finally
{
// 线程退出
log
.
info
(
"Thread 事件采集
进入自动标注
{} exiting."
,
threadName
);
log
.
info
(
"Thread 事件采集
自动标注结束
{} exiting."
,
threadName
);
}
}
...
...
src/main/java/com/zhiwei/messageflow/MessageflowApplication.java
View file @
22a3d9e4
...
...
@@ -4,8 +4,10 @@ import org.springframework.boot.SpringApplication;
import
org.springframework.boot.autoconfigure.SpringBootApplication
;
import
org.springframework.boot.autoconfigure.data.mongo.MongoDataAutoConfiguration
;
import
org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration
;
import
org.springframework.kafka.annotation.EnableKafka
;
@SpringBootApplication
(
exclude
=
{
MongoAutoConfiguration
.
class
,
MongoDataAutoConfiguration
.
class
})
@EnableKafka
public
class
MessageflowApplication
{
public
static
void
main
(
String
[]
args
)
{
...
...
src/main/java/com/zhiwei/messageflow/config/RedisConfig.java
View file @
22a3d9e4
...
...
@@ -27,7 +27,10 @@ public class RedisConfig {
private
String
password
;
public
static
final
String
DIRECTKEY
=
"Direct:"
;
public
static
final
String
EVENTKEY
=
"Event:"
;
/** 事件采集待处理列表 **/
public
static
final
String
EVENTLISTKEY
=
"Event:EventList"
;
/** 事件采集去重url集 **/
public
static
final
String
EVENTHASHKEY
=
"Event:Hash:"
;
private
int
keyMaxSize
;
...
...
src/main/java/com/zhiwei/messageflow/kafka/KafkaConsumer.java
View file @
22a3d9e4
...
...
@@ -30,12 +30,12 @@ public class KafkaConsumer {
// L.i("message =" + message);
// }
// }
@KafkaListener
(
topics
=
{
"qbjc_event_topic"
},
containerFactory
=
"kafkaListenerContainerFactory"
)
@KafkaListener
(
topics
=
{
"qbjc_event_topic"
},
containerFactory
=
"kafkaListenerContainerFactory"
)
public
void
consumerMsg
(
List
<
ConsumerRecord
>
records
,
Acknowledgment
ack
)
{
try
{
// eventService.saveRecords(records);
for
(
ConsumerRecord
record
:
records
)
{
log
.
info
(
"record = {}"
,
record
);
//
log.info("record = {}" , record);
Optional
<?>
kafkaMessage
=
Optional
.
ofNullable
(
record
.
value
());
if
(
kafkaMessage
.
isPresent
())
{
Object
message
=
kafkaMessage
.
get
();
...
...
src/main/java/com/zhiwei/messageflow/redis/RedisPoolAndTools.java
View file @
22a3d9e4
...
...
@@ -320,5 +320,82 @@ public class RedisPoolAndTools {
jedis
.
lrem
(
key
,
0
,
id
);
returnResource
(
jedis
);
}
/**
 * Checks whether a key exists in Redis.
 *
 * <p>Keeps calling {@code getJedis()} until it yields a non-null connection,
 * so this method does not return while no connection can be obtained.
 *
 * @param key the Redis key to test
 * @return {@code true} if Redis reports that the key exists
 */
public boolean exists(String key) {
    Jedis jedis = getJedis();
    // A null from getJedis() is treated as "no connection yet": ask again.
    while (null == jedis) {
        jedis = getJedis();
    }
    try {
        return jedis.exists(key);
    } finally {
        // Hand the connection back even when the command throws, so it is not leaked.
        returnResource(jedis);
    }
}
/**
 * Checks whether a field exists inside a Redis hash.
 *
 * <p>Keeps calling {@code getJedis()} until it yields a non-null connection,
 * so this method does not return while no connection can be obtained.
 *
 * @param key the Redis key of the hash
 * @param url the hash field to look for (callers in this file pass a URL)
 * @return {@code true} if the hash contains the field
 */
public boolean hexists(String key, String url) {
    Jedis jedis = getJedis();
    // A null from getJedis() is treated as "no connection yet": ask again.
    while (null == jedis) {
        jedis = getJedis();
    }
    try {
        return jedis.hexists(key, url);
    } finally {
        // Hand the connection back even when the command throws, so it is not leaked.
        returnResource(jedis);
    }
}
/**
 * Stores {@code url} in the Redis hash {@code key}, using the URL as both the
 * field name and the field value.
 *
 * <p>Per the original note on this method: if the hash does not exist a new
 * one is created, and if the field already exists its old value is overwritten.
 *
 * <p>Keeps calling {@code getJedis()} until it yields a non-null connection,
 * so this method does not return while no connection can be obtained.
 *
 * @param key the Redis key of the hash
 * @param url the value written as field and as value
 */
public void hset(String key, String url) {
    Jedis jedis = getJedis();
    // A null from getJedis() is treated as "no connection yet": ask again.
    while (null == jedis) {
        jedis = getJedis();
    }
    try {
        jedis.hset(key, url, url);
    } finally {
        // Hand the connection back even when the command throws, so it is not leaked.
        returnResource(jedis);
    }
}
/**
 * Deletes a key from Redis.
 *
 * <p>Keeps calling {@code getJedis()} until it yields a non-null connection,
 * so this method does not return while no connection can be obtained.
 *
 * @param key the Redis key to delete
 * @return {@code true} only when the delete command reports exactly one
 *         removed key; {@code false} otherwise (for example when the key
 *         did not exist)
 */
public boolean del(String key) {
    Jedis jedis = getJedis();
    // A null from getJedis() is treated as "no connection yet": ask again.
    while (null == jedis) {
        jedis = getJedis();
    }
    try {
        return 1 == jedis.del(key);
    } finally {
        // Hand the connection back even when the command throws, so it is not leaked.
        returnResource(jedis);
    }
}
}
\ No newline at end of file
src/main/java/com/zhiwei/messageflow/redis/service/RedisService.java
View file @
22a3d9e4
...
...
@@ -170,5 +170,24 @@ public interface RedisService {
* @return boolean 返回类型
*/
boolean
removeEventAutoMarkList
(
String
id
);
/**
 * Determines whether a URL is already present in the URL hash of an event
 * collection.
 *
 * <p>Note: the {@code RedisServiceImpl} implementation also records the URL
 * in the hash when it was not present, so a second call with the same
 * arguments returns {@code true}.
 *
 * @param url the URL to look up
 * @param id  the event collection id whose URL hash is consulted
 * @return {@code true} if the URL was already present, {@code false} otherwise
 */
boolean
existsCollectionHashByUrlkey
(
String
url
,
String
id
);
/**
 * Deletes the URL hash of an event collection, looked up by the event
 * collection id.
 *
 * @param id the event collection id whose URL hash is deleted
 * @return the outcome of the delete as a boolean
 *         (NOTE(review): the {@code RedisServiceImpl} implementation returns
 *         {@code true} only when exactly one key was removed; confirm this is
 *         the intended contract)
 */
boolean
dropCollectionHash
(
String
id
);
}
src/main/java/com/zhiwei/messageflow/redis/service/impl/RedisServiceImpl.java
View file @
22a3d9e4
...
...
@@ -213,4 +213,28 @@ public class RedisServiceImpl implements RedisService {
return
true
;
}
/**
 * Reports whether {@code url} is already recorded in the URL hash of the
 * event collection {@code id}, and records it when it is not.
 *
 * <p>The hash lives under the key {@code RedisConfig.EVENTHASHKEY + id}.
 *
 * <p>NOTE(review): the lookup and the write are two separate Redis calls, so
 * two concurrent callers with the same URL can both get {@code false}.
 *
 * @param url the URL used as the hash field
 * @param id  the event collection id
 * @return {@code true} if the URL was already in the hash; {@code false} if
 *         it was absent (it has been added by the time this returns)
 */
@Override
public boolean existsCollectionHashByUrlkey(String url, String id) {
    String redisKey = RedisConfig.EVENTHASHKEY + id;
    // A separate key-existence check is unnecessary: HEXISTS answers "absent"
    // for a missing hash and HSET creates the hash on demand.
    if (redisPoolAndTools.hexists(redisKey, url)) {
        return true;
    }
    redisPoolAndTools.hset(redisKey, url);
    return false;
}
/**
 * Removes the URL hash stored under {@code RedisConfig.EVENTHASHKEY + id}.
 *
 * @param id the event collection id
 * @return the result of {@code redisPoolAndTools.del} for that key
 */
@Override
public boolean dropCollectionHash(String id) {
    return redisPoolAndTools.del(RedisConfig.EVENTHASHKEY + id);
}
}
src/main/java/com/zhiwei/messageflow/service/EventService.java
View file @
22a3d9e4
...
...
@@ -62,8 +62,9 @@ public interface EventService {
* @throws IOException
* @throws JsonMappingException
* @throws JsonParseException
* @throws Exception
*/
boolean
autoMark
(
Event
eventCollection
)
throws
JsonParseException
,
JsonMappingException
,
IOException
;
boolean
autoMark
(
Event
eventCollection
);
/**
* 更新事件等待采集列表中的前十个任务状态为采集完毕
* @Title: updateTopTenCollection
...
...
src/main/java/com/zhiwei/messageflow/service/impl/EventServiceImpl.java
View file @
22a3d9e4
package
com
.
zhiwei
.
messageflow
.
service
.
impl
;
import
java.io.IOException
;
import
java.net.MalformedURLException
;
import
java.net.URL
;
import
java.util.ArrayList
;
import
java.util.Date
;
import
java.util.HashSet
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Objects
;
import
java.util.Optional
;
import
java.util.Set
;
...
...
@@ -27,6 +30,7 @@ import com.fasterxml.jackson.core.JsonParseException;
import
com.fasterxml.jackson.core.JsonProcessingException
;
import
com.fasterxml.jackson.databind.JsonMappingException
;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
com.google.common.net.InternetDomainName
;
import
com.mongodb.BasicDBObject
;
import
com.mongodb.DBObject
;
import
com.zhiwei.messageflow.bean.Event
;
...
...
@@ -42,7 +46,7 @@ public class EventServiceImpl implements EventService {
private
static
final
Logger
log
=
LogManager
.
getLogger
(
EventServiceImpl
.
class
);
private
static
AutomaticMarkClient
client
=
AutomaticMarkClient
.
getClient
(
MiddlewareConfig
.
zookeeperIp
);
/** 自动标注每次进聚合条数 **/
private
static
final
int
AUTOMARKPAGELIMIT
=
500
;
@Autowired
...
...
@@ -57,24 +61,23 @@ public class EventServiceImpl implements EventService {
@Override
public
void
saveEventMsg
(
Object
message
,
long
offset
)
{
// JSONObject.toJSONString(message);
// System.out.println(JSONObject.toJSONString(message));
JSONObject
msgJsonOb
=
JSONObject
.
parseObject
(
message
.
toString
());
if
(
"over"
.
equals
(
msgJsonOb
.
getString
(
"result"
)))
{
System
.
err
.
println
(
msgJsonOb
.
getString
(
"eventId"
));
// TODO 更新事件状态
// 更新事件状态并清除url去重集
dropUrlIterateList
(
msgJsonOb
);
overEvent
(
msgJsonOb
.
getString
(
"eventId"
));
try
{
Thread
.
sleep
(
4000L
);
}
catch
(
InterruptedException
e
)
{
// TODO Auto-generated catch block
e
.
printStackTrace
();
}
}
else
{
if
(
msgJsonOb
.
containsKey
(
"time"
))
{
if
(
null
!=
msgJsonOb
.
getLong
(
"time"
))
{
// 放入缓存
if
(
CanMsgInsert
(
msgJsonOb
))
{
if
(!
isUrlIterate
(
msgJsonOb
))
{
// url不重复
msgJsonOb
.
put
(
"offset"
,
offset
);
// 放入缓存
redisService
.
insertEvent
(
msgJsonOb
);
}
}
...
...
@@ -82,6 +85,150 @@ public class EventServiceImpl implements EventService {
}
/**
 * Deletes the URL hash of an event once its collection has ended.
 *
 * @param msgJsonOb message whose {@code "eventId"} entry identifies the event
 * @return the value returned by {@code redisService.dropCollectionHash}
 */
private boolean dropUrlIterateList(JSONObject msgJsonOb) {
    return redisService.dropCollectionHash(msgJsonOb.getString("eventId"));
}
// 规则
// 1、source为“百度网页”
// 2、字段格式错误并影响最终数据存储 “例:时间”
// 3、重复数据(URL)“https跟http的统一”
/**
 * Tells whether the message's URL has already been seen for its event.
 *
 * <p>The URL is first passed through {@code formatUrl}, then handed to
 * {@code redisService.existsCollectionHashByUrlkey} together with the event id.
 * Any exception on the way is logged and the message is treated as not a
 * duplicate.
 *
 * @param msgJsonOb message providing the {@code "eventId"} and {@code "url"} entries
 * @return {@code true} if the Redis service reports the URL as already present;
 *         {@code false} otherwise, including when an exception occurred
 */
private boolean isUrlIterate(JSONObject msgJsonOb) {
    boolean duplicate;
    try {
        final String eventId = msgJsonOb.getString("eventId");
        final String normalizedUrl = formatUrl(msgJsonOb.getString("url"));
        duplicate = redisService.existsCollectionHashByUrlkey(normalizedUrl, eventId);
    } catch (Exception e) {
        log.error("判断重复urlError:", e);
        duplicate = false;
    }
    return duplicate;
}
/**
 * Normalizes a URL so that the http and https forms of the same address
 * compare equal: an {@code http://} scheme prefix is rewritten to
 * {@code https://}; everything else is returned unchanged.
 *
 * <p>Only the scheme at the start of the string is touched, and it is matched
 * case-insensitively, so {@code HTTP://host/...} is normalized too and an
 * {@code http://} occurring later (for example inside a query parameter) is
 * never rewritten.
 *
 * @param urlStr the URL text to normalize
 * @return the normalized URL text
 * @throws Exception if {@code urlStr} cannot be parsed by {@link URL}
 */
private String formatUrl(String urlStr) throws Exception {
    // Parsing doubles as validation: an unparsable value throws here.
    URL url = new URL(urlStr);
    if (!"http".equals(url.getProtocol())) {
        return urlStr;
    }
    final String httpPrefix = "http://";
    // java.net.URL skips leading characters <= ' ' when parsing; skip them here
    // as well so the scheme is located at the same position.
    int start = 0;
    while (start < urlStr.length() && urlStr.charAt(start) <= ' ') {
        start++;
    }
    if (urlStr.regionMatches(true, start, httpPrefix, 0, httpPrefix.length())) {
        return urlStr.substring(0, start) + "https://"
                + urlStr.substring(start + httpPrefix.length());
    }
    return urlStr;
}
/**
 * Decides whether a message may be put into the cache.
 *
 * <p>Each of the entries below is checked only if the message contains the
 * key; the message is rejected when
 * <ul>
 *   <li>{@code "time"} is null or {@code -1};</li>
 *   <li>{@code "source"} is null or equals {@code "百度网页"};</li>
 *   <li>{@code "title"} is null;</li>
 *   <li>{@code "url"} is null or fails {@code isValidUrl};</li>
 *   <li>{@code "content"} is null.</li>
 * </ul>
 * Title, url and content rejections are logged at info level.
 *
 * @param msgJsonOb the message to inspect
 * @return {@code true} if no check rejected the message
 */
private boolean CanMsgInsert(JSONObject msgJsonOb) {
    if (msgJsonOb.containsKey("time")) {
        Long time = msgJsonOb.getLong("time");
        if (time == null || time == -1) {
            return false;
        }
    }

    if (msgJsonOb.containsKey("source")) {
        String source = msgJsonOb.getString("source");
        if (source == null || "百度网页".equals(source)) {
            return false;
        }
    }

    if (msgJsonOb.containsKey("title") && msgJsonOb.getString("title") == null) {
        log.info("MSG_title_Error:{}", msgJsonOb);
        return false;
    }

    if (msgJsonOb.containsKey("url")) {
        String url = msgJsonOb.getString("url");
        if (url == null) {
            log.info("MSG_url_Error_null:{}", msgJsonOb);
            return false;
        }
        if (!isValidUrl(url)) {
            log.info("MSG_url_Error_notValid:{}", msgJsonOb);
            return false;
        }
    }

    if (msgJsonOb.containsKey("content") && msgJsonOb.getString("content") == null) {
        log.info("MSG_content_Error:{}", msgJsonOb);
        return false;
    }

    return true;
}
/**
 * Validates a URL.
 *
 * <p>A URL is accepted when it is longer than two characters, can be parsed
 * by {@link URL}, and its host either looks like a dotted IPv4 address or
 * passes {@code InternetDomainName.from(host).topPrivateDomain()} without an
 * exception. Any exception means the URL is rejected.
 *
 * @param url the URL text to validate; may be null
 * @return {@code true} if the URL is accepted, {@code false} otherwise
 */
private boolean isValidUrl(String url) {
    if (url == null || url.length() <= 2) {
        return false;
    }
    try {
        String host = new URL(url).getHost();
        // IP addresses are let through without the top-level-domain check.
        boolean dottedQuad = host.matches("[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}\\.[\\d]{1,3}");
        if (!dottedQuad) {
            // Called only for its exception; the returned domain is not used.
            InternetDomainName.from(host).topPrivateDomain();
        }
        return true;
    } catch (Exception e) {
        return false;
    }
}
@Override
public
void
saveRecords
(
List
<
ConsumerRecord
>
records
)
{
// List<JSONObject> msgs = new ArrayList<>();
...
...
@@ -111,8 +258,8 @@ public class EventServiceImpl implements EventService {
@Override
public
void
overEvent
(
String
id
)
{
Update
over
=
new
Update
();
over
.
update
(
"status"
,
"采集完毕"
);
primaryMongoTemplate
.
up
ser
t
(
new
Query
(
Criteria
.
where
(
"_id"
).
is
(
id
)),
over
,
Event
.
class
);
over
.
set
(
"status"
,
"采集完毕"
);
primaryMongoTemplate
.
up
dateFirs
t
(
new
Query
(
Criteria
.
where
(
"_id"
).
is
(
id
)),
over
,
Event
.
class
);
redisService
.
addEventAutoMarkList
(
id
);
}
...
...
@@ -124,63 +271,78 @@ public class EventServiceImpl implements EventService {
@Override
public
void
updateEventStatusAutoMark
(
String
id
)
{
Update
over
=
new
Update
();
over
.
update
(
"status"
,
"自动标注"
);
primaryMongoTemplate
.
up
ser
t
(
new
Query
(
Criteria
.
where
(
"_id"
).
is
(
id
)),
over
,
Event
.
class
);
over
.
set
(
"status"
,
"自动标注"
);
primaryMongoTemplate
.
up
dateFirs
t
(
new
Query
(
Criteria
.
where
(
"_id"
).
is
(
id
)),
over
,
Event
.
class
);
}
@Override
public
boolean
autoMark
(
Event
eventCollection
)
throws
JsonParseException
,
JsonMappingException
,
IOException
{
String
id
=
eventCollection
.
getId
();
// 1,清理标注聚合结果集
String
group
=
eventCollection
.
getProject
();
log
.
info
(
"group:{},id:{}"
,
group
,
id
);
client
.
cleanMarkAggreData
(
group
,
id
);
// 从缓存获取事件采集数据
int
count
=
redisService
.
countCollectionData
(
id
);
// (总记录数+每页行数-1)/每页行数
int
totalpage
=
(
int
)
(
count
+
AUTOMARKPAGELIMIT
-
1
)
/
AUTOMARKPAGELIMIT
;
for
(
int
i
=
0
;
i
<
totalpage
;
i
++)
{
int
start
=
i
*
AUTOMARKPAGELIMIT
;
int
end
=
start
+
AUTOMARKPAGELIMIT
-
1
;
List
<
String
>
eventList
=
new
ArrayList
<>();
if
(
i
!=
(
totalpage
-
1
))
{
eventList
=
new
ArrayList
<>(
redisService
.
getCollectionData
(
id
,
start
,
end
));
public
boolean
autoMark
(
Event
eventCollection
)
{
try
{
String
id
=
eventCollection
.
getId
();
// 1,清理标注聚合结果集
String
group
=
eventCollection
.
getProject
();
log
.
info
(
"group:{},id:{}"
,
group
,
id
);
client
.
cleanMarkAggreData
(
group
,
id
);
// 从缓存获取事件采集数据
int
count
=
redisService
.
countCollectionData
(
id
);
if
(
0
==
count
)
{
// 消息数为0
Update
over
=
new
Update
();
over
.
set
(
"status"
,
"暂无数据"
);
primaryMongoTemplate
.
updateFirst
(
new
Query
(
Criteria
.
where
(
"_id"
).
is
(
id
)),
over
,
Event
.
class
);
}
else
{
eventList
=
new
ArrayList
<>(
redisService
.
getCollectionData
(
id
,
start
,
-
1
));
}
List
<
DBObject
>
list
=
new
ArrayList
<>();
for
(
int
j
=
0
;
j
<
eventList
.
size
();
j
++)
{
DBObject
dbObject
=
new
BasicDBObject
();
String
msgStr
=
eventList
.
get
(
j
);
Map
<
String
,
Object
>
map
=
mapper
.
readValue
(
msgStr
,
Map
.
class
);
dbObject
.
put
(
"_id"
,
map
.
get
(
"url"
)
+
""
);
dbObject
.
put
(
"url"
,
map
.
get
(
"url"
)
+
""
);
dbObject
.
put
(
"title"
,
map
.
get
(
"title"
)
+
""
);
dbObject
.
put
(
"source"
,
map
.
get
(
"source"
)
+
""
);
dbObject
.
put
(
"time"
,
new
Date
(
Long
.
parseLong
(
map
.
get
(
"time"
)
+
""
)));
dbObject
.
put
(
"markGroup"
,
group
);
list
.
add
(
dbObject
);
// (总记录数+每页行数-1)/每页行数
int
totalpage
=
(
int
)
(
count
+
AUTOMARKPAGELIMIT
-
1
)
/
AUTOMARKPAGELIMIT
;
for
(
int
i
=
0
;
i
<
totalpage
;
i
++)
{
int
start
=
i
*
AUTOMARKPAGELIMIT
;
int
end
=
start
+
AUTOMARKPAGELIMIT
-
1
;
List
<
String
>
eventList
=
new
ArrayList
<>();
if
(
i
!=
(
totalpage
-
1
))
{
eventList
=
new
ArrayList
<>(
redisService
.
getCollectionData
(
id
,
start
,
end
));
}
else
{
eventList
=
new
ArrayList
<>(
redisService
.
getCollectionData
(
id
,
start
,
-
1
));
}
List
<
DBObject
>
list
=
new
ArrayList
<>();
for
(
int
j
=
0
;
j
<
eventList
.
size
();
j
++)
{
DBObject
dbObject
=
new
BasicDBObject
();
String
msgStr
=
eventList
.
get
(
j
);
Map
<
String
,
Object
>
map
=
mapper
.
readValue
(
msgStr
,
Map
.
class
);
dbObject
.
put
(
"_id"
,
map
.
get
(
"url"
)
+
""
);
dbObject
.
put
(
"url"
,
map
.
get
(
"url"
)
+
""
);
dbObject
.
put
(
"title"
,
map
.
get
(
"title"
)
+
""
);
dbObject
.
put
(
"source"
,
map
.
get
(
"source"
)
+
""
);
dbObject
.
put
(
"time"
,
new
Date
(
Long
.
parseLong
(
map
.
get
(
"time"
)
+
""
)));
dbObject
.
put
(
"offset"
,
map
.
get
(
"offset"
)
+
""
);
dbObject
.
put
(
"markGroup"
,
group
);
list
.
add
(
dbObject
);
}
// 2,增加标注聚合结果集
client
.
addMarkAggreSourceList
(
group
,
id
,
list
);
}
// 3,启动聚合
client
.
startAggre
(
group
,
id
);
}
// 2,增加标注聚合结果集
client
.
addMarkAggreSourceList
(
group
,
id
,
list
);
// 移除等待事件采集聚合list
redisService
.
removeEventAutoMarkList
(
id
);
return
true
;
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
log
.
error
(
"事件进自动标注异常:{}"
,
e
);
return
false
;
}
// 3,启动聚合
client
.
startAggre
(
group
,
id
);
//移除等待事件采集聚合list
redisService
.
removeEventAutoMarkList
(
id
);
return
true
;
}
@Override
public
boolean
updateTopTenCollection
()
{
List
<
String
>
idList
=
redisService
.
getNeedAutoMark
();
List
<
Event
>
events
=
null
!=
findEventsbyIds
(
idList
)?
findEventsbyIds
(
idList
):
new
ArrayList
<>();
List
<
Event
>
events
=
null
!=
findEventsbyIds
(
idList
)
?
findEventsbyIds
(
idList
)
:
new
ArrayList
<>();
for
(
int
i
=
0
;
i
<
events
.
size
();
i
++)
{
Event
eventCollection
=
events
.
get
(
i
);
if
(!
eventCollection
.
getStatus
().
equals
(
"采集完毕"
))
{
if
(!
eventCollection
.
getStatus
().
equals
(
"采集完毕"
))
{
Update
over
=
new
Update
();
over
.
update
(
"status"
,
"采集完毕"
);
primaryMongoTemplate
.
upsert
(
new
Query
(
Criteria
.
where
(
"_id"
).
is
(
eventCollection
.
getId
())),
over
,
Event
.
class
);
over
.
set
(
"status"
,
"采集完毕"
);
primaryMongoTemplate
.
updateFirst
(
new
Query
(
Criteria
.
where
(
"_id"
).
is
(
eventCollection
.
getId
())),
over
,
Event
.
class
);
}
}
return
true
;
...
...
src/main/resources/application.properties
View file @
22a3d9e4
...
...
@@ -44,9 +44,6 @@ spring.data.mongodb.thirdary.password=stno1q2w3e4r
spring.data.mongodb.thirdary.authenticationDatabase
=
admin
#kafka
spring.kafka.bootstrap-servers
=
kafka1.irybd.com:9092
spring.kafka.producer.key-serializer
=
org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer
=
org.apache.kafka.common.serialization.StringSerializer
#=============== consumer =======================
# group id
#spring.kafka.consumer.group-id=group1
...
...
@@ -59,11 +56,11 @@ spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.S
#spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.consumer.servers
=
kafka1.irybd.com:9092
kafka.consumer.servers
=
192.168.0.203:9092,192.168.0.203:9093,192.168.0.203:9094
kafka.consumer.enable.auto.commit
=
false
kafka.consumer.session.timeout
=
15000
kafka.consumer.auto.commit.interval
=
100
kafka.consumer.auto.offset.reset
=
earliest
kafka.consumer.group.id
=
group
kafka.consumer.concurrency
=
1
0
kafka.consumer.group.id
=
stno
kafka.consumer.concurrency
=
1
kafka.consumer.maxPollRecordsConfig
=
100
\ No newline at end of file
src/main/resources/application.properties.local1
View file @
22a3d9e4
##
服务端
uri
##
\u670D\u52A1\u7AEF
uri
#spring.data.mongodb.uri=115.236.59.91:27017
#
服务端数据库
#
\u670D\u52A1\u7AEF\u6570\u636E\u5E93
spring.data.mongodb.primary.database=qbjcPhoenix
#
服务
ip
#
\u670D\u52A1
ip
spring.data.mongodb.primary.host=127.0.0.1
#
服务
port
#
\u670D\u52A1
port
spring.data.mongodb.primary.port=27017
#spring.data.mongodb.primary.username=stno
...
...
@@ -13,11 +13,11 @@ spring.data.mongodb.primary.port=27017
#spring.data.mongodb.primary.authenticationDatabase=admin
#
服务端数据库
#
\u670D\u52A1\u7AEF\u6570\u636E\u5E93
spring.data.mongodb.secondary.database=eventMuseum
#
服务
ip
#
\u670D\u52A1
ip
spring.data.mongodb.secondary.host=202.107.192.94
#
服务
port
#
\u670D\u52A1
port
spring.data.mongodb.secondary.port=30000
spring.data.mongodb.secondary.username=stno
...
...
@@ -26,11 +26,11 @@ spring.data.mongodb.secondary.password=stno1q2w3e4r
spring.data.mongodb.secondary.authenticationDatabase=admin
#
服务端数据库
#
\u670D\u52A1\u7AEF\u6570\u636E\u5E93
spring.data.mongodb.thirdary.database=WechatPublic
#
服务
ip
#
\u670D\u52A1
ip
spring.data.mongodb.thirdary.host=202.107.192.94
#
服务
port
#
\u670D\u52A1
port
spring.data.mongodb.thirdary.port=30000
spring.data.mongodb.thirdary.username=stno
...
...
@@ -40,9 +40,6 @@ spring.data.mongodb.thirdary.password=stno1q2w3e4r
spring.data.mongodb.thirdary.authenticationDatabase=admin
#kafka
spring.kafka.bootstrap-servers = kafka1.irybd.com:9092
spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer
#=============== consumer =======================
# group id
#spring.kafka.consumer.group-id=group1
...
...
@@ -55,11 +52,12 @@ spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.S
#spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#kafka1.irybd.com:9092,kafka1.irybd.com:9093,kafka1.irybd.com:9094
kafka.consumer.servers=kafka1.irybd.com:9092
kafka.consumer.enable.auto.commit=false
kafka.consumer.session.timeout=15000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=earliest
kafka.consumer.group.id=group
kafka.consumer.concurrency=1
0
kafka.consumer.group.id=group
-test
kafka.consumer.concurrency=1
kafka.consumer.maxPollRecordsConfig=100
\ No newline at end of file
src/main/resources/application.properties.local2
View file @
22a3d9e4
...
...
@@ -59,7 +59,7 @@ kafka.consumer.servers=kafka1.irybd.com:9092
kafka.consumer.enable.auto.commit=false
kafka.consumer.session.timeout=15000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=
earli
est
kafka.consumer.auto.offset.reset=
lat
est
kafka.consumer.group.id=group
kafka.consumer.concurrency=1
0
kafka.consumer.concurrency=1
kafka.consumer.maxPollRecordsConfig=100
\ No newline at end of file
src/main/resources/application.properties.test10001
View file @
22a3d9e4
...
...
@@ -44,9 +44,6 @@ spring.data.mongodb.thirdary.password=stno1q2w3e4r
spring.data.mongodb.thirdary.authenticationDatabase=admin
#kafka
spring.kafka.bootstrap-servers = kafka1.irybd.com:9092
spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer
#=============== consumer =======================
# group id
#spring.kafka.consumer.group-id=group1
...
...
@@ -59,11 +56,11 @@ spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.S
#spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.consumer.servers=
kafka1.irybd.com:9092
kafka.consumer.servers=
192.168.0.203:9092,192.168.0.203:9093,192.168.0.203:9094
kafka.consumer.enable.auto.commit=false
kafka.consumer.session.timeout=15000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=earliest
kafka.consumer.group.id=
group
kafka.consumer.concurrency=1
0
kafka.consumer.group.id=
stno
kafka.consumer.concurrency=1
kafka.consumer.maxPollRecordsConfig=100
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment