多线程导入数据发生死锁场景


现在有一段Java代码,逻辑是将一批数据通过多线程的方式将1000000万条数据新增/更新到表里。 spring boot+mybatis,每次处理10000条数据。执行的sql如下:

<insert id="insertBatch" >
        insert into job_info
        (job_group,job_desc,
        add_time,update_time,author,
        alarm_email,schedule_type,schedule_conf,
        misfire_strategy,executor_route_strategy,executor_handler,
        executor_param,executor_block_strategy,executor_timeout,
        executor_fail_retry_count,glue_type,glue_source,
        glue_remark,glue_updatetime,child_jobid,
        trigger_status,trigger_last_time,trigger_next_time)
        values
        <foreach collection="list" item="item">
            (
            #{item.jobGroup},#{item.jobDesc},
            #{item.addTime},#{item.updateTime},#{item.author},
            #{item.alarmEmail},#{item.scheduleType},#{item.scheduleConf},
            #{item.misfireStrategy},#{item.executorRouteStrategy},#{item.executorHandler},
            #{item.executorParam},#{item.executorBlockStrategy},#{item.executorTimeout},
            #{item.executorFailRetryCount},#{item.glueType},#{item.glueSource},
            #{item.glueRemark},#{item.glueUpdatetime},#{item.childJobid},
            #{item.triggerStatus},#{item.triggerLastTime},#{item.triggerNextTime}
            )
        </foreach>
        ON DUPLICATE key update
        job_group = values(job_group),
        job_desc = values(job_desc),
        add_time = values(add_time),
        update_time = values(update_time),
        author = values(author),
        alarm_email = values(alarm_email),
        schedule_type = values(schedule_type),
        schedule_conf = values(schedule_conf),
        misfire_strategy = values(misfire_strategy),
        executor_route_strategy = values(executor_route_strategy),
        executor_handler = values(executor_handler),
        executor_param = values(executor_param),
        executor_block_strategy = values(executor_block_strategy),
        executor_timeout = values(executor_timeout),
        executor_fail_retry_count = values(executor_fail_retry_count),
        glue_type = values(glue_type),
        glue_source = values(glue_source),
        glue_remark = values(glue_remark),
        glue_updatetime = values(glue_updatetime),
        child_jobid = values(child_jobid),
        trigger_status = values(trigger_status),
        trigger_last_time = values(trigger_last_time),
        trigger_next_time = values(trigger_next_time)
    </insert>

现在由于1000000完条数据中存在相同的数据,导致多线程批量处理数据时发生了死锁。

分析原因:

问题分析:多线程批量插入更新数据时出现死锁。

死锁通常发生在多个事务竞争资源时,互相等待对方释放锁,形成循环等待。

在MySQL中,使用ON DUPLICATE KEY UPDATE进行批量插入更新时,可能会引发死锁。原因如下:

  1. 索引与锁竞争ON DUPLICATE KEY UPDATE在执行时,会先尝试插入,如果发现唯一键冲突,则转为更新。在插入时,会对记录加排他锁(X锁)。在批量处理中,如果多条记录在同一个事务中,且这些记录在存储引擎中的位置相邻,那么它们可能会加锁相邻的记录,而加锁的顺序与记录在表中的物理顺序或索引顺序有关。

  2. 多线程事务:多个线程同时执行批量操作,每个线程处理一批数据(10000条)。不同批次的数据中可能存在相同的记录(即存在相同唯一键的记录),导致多个事务试图更新同一条记录。

  3. 锁的获取顺序不一致:不同的事务中,由于数据顺序的差异,加锁的顺序可能不同。例如:

- 事务1先锁记录A,再尝试锁记录B

- 事务2先锁记录B,再尝试锁记录A

这样就会形成死锁。

  1. 批量大小:每次处理10000条,这个批量较大,增加了在一个事务中涉及多条记录的可能性,从而增加了死锁的概率。

具体到当前SQL,死锁可能发生在以下情况:

- 表job_info有唯一索引(可能是主键或唯一键),当批量插入的数据中存在重复的唯一键时,就会触发更新。

- 多个线程同时处理的数据中,包含有相同唯一键的记录,且这些记录在批量数据中的位置不同,导致加锁顺序不一致。

另外,MySQL在处理ON DUPLICATE KEY UPDATE时,会对重复的行加排他锁(X锁),而由于是批量操作,事务中会涉及多行,因此可能会持有多个锁。当多个事务以不同的顺序请求这些锁时,就会形成循环等待。

如何避免死锁?

  1. 减小批量大小:将每次处理的批量数据量减小(如从10000条减少到500条),这样可以减少每个事务持有的锁数量,从而降低死锁概率。但注意,批量太小会影响效率。

  2. 保证加锁顺序一致:在多个线程中,对要处理的数据按照相同的规则排序(例如按照唯一键排序),确保每个事务加锁的顺序是一致的,从而避免循环等待。

  3. 使用单线程:如果数据量允许,可以考虑单线程处理,避免并发带来的死锁问题。但这样会降低处理速度。

  4. 重试机制:在捕获到死锁异常(MySQL死锁异常代码为1213)时进行重试。

  5. 减少事务时间:尽快提交事务,减少锁的持有时间。

  6. 分离插入和更新:先查询哪些记录已存在,然后分别进行批量插入和批量更新。但这样会增加一次查询,并且需要应用层处理,可能会更复杂。

针对当前场景,建议:

- 对每批要处理的数据,按照唯一键(比如主键)进行排序,保证每个线程处理数据时加锁顺序一致。

- 减小批量大小(例如1000条),并进行测试,找到合适的批量大小。

- 在代码中加入死锁重试机制。

例如,在代码中可以这样实现重试:

```java

int maxRetries = 3;

int retryCount = 0;

while (retryCount < maxRetries) {

try {

// 执行批量插入/更新

batchInsertOrUpdate(dataBatch);

break; // 成功则跳出循环

} catch (DeadlockLoserDataAccessException e) { // Spring封装了死锁异常

retryCount++;

if (retryCount >= maxRetries) {

throw e;

}

// 等待一段时间后重试

Thread.sleep((long) (Math.random() * 100)); // 随机等待避免同时重试

}

}

```

另外,检查数据库的隔离级别,建议使用READ COMMITTED,因为REPEATABLE READ在InnoDB中更容易出现死锁。

总结:死锁的原因是多个事务以不同的顺序请求锁,解决方案是保证加锁顺序一致、减小事务粒度(批量大小)、加入重试机制。


多线程批量插入更新死锁原因分析与解决方案

死锁原因分析

1. 唯一键冲突导致的锁竞争

  • 核心问题ON DUPLICATE KEY UPDATE在遇到重复键时会转换为更新操作
  • 锁机制
    • 插入时:对不存在的记录加插入意向锁
    • 更新时:对已存在的记录加排他锁(X锁)
  • 多线程冲突:多个线程同时处理包含相同唯一键的记录时,会争相同记录的锁

2. 批量操作中的锁升级

deepseek_mermaid_20250617_c9335f

3. 具体死锁场景

  1. 线程交叉锁定
    • 线程1锁定记录A,尝试锁定记录B
    • 线程2锁定记录B,尝试锁定记录A
    • 形成循环等待
  2. 间隙锁冲突
    • 当批量插入的记录在索引中相邻时
    • MySQL会加间隙锁(Gap Lock)
    • 多线程操作相邻区间导致死锁
  3. 批量大小放大锁范围
    • 每次处理10000条的大批量
    • 增加事务持有锁的数量和时间
    • 提高死锁概率

解决方案

1. 优化数据预处理(推荐)

// 在分发给线程前进行去重
public List<JobInfo> preprocessData(List<JobInfo> allData) {
    Map<String, JobInfo> uniqueMap = new ConcurrentHashMap<>();
    
    allData.parallelStream().forEach(item -> {
        String key = item.getJobGroup() + "_" + item.getJobDesc(); // 唯一键组合
        uniqueMap.merge(key, item, (oldVal, newVal) -> {
            // 保留最新数据
            return newVal.getUpdateTime().after(oldVal.getUpdateTime()) ? newVal : oldVal;
        });
    });
    
    return new ArrayList<>(uniqueMap.values());
}

2. 优化批处理逻辑

// 分批处理时按唯一键排序
public void processInBatches(List<JobInfo> data) {
    // 按唯一键排序(确保所有线程相同加锁顺序)
    data.sort(Comparator.comparing(JobInfo::getJobGroup)
                     .thenComparing(JobInfo::getJobDesc));
    
    int batchSize = 1000; // 减小批量大小
    List<List<JobInfo>> batches = ListUtils.partition(data, batchSize);
    
    batches.parallelStream().forEach(batch -> {
        try {
            jobMapper.insertBatch(batch);
        } catch (DeadlockLoserDataAccessException e) {
            // 死锁重试逻辑
            retryInsert(batch, 3); 
        }
    });
}

// 死锁重试机制
private void retryInsert(List<JobInfo> batch, int maxRetries) {
    int attempt = 0;
    while (attempt < maxRetries) {
        try {
            Thread.sleep((long) (Math.random() * 100)); // 随机等待
            jobMapper.insertBatch(batch);
            return;
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        } catch (DeadlockLoserDataAccessException e) {
            attempt++;
        }
    }
    // 记录失败批次
    log.error("Failed to process batch after {} retries", maxRetries);
}

3. SQL优化方案

<insert id="insertBatch">
    <foreach collection="list" item="item" separator=";">
        INSERT INTO job_info
        (job_group, job_desc, ...)
        VALUES
        (#{item.jobGroup}, #{item.jobDesc}, ...)
        ON DUPLICATE KEY UPDATE
        job_group = VALUES(job_group),
        ...
    </foreach>
</insert>

优化点

  • 使用分号分隔的多个独立INSERT语句
  • 每个操作独立提交,减少锁持有时间
  • 需在连接字符串添加allowMultiQueries=true

4. 数据库配置优化

spring:
  datasource:
    hikari:
      maximum-pool-size: 20 # 控制并发连接数
    tomcat:
      max-wait: 10000 # 最大等待时间
-- 表结构优化
ALTER TABLE job_info 
    MODIFY COLUMN glue_source TEXT,
    ENGINE=InnoDB 
    ROW_FORMAT=COMPRESSED 
    KEY_BLOCK_SIZE=8;

性能优化建议

  1. 批量大小调整

    • 初始值:1000条/批
    • 根据测试调整:500-2000条范围找到最佳点
  2. 线程池优化

    @Configuration
    public class ThreadPoolConfig {
    
        @Bean("batchTaskExecutor")
        public TaskExecutor taskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(8);
            executor.setMaxPoolSize(16);
            executor.setQueueCapacity(50);
            executor.setThreadNamePrefix("batch-");
            executor.initialize();
            return executor;
        }
    }
    
  3. 索引优化

    -- 确保唯一索引存在
    ALTER TABLE job_info 
        ADD UNIQUE INDEX udx_job_identity (job_group, job_desc);
    
    -- 增加update_time索引
    ALTER TABLE job_info 
        ADD INDEX idx_update_time (update_time);
    
  4. 监控与报警

    -- 监控死锁
    SHOW ENGINE INNODB STATUS;
    
    -- 长期监控
    SELECT * FROM information_schema.INNODB_METRICS
    WHERE NAME LIKE 'lock_deadlocks';
    

方案对比

方案优点缺点适用场景
数据预处理彻底解决冲突增加内存消耗内存充足时最佳
SQL分句执行减少锁竞争执行效率略低中小批量数据
减小批量大小简单有效增加事务次数快速解决方案
死锁重试提高容错性无法根本解决补充方案

最终建议:组合使用数据预处理 + 减小批量大小 + 死锁重试机制,可在保证性能的同时最大限度避免死锁。

常见错误
SpringBoot
JAVA-技能点
多线程
Mybatis