Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
B
brandkbs2
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
shenjunjie
brandkbs2
Commits
56e10078
Commit
56e10078
authored
Jul 15, 2024
by
shenjunjie
Browse files
Options
Browse Files
Download
Plain Diff
Merge branch 'feature' into 'release'
Feature See merge request
!565
parents
5938b9bd
aa8a4748
Show whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
125 additions
and
51 deletions
+125
-51
src/main/java/com/zhiwei/brandkbs2/es/ChannelEsDao.java
+52
-4
src/main/java/com/zhiwei/brandkbs2/es/EsClientDao.java
+51
-2
src/main/java/com/zhiwei/brandkbs2/service/impl/ChannelServiceImpl.java
+12
-9
src/main/java/com/zhiwei/brandkbs2/service/impl/TaskServiceImpl.java
+10
-36
No files found.
src/main/java/com/zhiwei/brandkbs2/es/ChannelEsDao.java
View file @
56e10078
package
com
.
zhiwei
.
brandkbs2
.
es
;
package
com
.
zhiwei
.
brandkbs2
.
es
;
import
com.alibaba.fastjson.JSONObject
;
import
com.zhiwei.brandkbs2.common.GenericAttribute
;
import
com.zhiwei.brandkbs2.common.GenericAttribute
;
import
com.zhiwei.brandkbs2.pojo.ChannelIndex
;
import
com.zhiwei.brandkbs2.pojo.ChannelIndex
;
import
com.zhiwei.brandkbs2.pojo.ChannelRecord
;
import
com.zhiwei.brandkbs2.pojo.ChannelRecord
;
...
@@ -10,23 +11,25 @@ import org.apache.logging.log4j.Logger;
...
@@ -10,23 +11,25 @@ import org.apache.logging.log4j.Logger;
import
org.elasticsearch.action.bulk.BulkRequest
;
import
org.elasticsearch.action.bulk.BulkRequest
;
import
org.elasticsearch.action.bulk.BulkResponse
;
import
org.elasticsearch.action.bulk.BulkResponse
;
import
org.elasticsearch.action.index.IndexRequest
;
import
org.elasticsearch.action.index.IndexRequest
;
import
org.elasticsearch.action.search.SearchRequest
;
import
org.elasticsearch.action.search.SearchResponse
;
import
org.elasticsearch.action.update.UpdateRequest
;
import
org.elasticsearch.client.RequestOptions
;
import
org.elasticsearch.client.RequestOptions
;
import
org.elasticsearch.client.RestHighLevelClient
;
import
org.elasticsearch.client.RestHighLevelClient
;
import
org.elasticsearch.index.query.BoolQueryBuilder
;
import
org.elasticsearch.index.query.BoolQueryBuilder
;
import
org.elasticsearch.index.query.QueryBuilders
;
import
org.elasticsearch.index.query.QueryBuilders
;
import
org.elasticsearch.search.SearchHit
;
import
org.elasticsearch.search.SearchHit
;
import
org.elasticsearch.search.SearchHits
;
import
org.elasticsearch.search.SearchHits
;
import
org.elasticsearch.search.builder.SearchSourceBuilder
;
import
org.elasticsearch.search.sort.SortBuilders
;
import
org.elasticsearch.search.sort.SortBuilders
;
import
org.elasticsearch.search.sort.SortOrder
;
import
org.elasticsearch.search.sort.SortOrder
;
import
org.springframework.stereotype.Component
;
import
org.springframework.stereotype.Component
;
import
javax.annotation.Resource
;
import
javax.annotation.Resource
;
import
java.io.IOException
;
import
java.io.IOException
;
import
java.util.ArrayList
;
import
java.util.*
;
import
java.util.Calendar
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.concurrent.atomic.AtomicBoolean
;
import
java.util.concurrent.atomic.AtomicBoolean
;
import
java.util.concurrent.atomic.AtomicInteger
;
import
java.util.concurrent.atomic.AtomicLong
;
import
java.util.concurrent.atomic.AtomicLong
;
import
java.util.stream.Collectors
;
import
java.util.stream.Collectors
;
...
@@ -111,6 +114,51 @@ public class ChannelEsDao extends EsClientDao {
...
@@ -111,6 +114,51 @@ public class ChannelEsDao extends EsClientDao {
}
}
}
}
public
void
removeChannelRecordById
(
List
<
JSONObject
>
list
)
{
List
<
String
>
idList
=
list
.
stream
().
map
(
json
->
json
.
getString
(
"id"
)).
collect
(
Collectors
.
toList
());
String
index
=
getChannelRecordIndexes
().
get
(
0
);
AtomicInteger
count
=
new
AtomicInteger
(
idList
.
size
());
idList
.
forEach
(
id
->
{
AtomicInteger
searchCount
=
new
AtomicInteger
();
AtomicInteger
updateCount
=
new
AtomicInteger
();
SearchSourceBuilder
searchSourceBuilder
=
new
SearchSourceBuilder
();
searchSourceBuilder
.
query
(
QueryBuilders
.
termQuery
(
"record.articles.id"
,
id
));
searchSourceBuilder
.
size
(
10000
);
SearchResponse
response
=
retryTemplate
.
execute
(
context
->
{
try
{
return
channelEsClient
.
search
(
new
SearchRequest
().
indices
(
index
).
source
(
searchSourceBuilder
),
RequestOptions
.
DEFAULT
);
}
catch
(
IOException
e
)
{
log
.
info
(
"resetChannelRecordById:{},查询失败,尝试重试第{}次-"
,
id
,
context
.
getRetryCount
()
+
1
,
e
);
return
null
;
}
});
if
(
null
!=
response
)
{
searchCount
.
addAndGet
(
response
.
getHits
().
getHits
().
length
);
for
(
SearchHit
hit
:
response
.
getHits
().
getHits
())
{
JSONObject
record
=
new
JSONObject
((
Map
<
String
,
Object
>)
hit
.
getSourceAsMap
().
get
(
"record"
));
Long
lastTime
=
record
.
getLong
(
"last_time"
);
List
<
ChannelIndex
.
Article
>
articles
=
record
.
getJSONArray
(
"articles"
).
toJavaList
(
JSONObject
.
class
).
stream
().
map
(
json
->
{
// 移除id相同的数据
if
(
json
.
getString
(
"id"
).
equals
(
id
))
{
return
null
;
}
return
ChannelIndex
.
Article
.
fromRecordMap
(
json
);
}).
filter
(
Objects:
:
nonNull
).
collect
(
Collectors
.
toList
());
Map
<
String
,
Object
>
updateMap
=
new
HashMap
<>();
updateMap
.
put
(
"record"
,
new
ChannelIndex
.
Record
(
lastTime
,
articles
).
toEsMap
());
updateMap
.
put
(
"article_count"
,
articles
.
size
());
try
{
channelEsClient
.
update
(
new
UpdateRequest
().
index
(
index
).
id
(
hit
.
getId
()).
doc
(
updateMap
),
RequestOptions
.
DEFAULT
);
updateCount
.
getAndIncrement
();
}
catch
(
IOException
e
)
{
log
.
info
(
"resetChannelRecordById:{},更新失败"
,
id
,
e
);
}
}
}
log
.
info
(
"resetChannelRecordById:{},共查询到{}条,更新{}条,剩余id数:{}"
,
id
,
searchCount
.
get
(),
updateCount
.
get
(),
count
.
decrementAndGet
());
});
}
private
BulkResponse
upsertChannelRecordLimit
(
List
<
ChannelRecord
>
records
,
String
index
,
int
limit
)
{
private
BulkResponse
upsertChannelRecordLimit
(
List
<
ChannelRecord
>
records
,
String
index
,
int
limit
)
{
AtomicBoolean
res
=
new
AtomicBoolean
(
true
);
AtomicBoolean
res
=
new
AtomicBoolean
(
true
);
BulkResponse
bulkResponse
=
null
;
BulkResponse
bulkResponse
=
null
;
...
...
src/main/java/com/zhiwei/brandkbs2/es/EsClientDao.java
View file @
56e10078
...
@@ -112,6 +112,36 @@ public class EsClientDao {
...
@@ -112,6 +112,36 @@ public class EsClientDao {
return
res
;
return
res
;
}
}
/**
 * Collects the results of {@code searchRecordEmptySingle} over the whole time range.
 *
 * <p>The range is cut into windows of {@code ONE_HOUR * 24} by {@code Tools.cutTimeRange};
 * every window is searched asynchronously on {@code executor} and the per-window results are
 * concatenated in window order once all searches have finished.
 *
 * @param startTime start of the range, passed through to {@code Tools.cutTimeRange}
 * @param endTime   end of the range, passed through to {@code Tools.cutTimeRange}
 * @param mgroup    group value forwarded to every window search
 * @return the merged results of all windows
 */
public List<JSONObject> searchRecordEmpty(long startTime, long endTime, String mgroup) {
    List<Long[]> windows = Tools.cutTimeRange(startTime, endTime, ONE_HOUR * 24);
    List<CompletableFuture<List<JSONObject>>> pending = new ArrayList<>(windows.size());
    for (Long[] window : windows) {
        pending.add(CompletableFuture.supplyAsync(() -> searchRecordEmptySingle(window[0], window[1], mgroup), executor));
    }
    // Block until every window search is done; a failed search surfaces here as a CompletionException.
    CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
    List<JSONObject> merged = new ArrayList<>();
    for (CompletableFuture<List<JSONObject>> task : pending) {
        merged.addAll(task.join());
    }
    return merged;
}
/**
 * Scroll-searches one time window for documents that have no {@code mark_cache_maps} field.
 *
 * <p>The query requires {@code mgroup.keyword} to equal {@code mgroup} and {@code time} to lie
 * in {@code [startTime, endTime)}; only the {@code id} source field is requested. An
 * {@link IOException} from the scroll search is logged and yields an empty result.
 *
 * @param startTime inclusive lower bound for {@code time}
 * @param endTime   exclusive upper bound for {@code time}
 * @param mgroup    value matched against {@code mgroup.keyword}
 * @return the matching documents, or an empty list when the search fails with an I/O error
 */
private List<JSONObject> searchRecordEmptySingle(long startTime, long endTime, String mgroup) {
    List<JSONObject> matched = new ArrayList<>();
    try {
        BoolQueryBuilder filter = QueryBuilders.boolQuery()
                // the mark_cache_maps field must be absent
                .mustNot(QueryBuilders.existsQuery("mark_cache_maps"))
                .must(QueryBuilders.termQuery("mgroup.keyword", mgroup))
                .must(QueryBuilders.rangeQuery("time").gte(startTime).lt(endTime));
        matched.addAll(searchScroll(filter, 10000, new String[]{"id"}));
    } catch (IOException e) {
        log.error("searchRecordEmptySingle-", e);
    }
    log.info("startTime:{},endTime:{},size:{}", DF.format(startTime), DF.format(endTime), matched.size());
    return matched;
}
//
//
// /**
// /**
// * 搜索符合事件数据
// * 搜索符合事件数据
...
@@ -173,8 +203,27 @@ public class EsClientDao {
...
@@ -173,8 +203,27 @@ public class EsClientDao {
private
Pair
<
Long
[],
Map
<
ChannelIndex
,
ChannelIndex
.
Record
>>
searchRecord
(
long
startTime
,
long
endTime
)
{
private
Pair
<
Long
[],
Map
<
ChannelIndex
,
ChannelIndex
.
Record
>>
searchRecord
(
long
startTime
,
long
endTime
)
{
Map
<
ChannelIndex
,
ChannelIndex
.
Record
>
res
=
new
HashMap
<>();
Map
<
ChannelIndex
,
ChannelIndex
.
Record
>
res
=
new
HashMap
<>();
try
{
try
{
List
<
JSONObject
>
results
=
new
ArrayList
<>();
try
{
QueryBuilder
queryBuilder
=
QueryBuilders
.
rangeQuery
(
"mtime"
).
gte
(
startTime
).
lt
(
endTime
);
QueryBuilder
queryBuilder
=
QueryBuilders
.
rangeQuery
(
"mtime"
).
gte
(
startTime
).
lt
(
endTime
);
List
<
JSONObject
>
results
=
searchScroll
(
queryBuilder
,
10000
,
CHANNEL_RECORD_FETCH_SOURCE
);
results
=
searchScroll
(
queryBuilder
,
10000
,
CHANNEL_RECORD_FETCH_SOURCE
);
}
catch
(
Exception
e
){
log
.
error
(
"searchRecord-搜索阶段出错-时间分段重试开始"
,
e
);
// 时间分段查询
long
midTime
=
startTime
+
(
endTime
-
startTime
)
/
2
;
try
{
QueryBuilder
queryBuilder1
=
QueryBuilders
.
rangeQuery
(
"mtime"
).
gte
(
startTime
).
lt
(
midTime
);
results
.
addAll
(
searchScroll
(
queryBuilder1
,
10000
,
CHANNEL_RECORD_FETCH_SOURCE
));
}
catch
(
Exception
e1
){
log
.
error
(
"searchRecord分段查询出错,时间范围:{}-{}"
,
startTime
,
midTime
,
e1
);
}
try
{
QueryBuilder
queryBuilder2
=
QueryBuilders
.
rangeQuery
(
"mtime"
).
gte
(
midTime
).
lt
(
endTime
);
results
.
addAll
(
searchScroll
(
queryBuilder2
,
10000
,
CHANNEL_RECORD_FETCH_SOURCE
));
}
catch
(
Exception
e2
){
log
.
error
(
"searchRecord分段查询出错,时间范围:{}-{}"
,
midTime
,
endTime
,
e2
);
}
}
for
(
Map
<
String
,
Object
>
result
:
results
)
{
for
(
Map
<
String
,
Object
>
result
:
results
)
{
for
(
ChannelIndex
channelIndex
:
ChannelIndex
.
createChannelIndexes
(
result
))
{
for
(
ChannelIndex
channelIndex
:
ChannelIndex
.
createChannelIndexes
(
result
))
{
res
.
compute
(
channelIndex
,
(
k
,
v
)
->
{
res
.
compute
(
channelIndex
,
(
k
,
v
)
->
{
...
@@ -190,7 +239,7 @@ public class EsClientDao {
...
@@ -190,7 +239,7 @@ public class EsClientDao {
});
});
}
}
}
}
}
catch
(
IO
Exception
e
)
{
}
catch
(
Exception
e
)
{
log
.
error
(
"searchRecord-"
,
e
);
log
.
error
(
"searchRecord-"
,
e
);
}
}
log
.
info
(
"startTime:{},endTime:{},size:{}"
,
DF
.
format
(
startTime
),
DF
.
format
(
endTime
),
res
.
size
());
log
.
info
(
"startTime:{},endTime:{},size:{}"
,
DF
.
format
(
startTime
),
DF
.
format
(
endTime
),
res
.
size
());
...
...
src/main/java/com/zhiwei/brandkbs2/service/impl/ChannelServiceImpl.java
View file @
56e10078
...
@@ -529,11 +529,14 @@ public class ChannelServiceImpl implements ChannelService {
...
@@ -529,11 +529,14 @@ public class ChannelServiceImpl implements ChannelService {
return
null
;
return
null
;
}
}
articles
.
sort
(
Comparator
.
comparingLong
(
ChannelIndex
.
Article
::
getTime
).
reversed
());
articles
.
sort
(
Comparator
.
comparingLong
(
ChannelIndex
.
Article
::
getTime
).
reversed
());
// 重置符合条件的文章及数目
channelRecord
.
getRecord
().
setArticles
(
articles
);
channelRecord
.
getRecord
().
setArticles
(
articles
);
channelRecord
.
setArticleCount
(
articles
.
size
());
return
channelRecord
;
return
channelRecord
;
}).
filter
(
Objects:
:
nonNull
).
collect
(
Collectors
.
groupingBy
(
ChannelRecord:
:
getPlatform
));
}).
filter
(
Objects:
:
nonNull
).
collect
(
Collectors
.
groupingBy
(
ChannelRecord:
:
getPlatform
));
for
(
String
platformName
:
PLATFORMS
)
{
for
(
String
platformName
:
PLATFORMS
)
{
List
<
ChannelRecord
>
channelRecordList
=
channelRecords
.
getOrDefault
(
platformName
,
Collections
.
emptyList
()).
stream
().
limit
(
size
).
collect
(
Collectors
.
toList
());
List
<
ChannelRecord
>
channelRecordList
=
channelRecords
.
getOrDefault
(
platformName
,
Collections
.
emptyList
()).
stream
().
sorted
((
x
,
y
)
->
Long
.
compare
(
y
.
getArticleCount
(),
x
.
getArticleCount
())).
limit
(
size
).
collect
(
Collectors
.
toList
());
List
<
ChannelListVO
>
list
=
new
ArrayList
<>(
size
);
List
<
ChannelListVO
>
list
=
new
ArrayList
<>(
size
);
Map
<
String
,
ChannelRecord
>
map
=
Maps
.
uniqueIndex
(
channelRecordList
,
ChannelRecord:
:
getChannelFid
);
Map
<
String
,
ChannelRecord
>
map
=
Maps
.
uniqueIndex
(
channelRecordList
,
ChannelRecord:
:
getChannelFid
);
Map
<
String
,
Channel
>
fidChannel
=
channelDao
.
queryUniqueAsync
(
map
.
keySet
());
Map
<
String
,
Channel
>
fidChannel
=
channelDao
.
queryUniqueAsync
(
map
.
keySet
());
...
@@ -545,14 +548,14 @@ public class ChannelServiceImpl implements ChannelService {
...
@@ -545,14 +548,14 @@ public class ChannelServiceImpl implements ChannelService {
list
.
add
(
ChannelListVO
.
createFromChannel
(
record
,
record
.
getRecord
().
getArticles
().
size
()));
list
.
add
(
ChannelListVO
.
createFromChannel
(
record
,
record
.
getRecord
().
getArticles
().
size
()));
}
}
});
});
List
<
ChannelListVO
>
resList
;
//
List<ChannelListVO> resList;
// 排序
//
// 排序
if
(
Objects
.
nonNull
(
sorter
)
&&
sorter
.
contains
(
"index"
)){
//
if (Objects.nonNull(sorter) && sorter.contains("index")){
resList
=
list
.
stream
().
sorted
(
Comparator
.
comparingDouble
(
ChannelListVO:
:
getEmotionIndex
).
reversed
()).
collect
(
Collectors
.
toList
());
//
resList = list.stream().sorted(Comparator.comparingDouble(ChannelListVO::getEmotionIndex).reversed()).collect(Collectors.toList());
}
else
{
//
}else {
resList
=
list
.
stream
().
sorted
(
Comparator
.
comparingDouble
(
ChannelListVO:
:
getArticleCount
).
reversed
()).
collect
(
Collectors
.
toList
());
//
resList = list.stream().sorted(Comparator.comparingDouble(ChannelListVO::getArticleCount).reversed()).collect(Collectors.toList());
}
//
}
res
.
put
(
platformName
,
resL
ist
);
res
.
put
(
platformName
,
l
ist
);
}
}
return
res
;
return
res
;
}
}
...
...
src/main/java/com/zhiwei/brandkbs2/service/impl/TaskServiceImpl.java
View file @
56e10078
...
@@ -274,43 +274,17 @@ public class TaskServiceImpl implements TaskService {
...
@@ -274,43 +274,17 @@ public class TaskServiceImpl implements TaskService {
return
Pair
.
of
(
insertList
,
updateList
);
return
Pair
.
of
(
insertList
,
updateList
);
}
}
/**
* 本地测试-刷新历史渠道记录用
* 不更新渠道表
*/
@Deprecated
@Deprecated
public
void
messageFlowCount2
(
int
day
)
{
public
void
messageFlowCount2
(
long
startTime
,
long
endTime
,
String
mgroup
)
{
List
<
Pair
<
Long
[],
Map
<
ChannelIndex
,
ChannelIndex
.
Record
>>>
rangeTimeRecords
=
esClientDao
.
searchRecordRecentDay
(
day
);
// 找到项目的空标签数据
int
total
=
rangeTimeRecords
.
stream
().
mapToInt
(
pair
->
pair
.
getRight
().
values
().
size
()).
sum
();
List
<
JSONObject
>
list
=
esClientDao
.
searchRecordEmpty
(
startTime
,
endTime
,
mgroup
);
log
.
info
(
"渠道统计-搜索到近{}天的受影响渠道数{}条"
,
day
,
total
);
// 移除对应数据记录
// 结果合并
channelEsDao
.
removeChannelRecordById
(
list
);
List
<
Map
<
ChannelIndex
,
ChannelIndex
.
Record
>>
channelList
=
rangeTimeRecords
.
stream
().
map
(
Pair:
:
getRight
).
collect
(
Collectors
.
toList
());
log
.
info
(
"刷新历史渠道记录-统计结束"
);
// 合并渠道记录
Map
<
ChannelIndex
,
ChannelIndex
.
Record
>
channelIndexRecordMap
=
ChannelIndex
.
mergeRecord
(
channelList
);
// 获得单位时间内最小最大时间戳
Long
[]
timeMinMax
=
Tools
.
timeMinMax
(
rangeTimeRecords
.
stream
().
map
(
Pair:
:
getLeft
).
collect
(
Collectors
.
toList
()));
List
<
ChannelRecord
>
channelRecords
=
ChannelRecord
.
createChannelRecords
(
timeMinMax
[
0
],
timeMinMax
[
1
],
channelIndexRecordMap
);
channelEsDao
.
upsertChannelRecord
(
channelRecords
);
log
.
info
(
"渠道统计-渠道记录-统计结束"
);
// List<ChannelRecord> channelRecords = rangeTimeRecords.stream().map(pair -> ChannelRecord.createChannelRecords(pair.getLeft()[0], pair.getLeft()[1], pair.getRight())).flatMap(Collection::stream).collect(Collectors.toList());
long
handleSize
=
0
;
List
<
Channel
>
insertList
=
new
ArrayList
<>();
for
(
Map
.
Entry
<
ChannelIndex
,
ChannelIndex
.
Record
>
entry
:
channelIndexRecordMap
.
entrySet
())
{
// 是否已存在
Channel
channel
=
channelDao
.
queryUnique
(
entry
.
getKey
());
if
(
null
==
channel
)
{
channel
=
Channel
.
createFromChannelIndexRecord
(
entry
.
getKey
(),
entry
.
getValue
());
insertList
.
add
(
channel
);
}
else
{
channel
.
setRecord
(
entry
.
getValue
());
channelDao
.
updateOne
(
channel
);
}
// 设置查询数值
entry
.
getKey
().
setChannelInfo
(
channel
);
if
(++
handleSize
%
10000
==
0
)
{
log
.
info
(
"渠道统计-渠道总计-查询更新已完成{}/{}"
,
handleSize
,
channelIndexRecordMap
.
size
());
}
}
log
.
info
(
"渠道统计-渠道总计-查询更新结束,开始批量入库"
);
ListUtils
.
partition
(
insertList
,
1000
).
forEach
(
list
->
channelDao
.
insertMany
(
list
));
log
.
info
(
"渠道统计-渠道总计-录入完毕,新增渠道{}条,更新渠道{}条"
,
insertList
.
size
(),
total
-
insertList
.
size
());
}
}
@Override
@Override
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment