现在有一段Java代码,逻辑是将一批数据通过多线程的方式将1000000万条数据新增/更新到表里。 spring boot+mybatis,每次处理10000条数据。执行的sql如下:
<insert id="insertBatch" >
insert into job_info
(job_group,job_desc,
add_time,update_time,author,
alarm_email,schedule_type,schedule_conf,
misfire_strategy,executor_route_strategy,executor_handler,
executor_param,executor_block_strategy,executor_timeout,
executor_fail_retry_count,glue_type,glue_source,
glue_remark,glue_updatetime,child_jobid,
trigger_status,trigger_last_time,trigger_next_time)
values
<foreach collection="list" item="item">
(
#{item.jobGroup},#{item.jobDesc},
#{item.addTime},#{item.updateTime},#{item.author},
#{item.alarmEmail},#{item.scheduleType},#{item.scheduleConf},
#{item.misfireStrategy},#{item.executorRouteStrategy},#{item.executorHandler},
#{item.executorParam},#{item.executorBlockStrategy},#{item.executorTimeout},
#{item.executorFailRetryCount},#{item.glueType},#{item.glueSource},
#{item.glueRemark},#{item.glueUpdatetime},#{item.childJobid},
#{item.triggerStatus},#{item.triggerLastTime},#{item.triggerNextTime}
)
</foreach>
ON DUPLICATE key update
job_group = values(job_group),
job_desc = values(job_desc),
add_time = values(add_time),
update_time = values(update_time),
author = values(author),
alarm_email = values(alarm_email),
schedule_type = values(schedule_type),
schedule_conf = values(schedule_conf),
misfire_strategy = values(misfire_strategy),
executor_route_strategy = values(executor_route_strategy),
executor_handler = values(executor_handler),
executor_param = values(executor_param),
executor_block_strategy = values(executor_block_strategy),
executor_timeout = values(executor_timeout),
executor_fail_retry_count = values(executor_fail_retry_count),
glue_type = values(glue_type),
glue_source = values(glue_source),
glue_remark = values(glue_remark),
glue_updatetime = values(glue_updatetime),
child_jobid = values(child_jobid),
trigger_status = values(trigger_status),
trigger_last_time = values(trigger_last_time),
trigger_next_time = values(trigger_next_time)
</insert>
现在由于1000000完条数据中存在相同的数据,导致多线程批量处理数据时发生了死锁。
分析原因:
问题分析:多线程批量插入更新数据时出现死锁。
死锁通常发生在多个事务竞争资源时,互相等待对方释放锁,形成循环等待。
在MySQL中,使用
ON DUPLICATE KEY UPDATE
进行批量插入更新时,可能会引发死锁。原因如下:
索引与锁竞争:
ON DUPLICATE KEY UPDATE
在执行时,会先尝试插入,如果发现唯一键冲突,则转为更新。在插入时,会对记录加排他锁(X锁)。在批量处理中,如果多条记录在同一个事务中,且这些记录在存储引擎中的位置相邻,那么它们可能会加锁相邻的记录,而加锁的顺序与记录在表中的物理顺序或索引顺序有关。多线程事务:多个线程同时执行批量操作,每个线程处理一批数据(10000条)。不同批次的数据中可能存在相同的记录(即存在相同唯一键的记录),导致多个事务试图更新同一条记录。
锁的获取顺序不一致:不同的事务中,由于数据顺序的差异,加锁的顺序可能不同。例如:
- 事务1先锁记录A,再尝试锁记录B
- 事务2先锁记录B,再尝试锁记录A
这样就会形成死锁。
- 批量大小:每次处理10000条,这个批量较大,增加了在一个事务中涉及多条记录的可能性,从而增加了死锁的概率。
具体到当前SQL,死锁可能发生在以下情况:
- 表
job_info
有唯一索引(可能是主键或唯一键),当批量插入的数据中存在重复的唯一键时,就会触发更新。- 多个线程同时处理的数据中,包含有相同唯一键的记录,且这些记录在批量数据中的位置不同,导致加锁顺序不一致。
另外,MySQL在处理
ON DUPLICATE KEY UPDATE
时,会对重复的行加排他锁(X锁),而由于是批量操作,事务中会涉及多行,因此可能会持有多个锁。当多个事务以不同的顺序请求这些锁时,就会形成循环等待。如何避免死锁?
减小批量大小:将每次处理的批量数据量减小(如从10000条减少到500条),这样可以减少每个事务持有的锁数量,从而降低死锁概率。但注意,批量太小会影响效率。
保证加锁顺序一致:在多个线程中,对要处理的数据按照相同的规则排序(例如按照唯一键排序),确保每个事务加锁的顺序是一致的,从而避免循环等待。
使用单线程:如果数据量允许,可以考虑单线程处理,避免并发带来的死锁问题。但这样会降低处理速度。
重试机制:在捕获到死锁异常(MySQL死锁异常代码为1213)时进行重试。
减少事务时间:尽快提交事务,减少锁的持有时间。
分离插入和更新:先查询哪些记录已存在,然后分别进行批量插入和批量更新。但这样会增加一次查询,并且需要应用层处理,可能会更复杂。
针对当前场景,建议:
- 对每批要处理的数据,按照唯一键(比如主键)进行排序,保证每个线程处理数据时加锁顺序一致。
- 减小批量大小(例如1000条),并进行测试,找到合适的批量大小。
- 在代码中加入死锁重试机制。
例如,在代码中可以这样实现重试:
```java
int maxRetries = 3;
int retryCount = 0;
while (retryCount < maxRetries) {
try {
// 执行批量插入/更新
batchInsertOrUpdate(dataBatch);
break; // 成功则跳出循环
} catch (DeadlockLoserDataAccessException e) { // Spring封装了死锁异常
retryCount++;
if (retryCount >= maxRetries) {
throw e;
}
// 等待一段时间后重试
Thread.sleep((long) (Math.random() * 100)); // 随机等待避免同时重试
}
}
```
另外,检查数据库的隔离级别,建议使用READ COMMITTED,因为REPEATABLE READ在InnoDB中更容易出现死锁。
总结:死锁的原因是多个事务以不同的顺序请求锁,解决方案是保证加锁顺序一致、减小事务粒度(批量大小)、加入重试机制。
ON DUPLICATE KEY UPDATE
在遇到重复键时会转换为更新操作// 在分发给线程前进行去重
public List<JobInfo> preprocessData(List<JobInfo> allData) {
Map<String, JobInfo> uniqueMap = new ConcurrentHashMap<>();
allData.parallelStream().forEach(item -> {
String key = item.getJobGroup() + "_" + item.getJobDesc(); // 唯一键组合
uniqueMap.merge(key, item, (oldVal, newVal) -> {
// 保留最新数据
return newVal.getUpdateTime().after(oldVal.getUpdateTime()) ? newVal : oldVal;
});
});
return new ArrayList<>(uniqueMap.values());
}
// 分批处理时按唯一键排序
public void processInBatches(List<JobInfo> data) {
// 按唯一键排序(确保所有线程相同加锁顺序)
data.sort(Comparator.comparing(JobInfo::getJobGroup)
.thenComparing(JobInfo::getJobDesc));
int batchSize = 1000; // 减小批量大小
List<List<JobInfo>> batches = ListUtils.partition(data, batchSize);
batches.parallelStream().forEach(batch -> {
try {
jobMapper.insertBatch(batch);
} catch (DeadlockLoserDataAccessException e) {
// 死锁重试逻辑
retryInsert(batch, 3);
}
});
}
// 死锁重试机制
private void retryInsert(List<JobInfo> batch, int maxRetries) {
int attempt = 0;
while (attempt < maxRetries) {
try {
Thread.sleep((long) (Math.random() * 100)); // 随机等待
jobMapper.insertBatch(batch);
return;
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
} catch (DeadlockLoserDataAccessException e) {
attempt++;
}
}
// 记录失败批次
log.error("Failed to process batch after {} retries", maxRetries);
}
<insert id="insertBatch">
<foreach collection="list" item="item" separator=";">
INSERT INTO job_info
(job_group, job_desc, ...)
VALUES
(#{item.jobGroup}, #{item.jobDesc}, ...)
ON DUPLICATE KEY UPDATE
job_group = VALUES(job_group),
...
</foreach>
</insert>
优化点:
allowMultiQueries=true
spring:
datasource:
hikari:
maximum-pool-size: 20 # 控制并发连接数
tomcat:
max-wait: 10000 # 最大等待时间
-- 表结构优化
ALTER TABLE job_info
MODIFY COLUMN glue_source TEXT,
ENGINE=InnoDB
ROW_FORMAT=COMPRESSED
KEY_BLOCK_SIZE=8;
批量大小调整:
线程池优化:
@Configuration
public class ThreadPoolConfig {
@Bean("batchTaskExecutor")
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("batch-");
executor.initialize();
return executor;
}
}
索引优化:
-- 确保唯一索引存在
ALTER TABLE job_info
ADD UNIQUE INDEX udx_job_identity (job_group, job_desc);
-- 增加update_time索引
ALTER TABLE job_info
ADD INDEX idx_update_time (update_time);
监控与报警:
-- 监控死锁
SHOW ENGINE INNODB STATUS;
-- 长期监控
SELECT * FROM information_schema.INNODB_METRICS
WHERE NAME LIKE 'lock_deadlocks';
方案 | 优点 | 缺点 | 适用场景 |
---|---|---|---|
数据预处理 | 彻底解决冲突 | 增加内存消耗 | 内存充足时最佳 |
SQL分句执行 | 减少锁竞争 | 执行效率略低 | 中小批量数据 |
减小批量大小 | 简单有效 | 增加事务次数 | 快速解决方案 |
死锁重试 | 提高容错性 | 无法根本解决 | 补充方案 |
最终建议:组合使用数据预处理 + 减小批量大小 + 死锁重试机制,可在保证性能的同时最大限度避免死锁。