Commit 2420b385 by shenjunjie

Merge branch 'release' into 'master'

Release

See merge request !435
parents 19c92c0d 8d32c848
...@@ -26,6 +26,8 @@ import java.util.ArrayList; ...@@ -26,6 +26,8 @@ import java.util.ArrayList;
import java.util.Calendar; import java.util.Calendar;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
...@@ -74,22 +76,28 @@ public class ChannelEsDao extends EsClientDao { ...@@ -74,22 +76,28 @@ public class ChannelEsDao extends EsClientDao {
public void upsertChannelRecord(List<ChannelRecord> channelRecords) { public void upsertChannelRecord(List<ChannelRecord> channelRecords) {
String index = getChannelRecordIndexes().get(0); String index = getChannelRecordIndexes().get(0);
BulkRequest bulkRequest = new BulkRequest(); // 打印统计时间
Long startTime = null; AtomicLong startTime = new AtomicLong(-1);
Long endTime = null; AtomicLong endTime = new AtomicLong(-1);
for (List<ChannelRecord> records : ListUtils.partition(channelRecords, 100)) { for (List<ChannelRecord> records : ListUtils.partition(channelRecords, 100)) {
BulkRequest bulkRequest = new BulkRequest();
for (ChannelRecord record : records) { for (ChannelRecord record : records) {
startTime = null == startTime ? record.getRangeStartTime() : Math.min(startTime, record.getRangeStartTime()); bulkRequest.add(createChannelRecordIndexRequest(record, index, startTime, endTime));
endTime = null == endTime ? record.getRangeEndTime() : Math.max(endTime, record.getRangeEndTime()); }
bulkRequest.add(new IndexRequest(index).id(record.getEsId()).source(record.toEsMap())); BulkResponse bulkResponse;
try {
bulkResponse = retryTemplate.execute(context -> {
try {
return channelEsClient.bulk(bulkRequest, RequestOptions.DEFAULT);
} catch (Exception e) {
log.info("upsertRecord批量操作失败,尝试重试第{}次-", context.getRetryCount() + 1, e);
return null;
}
});
} catch (Exception e) {
// 重试三次后失败拆分channelRecords
bulkResponse = upsertChannelRecordLimit(records, index, 10);
} }
BulkResponse bulkResponse = retryTemplate.execute(context -> {
try {
return channelEsClient.bulk(bulkRequest, RequestOptions.DEFAULT);
} catch (Exception ignored) {
return null;
}
});
if (null == bulkResponse) { if (null == bulkResponse) {
log.error("upsertRecord批量操作重试后失败,index:{},rangeTime:{}", index, startTime + "-" + endTime); log.error("upsertRecord批量操作重试后失败,index:{},rangeTime:{}", index, startTime + "-" + endTime);
} else if (bulkResponse.hasFailures()) { } else if (bulkResponse.hasFailures()) {
...@@ -101,6 +109,46 @@ public class ChannelEsDao extends EsClientDao { ...@@ -101,6 +109,46 @@ public class ChannelEsDao extends EsClientDao {
} }
} }
private BulkResponse upsertChannelRecordLimit(List<ChannelRecord> records, String index, int limit) {
AtomicBoolean res = new AtomicBoolean(true);
BulkResponse bulkResponse = null;
// 重试三次后失败拆分channelRecords
for (List<ChannelRecord> minRecords : ListUtils.partition(records, limit)) {
BulkRequest minBulkRequest = new BulkRequest();
for (ChannelRecord minRecord : minRecords) {
minBulkRequest.add(createChannelRecordIndexRequest(minRecord, index, null, null));
}
bulkResponse = retryTemplate.execute(context -> {
try {
return channelEsClient.bulk(minBulkRequest, RequestOptions.DEFAULT);
} catch (Exception ex) {
res.set(false);
log.info("upsertRecord批量操作失败后二次重试,尝试重试第{}次-", context.getRetryCount() + 1, ex);
return null;
}
});
}
return !res.get() ? null : bulkResponse;
}
private IndexRequest createChannelRecordIndexRequest(ChannelRecord record, String index, AtomicLong startTime, AtomicLong endTime) {
if (null != startTime) {
if (-1 == startTime.get()) {
startTime.set(record.getRangeStartTime());
} else {
startTime.set(Math.min(startTime.get(), record.getRangeStartTime()));
}
}
if (null != endTime) {
if (-1 == endTime.get()) {
endTime.set(record.getRangeEndTime());
} else {
endTime.set(Math.max(endTime.get(), record.getRangeEndTime()));
}
}
return new IndexRequest(index).id(record.getEsId()).source(record.toEsMap());
}
public void batchInsert(List<Map<String, Object>> insertList) { public void batchInsert(List<Map<String, Object>> insertList) {
retryTemplate.execute(context -> { retryTemplate.execute(context -> {
try { try {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment