Commit ed6da81b by shenjunjie

修复非channelRecord批量操作bug

parent 2ad027ed
......@@ -26,6 +26,8 @@ import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
/**
......@@ -74,22 +76,28 @@ public class ChannelEsDao extends EsClientDao {
public void upsertChannelRecord(List<ChannelRecord> channelRecords) {
String index = getChannelRecordIndexes().get(0);
BulkRequest bulkRequest = new BulkRequest();
Long startTime = null;
Long endTime = null;
// 打印统计时间
AtomicLong startTime = new AtomicLong(-1);
AtomicLong endTime = new AtomicLong(-1);
for (List<ChannelRecord> records : ListUtils.partition(channelRecords, 100)) {
BulkRequest bulkRequest = new BulkRequest();
for (ChannelRecord record : records) {
startTime = null == startTime ? record.getRangeStartTime() : Math.min(startTime, record.getRangeStartTime());
endTime = null == endTime ? record.getRangeEndTime() : Math.max(endTime, record.getRangeEndTime());
bulkRequest.add(new IndexRequest(index).id(record.getEsId()).source(record.toEsMap()));
bulkRequest.add(createChannelRecordIndexRequest(record, index, startTime, endTime));
}
BulkResponse bulkResponse = retryTemplate.execute(context -> {
BulkResponse bulkResponse;
try {
bulkResponse = retryTemplate.execute(context -> {
try {
return channelEsClient.bulk(bulkRequest, RequestOptions.DEFAULT);
} catch (Exception ignored) {
} catch (Exception e) {
log.info("upsertRecord批量操作失败,尝试重试第{}次-", context.getRetryCount() + 1, e);
return null;
}
});
} catch (Exception e) {
// 重试三次后失败拆分channelRecords
bulkResponse = upsertChannelRecordLimit(records, index, 10);
}
if (null == bulkResponse) {
log.error("upsertRecord批量操作重试后失败,index:{},rangeTime:{}", index, startTime + "-" + endTime);
} else if (bulkResponse.hasFailures()) {
......@@ -101,6 +109,46 @@ public class ChannelEsDao extends EsClientDao {
}
}
private BulkResponse upsertChannelRecordLimit(List<ChannelRecord> records, String index, int limit) {
AtomicBoolean res = new AtomicBoolean(true);
BulkResponse bulkResponse = null;
// 重试三次后失败拆分channelRecords
for (List<ChannelRecord> minRecords : ListUtils.partition(records, limit)) {
BulkRequest minBulkRequest = new BulkRequest();
for (ChannelRecord minRecord : minRecords) {
minBulkRequest.add(createChannelRecordIndexRequest(minRecord, index, null, null));
}
bulkResponse = retryTemplate.execute(context -> {
try {
return channelEsClient.bulk(minBulkRequest, RequestOptions.DEFAULT);
} catch (Exception ex) {
res.set(false);
log.info("upsertRecord批量操作失败后二次重试,尝试重试第{}次-", context.getRetryCount() + 1, ex);
return null;
}
});
}
return !res.get() ? null : bulkResponse;
}
private IndexRequest createChannelRecordIndexRequest(ChannelRecord record, String index, AtomicLong startTime, AtomicLong endTime) {
if (null != startTime) {
if (-1 == startTime.get()) {
startTime.set(record.getRangeStartTime());
} else {
startTime.set(Math.min(startTime.get(), record.getRangeStartTime()));
}
}
if (null != endTime) {
if (-1 == endTime.get()) {
endTime.set(record.getRangeEndTime());
} else {
endTime.set(Math.max(endTime.get(), record.getRangeEndTime()));
}
}
return new IndexRequest(index).id(record.getEsId()).source(record.toEsMap());
}
public void batchInsert(List<Map<String, Object>> insertList) {
retryTemplate.execute(context -> {
try {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment