Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
B
brandkbs2
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
shenjunjie
brandkbs2
Commits
aa8a4748
Commit
aa8a4748
authored
Jul 15, 2024
by
shenjunjie
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
渠道榜单排序异常解决
parent
c15766a5
Show whitespace changes
Inline
Side-by-side
Showing
4 changed files
with
104 additions
and
49 deletions
+104
-49
src/main/java/com/zhiwei/brandkbs2/es/ChannelEsDao.java
+52
-4
src/main/java/com/zhiwei/brandkbs2/es/EsClientDao.java
+30
-0
src/main/java/com/zhiwei/brandkbs2/service/impl/ChannelServiceImpl.java
+12
-9
src/main/java/com/zhiwei/brandkbs2/service/impl/TaskServiceImpl.java
+10
-36
No files found.
src/main/java/com/zhiwei/brandkbs2/es/ChannelEsDao.java
View file @
aa8a4748
package
com
.
zhiwei
.
brandkbs2
.
es
;
package
com
.
zhiwei
.
brandkbs2
.
es
;
import
com.alibaba.fastjson.JSONObject
;
import
com.zhiwei.brandkbs2.common.GenericAttribute
;
import
com.zhiwei.brandkbs2.common.GenericAttribute
;
import
com.zhiwei.brandkbs2.pojo.ChannelIndex
;
import
com.zhiwei.brandkbs2.pojo.ChannelIndex
;
import
com.zhiwei.brandkbs2.pojo.ChannelRecord
;
import
com.zhiwei.brandkbs2.pojo.ChannelRecord
;
...
@@ -10,23 +11,25 @@ import org.apache.logging.log4j.Logger;
...
@@ -10,23 +11,25 @@ import org.apache.logging.log4j.Logger;
import
org.elasticsearch.action.bulk.BulkRequest
;
import
org.elasticsearch.action.bulk.BulkRequest
;
import
org.elasticsearch.action.bulk.BulkResponse
;
import
org.elasticsearch.action.bulk.BulkResponse
;
import
org.elasticsearch.action.index.IndexRequest
;
import
org.elasticsearch.action.index.IndexRequest
;
import
org.elasticsearch.action.search.SearchRequest
;
import
org.elasticsearch.action.search.SearchResponse
;
import
org.elasticsearch.action.update.UpdateRequest
;
import
org.elasticsearch.client.RequestOptions
;
import
org.elasticsearch.client.RequestOptions
;
import
org.elasticsearch.client.RestHighLevelClient
;
import
org.elasticsearch.client.RestHighLevelClient
;
import
org.elasticsearch.index.query.BoolQueryBuilder
;
import
org.elasticsearch.index.query.BoolQueryBuilder
;
import
org.elasticsearch.index.query.QueryBuilders
;
import
org.elasticsearch.index.query.QueryBuilders
;
import
org.elasticsearch.search.SearchHit
;
import
org.elasticsearch.search.SearchHit
;
import
org.elasticsearch.search.SearchHits
;
import
org.elasticsearch.search.SearchHits
;
import
org.elasticsearch.search.builder.SearchSourceBuilder
;
import
org.elasticsearch.search.sort.SortBuilders
;
import
org.elasticsearch.search.sort.SortBuilders
;
import
org.elasticsearch.search.sort.SortOrder
;
import
org.elasticsearch.search.sort.SortOrder
;
import
org.springframework.stereotype.Component
;
import
org.springframework.stereotype.Component
;
import
javax.annotation.Resource
;
import
javax.annotation.Resource
;
import
java.io.IOException
;
import
java.io.IOException
;
import
java.util.ArrayList
;
import
java.util.*
;
import
java.util.Calendar
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.concurrent.atomic.AtomicBoolean
;
import
java.util.concurrent.atomic.AtomicBoolean
;
import
java.util.concurrent.atomic.AtomicInteger
;
import
java.util.concurrent.atomic.AtomicLong
;
import
java.util.concurrent.atomic.AtomicLong
;
import
java.util.stream.Collectors
;
import
java.util.stream.Collectors
;
...
@@ -111,6 +114,51 @@ public class ChannelEsDao extends EsClientDao {
...
@@ -111,6 +114,51 @@ public class ChannelEsDao extends EsClientDao {
}
}
}
}
public
void
removeChannelRecordById
(
List
<
JSONObject
>
list
)
{
List
<
String
>
idList
=
list
.
stream
().
map
(
json
->
json
.
getString
(
"id"
)).
collect
(
Collectors
.
toList
());
String
index
=
getChannelRecordIndexes
().
get
(
0
);
AtomicInteger
count
=
new
AtomicInteger
(
idList
.
size
());
idList
.
forEach
(
id
->
{
AtomicInteger
searchCount
=
new
AtomicInteger
();
AtomicInteger
updateCount
=
new
AtomicInteger
();
SearchSourceBuilder
searchSourceBuilder
=
new
SearchSourceBuilder
();
searchSourceBuilder
.
query
(
QueryBuilders
.
termQuery
(
"record.articles.id"
,
id
));
searchSourceBuilder
.
size
(
10000
);
SearchResponse
response
=
retryTemplate
.
execute
(
context
->
{
try
{
return
channelEsClient
.
search
(
new
SearchRequest
().
indices
(
index
).
source
(
searchSourceBuilder
),
RequestOptions
.
DEFAULT
);
}
catch
(
IOException
e
)
{
log
.
info
(
"resetChannelRecordById:{},查询失败,尝试重试第{}次-"
,
id
,
context
.
getRetryCount
()
+
1
,
e
);
return
null
;
}
});
if
(
null
!=
response
)
{
searchCount
.
addAndGet
(
response
.
getHits
().
getHits
().
length
);
for
(
SearchHit
hit
:
response
.
getHits
().
getHits
())
{
JSONObject
record
=
new
JSONObject
((
Map
<
String
,
Object
>)
hit
.
getSourceAsMap
().
get
(
"record"
));
Long
lastTime
=
record
.
getLong
(
"last_time"
);
List
<
ChannelIndex
.
Article
>
articles
=
record
.
getJSONArray
(
"articles"
).
toJavaList
(
JSONObject
.
class
).
stream
().
map
(
json
->
{
// 移除id相同的数据
if
(
json
.
getString
(
"id"
).
equals
(
id
))
{
return
null
;
}
return
ChannelIndex
.
Article
.
fromRecordMap
(
json
);
}).
filter
(
Objects:
:
nonNull
).
collect
(
Collectors
.
toList
());
Map
<
String
,
Object
>
updateMap
=
new
HashMap
<>();
updateMap
.
put
(
"record"
,
new
ChannelIndex
.
Record
(
lastTime
,
articles
).
toEsMap
());
updateMap
.
put
(
"article_count"
,
articles
.
size
());
try
{
channelEsClient
.
update
(
new
UpdateRequest
().
index
(
index
).
id
(
hit
.
getId
()).
doc
(
updateMap
),
RequestOptions
.
DEFAULT
);
updateCount
.
getAndIncrement
();
}
catch
(
IOException
e
)
{
log
.
info
(
"resetChannelRecordById:{},更新失败"
,
id
,
e
);
}
}
}
log
.
info
(
"resetChannelRecordById:{},共查询到{}条,更新{}条,剩余id数:{}"
,
id
,
searchCount
.
get
(),
updateCount
.
get
(),
count
.
decrementAndGet
());
});
}
private
BulkResponse
upsertChannelRecordLimit
(
List
<
ChannelRecord
>
records
,
String
index
,
int
limit
)
{
private
BulkResponse
upsertChannelRecordLimit
(
List
<
ChannelRecord
>
records
,
String
index
,
int
limit
)
{
AtomicBoolean
res
=
new
AtomicBoolean
(
true
);
AtomicBoolean
res
=
new
AtomicBoolean
(
true
);
BulkResponse
bulkResponse
=
null
;
BulkResponse
bulkResponse
=
null
;
...
...
src/main/java/com/zhiwei/brandkbs2/es/EsClientDao.java
View file @
aa8a4748
...
@@ -112,6 +112,36 @@ public class EsClientDao {
...
@@ -112,6 +112,36 @@ public class EsClientDao {
return
res
;
return
res
;
}
}
public
List
<
JSONObject
>
searchRecordEmpty
(
long
startTime
,
long
endTime
,
String
mgroup
)
{
List
<
JSONObject
>
res
=
new
ArrayList
<>();
List
<
Long
[]>
cutTimes
=
Tools
.
cutTimeRange
(
startTime
,
endTime
,
ONE_HOUR
*
24
);
List
<
CompletableFuture
<
List
<
JSONObject
>>>
futures
=
new
ArrayList
<>(
cutTimes
.
size
());
cutTimes
.
forEach
(
times
->
futures
.
add
(
CompletableFuture
.
supplyAsync
(()
->
searchRecordEmptySingle
(
times
[
0
],
times
[
1
],
mgroup
),
executor
)));
CompletableFuture
.
allOf
(
futures
.
toArray
(
new
CompletableFuture
[
0
])).
whenComplete
((
r
,
e
)
->
{
futures
.
forEach
(
f
->
{
res
.
addAll
(
f
.
join
());
});
}).
join
();
return
res
;
}
private
List
<
JSONObject
>
searchRecordEmptySingle
(
long
startTime
,
long
endTime
,
String
mgroup
)
{
List
<
JSONObject
>
res
=
new
ArrayList
<>();
try
{
BoolQueryBuilder
boolQuery
=
QueryBuilders
.
boolQuery
();
// mark_cache_maps字段为空
boolQuery
.
mustNot
(
QueryBuilders
.
existsQuery
(
"mark_cache_maps"
));
boolQuery
.
must
(
QueryBuilders
.
termQuery
(
"mgroup.keyword"
,
mgroup
));
boolQuery
.
must
(
QueryBuilders
.
rangeQuery
(
"time"
).
gte
(
startTime
).
lt
(
endTime
));
List
<
JSONObject
>
results
=
searchScroll
(
boolQuery
,
10000
,
new
String
[]{
"id"
});
res
.
addAll
(
results
);
}
catch
(
IOException
e
)
{
log
.
error
(
"searchRecordEmptySingle-"
,
e
);
}
log
.
info
(
"startTime:{},endTime:{},size:{}"
,
DF
.
format
(
startTime
),
DF
.
format
(
endTime
),
res
.
size
());
return
res
;
}
//
//
// /**
// /**
// * 搜索符合事件数据
// * 搜索符合事件数据
...
...
src/main/java/com/zhiwei/brandkbs2/service/impl/ChannelServiceImpl.java
View file @
aa8a4748
...
@@ -529,11 +529,14 @@ public class ChannelServiceImpl implements ChannelService {
...
@@ -529,11 +529,14 @@ public class ChannelServiceImpl implements ChannelService {
return
null
;
return
null
;
}
}
articles
.
sort
(
Comparator
.
comparingLong
(
ChannelIndex
.
Article
::
getTime
).
reversed
());
articles
.
sort
(
Comparator
.
comparingLong
(
ChannelIndex
.
Article
::
getTime
).
reversed
());
// 重置符合条件的文章及数目
channelRecord
.
getRecord
().
setArticles
(
articles
);
channelRecord
.
getRecord
().
setArticles
(
articles
);
channelRecord
.
setArticleCount
(
articles
.
size
());
return
channelRecord
;
return
channelRecord
;
}).
filter
(
Objects:
:
nonNull
).
collect
(
Collectors
.
groupingBy
(
ChannelRecord:
:
getPlatform
));
}).
filter
(
Objects:
:
nonNull
).
collect
(
Collectors
.
groupingBy
(
ChannelRecord:
:
getPlatform
));
for
(
String
platformName
:
PLATFORMS
)
{
for
(
String
platformName
:
PLATFORMS
)
{
List
<
ChannelRecord
>
channelRecordList
=
channelRecords
.
getOrDefault
(
platformName
,
Collections
.
emptyList
()).
stream
().
limit
(
size
).
collect
(
Collectors
.
toList
());
List
<
ChannelRecord
>
channelRecordList
=
channelRecords
.
getOrDefault
(
platformName
,
Collections
.
emptyList
()).
stream
().
sorted
((
x
,
y
)
->
Long
.
compare
(
y
.
getArticleCount
(),
x
.
getArticleCount
())).
limit
(
size
).
collect
(
Collectors
.
toList
());
List
<
ChannelListVO
>
list
=
new
ArrayList
<>(
size
);
List
<
ChannelListVO
>
list
=
new
ArrayList
<>(
size
);
Map
<
String
,
ChannelRecord
>
map
=
Maps
.
uniqueIndex
(
channelRecordList
,
ChannelRecord:
:
getChannelFid
);
Map
<
String
,
ChannelRecord
>
map
=
Maps
.
uniqueIndex
(
channelRecordList
,
ChannelRecord:
:
getChannelFid
);
Map
<
String
,
Channel
>
fidChannel
=
channelDao
.
queryUniqueAsync
(
map
.
keySet
());
Map
<
String
,
Channel
>
fidChannel
=
channelDao
.
queryUniqueAsync
(
map
.
keySet
());
...
@@ -545,14 +548,14 @@ public class ChannelServiceImpl implements ChannelService {
...
@@ -545,14 +548,14 @@ public class ChannelServiceImpl implements ChannelService {
list
.
add
(
ChannelListVO
.
createFromChannel
(
record
,
record
.
getRecord
().
getArticles
().
size
()));
list
.
add
(
ChannelListVO
.
createFromChannel
(
record
,
record
.
getRecord
().
getArticles
().
size
()));
}
}
});
});
List
<
ChannelListVO
>
resList
;
//
List<ChannelListVO> resList;
// 排序
//
// 排序
if
(
Objects
.
nonNull
(
sorter
)
&&
sorter
.
contains
(
"index"
)){
//
if (Objects.nonNull(sorter) && sorter.contains("index")){
resList
=
list
.
stream
().
sorted
(
Comparator
.
comparingDouble
(
ChannelListVO:
:
getEmotionIndex
).
reversed
()).
collect
(
Collectors
.
toList
());
//
resList = list.stream().sorted(Comparator.comparingDouble(ChannelListVO::getEmotionIndex).reversed()).collect(Collectors.toList());
}
else
{
//
}else {
resList
=
list
.
stream
().
sorted
(
Comparator
.
comparingDouble
(
ChannelListVO:
:
getArticleCount
).
reversed
()).
collect
(
Collectors
.
toList
());
//
resList = list.stream().sorted(Comparator.comparingDouble(ChannelListVO::getArticleCount).reversed()).collect(Collectors.toList());
}
//
}
res
.
put
(
platformName
,
resL
ist
);
res
.
put
(
platformName
,
l
ist
);
}
}
return
res
;
return
res
;
}
}
...
...
src/main/java/com/zhiwei/brandkbs2/service/impl/TaskServiceImpl.java
View file @
aa8a4748
...
@@ -274,43 +274,17 @@ public class TaskServiceImpl implements TaskService {
...
@@ -274,43 +274,17 @@ public class TaskServiceImpl implements TaskService {
return
Pair
.
of
(
insertList
,
updateList
);
return
Pair
.
of
(
insertList
,
updateList
);
}
}
/**
* 本地测试-刷新历史渠道记录用
* 不更新渠道表
*/
@Deprecated
@Deprecated
public
void
messageFlowCount2
(
int
day
)
{
public
void
messageFlowCount2
(
long
startTime
,
long
endTime
,
String
mgroup
)
{
List
<
Pair
<
Long
[],
Map
<
ChannelIndex
,
ChannelIndex
.
Record
>>>
rangeTimeRecords
=
esClientDao
.
searchRecordRecentDay
(
day
);
// 找到项目的空标签数据
int
total
=
rangeTimeRecords
.
stream
().
mapToInt
(
pair
->
pair
.
getRight
().
values
().
size
()).
sum
();
List
<
JSONObject
>
list
=
esClientDao
.
searchRecordEmpty
(
startTime
,
endTime
,
mgroup
);
log
.
info
(
"渠道统计-搜索到近{}天的受影响渠道数{}条"
,
day
,
total
);
// 移除对应数据记录
// 结果合并
channelEsDao
.
removeChannelRecordById
(
list
);
List
<
Map
<
ChannelIndex
,
ChannelIndex
.
Record
>>
channelList
=
rangeTimeRecords
.
stream
().
map
(
Pair:
:
getRight
).
collect
(
Collectors
.
toList
());
log
.
info
(
"刷新历史渠道记录-统计结束"
);
// 合并渠道记录
Map
<
ChannelIndex
,
ChannelIndex
.
Record
>
channelIndexRecordMap
=
ChannelIndex
.
mergeRecord
(
channelList
);
// 获得单位时间内最小最大时间戳
Long
[]
timeMinMax
=
Tools
.
timeMinMax
(
rangeTimeRecords
.
stream
().
map
(
Pair:
:
getLeft
).
collect
(
Collectors
.
toList
()));
List
<
ChannelRecord
>
channelRecords
=
ChannelRecord
.
createChannelRecords
(
timeMinMax
[
0
],
timeMinMax
[
1
],
channelIndexRecordMap
);
channelEsDao
.
upsertChannelRecord
(
channelRecords
);
log
.
info
(
"渠道统计-渠道记录-统计结束"
);
// List<ChannelRecord> channelRecords = rangeTimeRecords.stream().map(pair -> ChannelRecord.createChannelRecords(pair.getLeft()[0], pair.getLeft()[1], pair.getRight())).flatMap(Collection::stream).collect(Collectors.toList());
long
handleSize
=
0
;
List
<
Channel
>
insertList
=
new
ArrayList
<>();
for
(
Map
.
Entry
<
ChannelIndex
,
ChannelIndex
.
Record
>
entry
:
channelIndexRecordMap
.
entrySet
())
{
// 是否已存在
Channel
channel
=
channelDao
.
queryUnique
(
entry
.
getKey
());
if
(
null
==
channel
)
{
channel
=
Channel
.
createFromChannelIndexRecord
(
entry
.
getKey
(),
entry
.
getValue
());
insertList
.
add
(
channel
);
}
else
{
channel
.
setRecord
(
entry
.
getValue
());
channelDao
.
updateOne
(
channel
);
}
// 设置查询数值
entry
.
getKey
().
setChannelInfo
(
channel
);
if
(++
handleSize
%
10000
==
0
)
{
log
.
info
(
"渠道统计-渠道总计-查询更新已完成{}/{}"
,
handleSize
,
channelIndexRecordMap
.
size
());
}
}
log
.
info
(
"渠道统计-渠道总计-查询更新结束,开始批量入库"
);
ListUtils
.
partition
(
insertList
,
1000
).
forEach
(
list
->
channelDao
.
insertMany
(
list
));
log
.
info
(
"渠道统计-渠道总计-录入完毕,新增渠道{}条,更新渠道{}条"
,
insertList
.
size
(),
total
-
insertList
.
size
());
}
}
@Override
@Override
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment