High-Concurrency Scenario Analysis


1. High-Concurrency Scenario Identification

1.1 Large-Scale Concurrent Device Data Reporting (highest concurrency)

Scenario description

  • Tens of thousands of charging piles report charging status, property data, and event information simultaneously
  • High-frequency messages include in-charging status and tiered property reports
  • Estimated concurrency: 10,000+ devices, 50,000+ messages per second

Key code locations

File | Path | Purpose
ThingHighFrequencyEventListener.java | infrastructure/mq/kafka/consumer/thing/event/ | High-frequency event listener
SamplingServiceImpl.java | service/impl/ | Sampling control service
KafkaHighConsumerConfig.java | config/ | High-frequency consumer configuration

Implementation logic

// Kafka high-frequency consumer configuration - 3 concurrent consumers, 500 records per poll
// (abbreviated excerpt; these statements live in the consumer-factory and
// container-factory methods shown in full in section 1.4)
@Configuration
public class KafkaHighConsumerConfig {
    propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
                maxPollHighFrequencyRecords);  // 500
    
    factory.setConcurrency(concurrency);  // 3
}

// Sampling control to reduce processing load (default 10:1 sampling)
@Service
public class SamplingServiceImpl implements SamplingService {
    
    // Simplified excerpt; null handling omitted (full version in section 2.2)
    public boolean isSampled(String deviceSN, String sceneCode) {
        // Atomic counting keyed by device SN + scene code
        ConcurrentHashMap<String, AtomicInteger> samplingSceneMap = 
            samplingLocalCache.getIfPresent(deviceSN);
        
        AtomicInteger requestCountAtomic = samplingSceneMap.get(sceneCode);
        int requestCount = requestCountAtomic.incrementAndGet();
        
        // Hit every samplingStep-th request (requestCount % samplingStep == 0)
        return requestCount % samplingStep == 0;
    }
}

Concurrency estimate

  • Input: 50,000 TPS (raw messages)
  • After sampling: 5,000 TPS (10:1 sampling)
  • Actual processing: roughly 3,000-4,000 TPS (allowing for Kafka buffering)

1.2 Bulk Device Operations (high concurrency)

Scenario description

  • Batch device creation (up to 99,999 devices per request)
  • Bulk import of device information
  • Bulk OTA upgrade task dispatch

Key code locations

File | Path | Purpose
IotDeviceBatchController.java | api/ | Bulk operation endpoints
IotDeviceBatchServiceImpl.java | service/impl/ | Bulk service implementation
OtaServiceImpl.java | service/impl/ | OTA upgrade service

Asynchronous processing implementation

@RestController
@RequestMapping("/api/device/batch")
public class IotDeviceBatchController {
    
    @Autowired
    @Qualifier("device4hExecutor")
    private ThreadPoolTaskExecutor executor;
    
    @PostMapping("/createDeviceBatch")
    public GepDeviceResponse<Boolean> createDeviceBatch(
            @RequestBody CreateDeviceBatchRequest request) {
        
        // Process asynchronously via CompletableFuture and return immediately
        CompletableFuture.runAsync(() -> {
            GepDeviceResponse<Boolean> response = 
                iotDeviceBatchService.createDeviceBatch(request);
            log.info("[device4h] createDeviceBatch async response={}", 
                    JSON.toJSONString(response));
        }, executor);
        
        return GepDeviceResponseBuilder.buildSuccess(Boolean.TRUE);
    }
    
    @PostMapping("/importDevice")
    public GepDeviceResponse<Boolean> importDevice(
            @RequestParam("file") MultipartFile file) {
        
        // Bulk import handled asynchronously
        CompletableFuture.runAsync(() -> {
            iotDeviceBatchService.importDevice(file);
        }, executor);
        
        return GepDeviceResponseBuilder.buildSuccess(Boolean.TRUE);
    }
}
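
The fire-and-forget pattern above (a bounded pool whose rejection policy degrades to synchronous execution in the caller instead of dropping work) can be sketched standalone. Pool sizes here are illustrative, not the production device4hExecutor values:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class AsyncPattern {
    // Bounded pool + CallerRunsPolicy: when the queue fills, the submitting
    // thread runs the task itself, providing natural backpressure.
    static final ExecutorService executor = new ThreadPoolExecutor(
            2, 4, 10, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(8),
            new ThreadPoolExecutor.CallerRunsPolicy());

    // Fire-and-forget submission, mirroring CompletableFuture.runAsync(task, executor)
    static CompletableFuture<Void> submit(Runnable task) {
        return CompletableFuture.runAsync(task, executor);
    }
}
```

Keeping the returned future (even if the HTTP response does not wait on it) allows exceptions to be logged from whenComplete instead of being silently swallowed.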

Batch insert optimization

@Service
public class IotDeviceBatchServiceImpl implements IotDeviceBatchService {
    
    // Process in pages of 2,000 records
    private static final int BATCH_SIZE = 2000;
    
    @Override
    @Transactional(rollbackFor = Exception.class)
    public GepDeviceResponse<Boolean> batchAddDevice(
            List<IotDevice> deviceList) {
        
        // Insert in batches so each SQL statement stays small
        // (note: @Transactional still wraps all batches in one transaction;
        // use per-batch transactions if a single huge transaction must be avoided)
        for (int i = 0; i < deviceList.size(); i += BATCH_SIZE) {
            int end = Math.min(i + BATCH_SIZE, deviceList.size());
            List<IotDevice> subList = deviceList.subList(i, end);
            
            // Batch insert
            iotDeviceMapper.batchInsert(subList);
        }
        
        return GepDeviceResponseBuilder.buildSuccess(true);
    }
}

Concurrency estimate

  • One batch-create request: 100,000 devices
  • Paged processing: 50 batches × 2,000 records each
  • Duration: roughly 30-60 seconds (asynchronous, in the background)
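
The paging loop in batchAddDevice() can be factored into a small helper (a sketch with illustrative names; note that subList returns views backed by the original list):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchPartition {
    static final int BATCH_SIZE = 2000;

    // Split a list into sublists of at most BATCH_SIZE elements,
    // mirroring the paging loop used for batch inserts.
    static <T> List<List<T>> partition(List<T> list) {
        List<List<T>> pages = new ArrayList<>();
        for (int i = 0; i < list.size(); i += BATCH_SIZE) {
            pages.add(list.subList(i, Math.min(i + BATCH_SIZE, list.size())));
        }
        return pages;
    }
}
```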

1.3 High-Concurrency Time-Series Writes (high concurrency)

Scenario description

  • Real-time charging-session data written to time-series databases
  • Storage of tiered device property reports
  • Estimated write QPS: 10,000+

Key code locations

File | Path | Purpose
LindormConfig.java | config/ | Lindorm configuration
InfluxDBConfig.java | config/ | InfluxDB configuration
LindormTsdbService.java | infrastructure/lindorm/ | Lindorm write service
InfluxDbWriteService.java | infrastructure/influxdb/ | InfluxDB write service

Lindorm batch write configuration

@Configuration
public class LindormConfig {
    
    @Bean
    public TsdbClient lindormTsdbClient() {
        ClientOptions options = ClientOptions.builder()
            .connectTimeout(Duration.ofSeconds(30))
            .writeOptions(WriteOptions.newBuilder()
                .batchSize(100)              // batch size: 100
                .maxPointBatches(32)         // max buffered batches: 32
                .maxWaitTimeMs(1000)         // max wait: 1 second
                .build())
            .connectionPoolOptions(ConnectionPoolOptions.builder()
                .maxIdleConnections(20)      // max idle connections: 20
                .keepAliveDuration(5)        // keep-alive: 5 seconds
                .build())
            .build();
        
        return TsdbClientFactory.createClient(options);
    }
}

Asynchronous batch write implementation

@Service
public class LindormTsdbService {
    
    @Autowired
    private TsdbClient tsdbClient;
    
    /**
     * Asynchronous batch write
     */
    public boolean batchWriteAsync(List<Record> records) {
        try {
            // Asynchronous write; does not block the calling thread
            tsdbClient.writeAsync(records).whenComplete((result, throwable) -> {
                if (throwable != null) {
                    log.error("Lindorm async write failed", throwable);
                } else {
                    log.debug("Lindorm write succeeded, records: {}", records.size());
                }
            });
            return true;
        } catch (Exception e) {
            log.error("Lindorm write error", e);
            return false;
        }
    }
    
    /**
     * Synchronous batch write (with retries)
     */
    public boolean batchWriteSync(List<Record> records) {
        int retryCount = 0;
        int maxRetry = 3;
        
        while (retryCount < maxRetry) {
            try {
                tsdbClient.write(records).join();  // wait synchronously
                return true;
            } catch (Exception e) {
                retryCount++;
                log.warn("Lindorm write failed, retry #{}", retryCount, e);
                
                if (retryCount >= maxRetry) {
                    log.error("Lindorm write failed after max retries", e);
                    return false;
                }
                
                try {
                    Thread.sleep(100 * retryCount);  // linear backoff: 100/200/300 ms
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        return false;
    }
}
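
The retry loop in batchWriteSync() generalizes to any idempotent task. A minimal standalone sketch (hypothetical helper name, same linear backoff as the original):

```java
import java.util.function.Supplier;

public class RetryUtil {
    // Retry a task up to maxRetry times; treat exceptions and a false
    // result alike as failure, sleeping baseDelayMs * attempt between tries.
    static boolean retryWithBackoff(Supplier<Boolean> task, int maxRetry, long baseDelayMs) {
        for (int attempt = 1; attempt <= maxRetry; attempt++) {
            try {
                if (task.get()) {
                    return true;
                }
            } catch (Exception e) {
                // swallow and fall through to backoff
            }
            if (attempt < maxRetry) {
                try {
                    Thread.sleep(baseDelayMs * attempt);  // linear backoff
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }
}
```

Retrying is only safe here because time-series point writes are idempotent: re-writing the same points overwrites identical values.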

InfluxDB batch write configuration

# application.yaml
influxdb:
  enableBatch:
    actions: 2000            # one batch per 2,000 points
    flushDuration: 100       # flush every 100 ms
  okhttpClient:
    poolConnections: 20      # connection pool size: 20
    poolKeepAliveDuration: 300000  # keep-alive: 5 minutes

Concurrency estimate

  • Lindorm: 10,000+ TPS (asynchronous batching)
  • InfluxDB: 8,000+ TPS (batched writes)
  • Dual-write mode: combined write volume up to 15,000+ TPS

1.4 High-Concurrency Kafka Consumption (high concurrency)

Scenario description

  • Parallel consumption across multiple topics (device events, OTA progress, business notifications, etc.)
  • Messages of different priorities are consumed separately

Key code locations

File | Path | Purpose
KafkaConsumerConfig.java | config/ | General consumer configuration
KafkaHighConsumerConfig.java | config/ | High-frequency consumer configuration
KafkaBatchConsumerConfig.java | config/ | Batch consumer configuration

Multiple consumer factory configuration

@Configuration
public class KafkaConsumerConfig {
    
    // General consumer factory - concurrency 2
    @Bean("kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);  // 2
        return factory;
    }
}

@Configuration
public class KafkaHighConsumerConfig {
    
    // High-frequency consumer factory - concurrency 3, 500 records per poll
    @Bean("highKafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> highKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(highConsumerFactory());
        factory.setConcurrency(highConcurrency);  // 3
        factory.getContainerProperties().setPollTimeout(pollTimeout);  // 3000ms
        return factory;
    }
}

@Configuration
public class KafkaBatchConsumerConfig {
    
    // Batch consumer factory - concurrency 2, 500 records per poll
    @Bean("batchKafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> batchKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(batchConsumerFactory());
        factory.setConcurrency(batchConcurrency);  // 2
        return factory;
    }
}

Consumer configuration comparison

Parameter | General | High-frequency | Batch
Concurrency | 2 | 3 | 2
Max records per poll | 100 | 500 | 500
Poll timeout | 3000 ms | 3000 ms | 3000 ms
Typical workload | ordinary business messages | high-frequency messages (e.g. charging status) | batch event processing

Concurrency estimate

  • Total consumption capacity: roughly 60,000+ TPS
  • General consumers: 2 × 100 = 200 records per poll cycle
  • High-frequency consumers: 3 × 500 = 1,500 records per poll cycle
  • Batch consumers: 2 × 500 = 1,000 records per poll cycle
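
The per-poll figures above are simple products; note they bound records fetched per poll cycle, not sustained TPS, which also depends on per-record processing time and partition count. As a checkable sketch:

```java
public class ConsumerCapacity {
    // Upper bound on records fetched in one poll cycle of a listener factory:
    // concurrency (container threads) × max.poll.records.
    static int recordsPerPollCycle(int concurrency, int maxPollRecords) {
        return concurrency * maxPollRecords;
    }
}
```

A container's effective concurrency is also capped by the topic's partition count: extra container threads beyond the number of partitions sit idle.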

2. High-Concurrency Implementation Details

2.1 Thread Pool Configuration (ExecutorConfig)

Configuration file: ExecutorConfig.java

@Configuration
public class ExecutorConfig {
    
    @Bean("device4hExecutor")
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        
        // Core settings
        executor.setCorePoolSize(10);           // core threads
        executor.setMaxPoolSize(20);            // max threads
        executor.setQueueCapacity(30);          // buffer queue capacity
        executor.setKeepAliveSeconds(10);       // idle thread keep-alive (s)
        executor.setThreadNamePrefix("Async-Thread-Device4h-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        
        executor.initialize();
        return executor;
    }
}

Usage statistics

Call site | Invocation | Volume
IotDeviceBatchController.createDeviceBatch() | CompletableFuture.runAsync() | 100 batches/min
IotDeviceBatchController.importDevice() | CompletableFuture.runAsync() | 50 batches/min
CacheDeviceBaseInfoJob.process() | CompletableFuture.runAsync() | once / 5 min
CacheDeviceActiveStatusJob.process() | CompletableFuture.runAsync() | once / 5 min

Thread pool monitoring suggestion

// ThreadPoolTaskExecutor exposes no stats-listener hook; poll its state periodically instead
@Scheduled(fixedRate = 60_000)
public void logExecutorStats() {
    ThreadPoolExecutor pool = executor.getThreadPoolExecutor();
    log.info("Thread pool state - active: {}, queue size: {}, completed tasks: {}",
            pool.getActiveCount(),
            pool.getQueue().size(),
            pool.getCompletedTaskCount());
}

2.2 Concurrent Containers

The ConcurrentHashMap + AtomicInteger combination

@Service
public class SamplingServiceImpl implements SamplingService {
    
    // Guava Cache nesting ConcurrentHashMap + AtomicInteger
    static Cache<String, ConcurrentHashMap<String, AtomicInteger>> samplingLocalCache = 
        CacheBuilder.newBuilder()
            .initialCapacity(50000)
            .maximumSize(500000)
            .expireAfterAccess(4, TimeUnit.HOURS)
            .build();
    
    public boolean isSampled(String deviceSN, String sceneCode) {
        // Get or create this device's per-scene sampling map
        // (Guava's get(key, Callable) declares a checked ExecutionException; handling omitted)
        ConcurrentHashMap<String, AtomicInteger> samplingSceneMap = 
            samplingLocalCache.get(deviceSN, () -> new ConcurrentHashMap<>());
        
        // Atomic counter
        AtomicInteger requestCountAtomic = 
            samplingSceneMap.computeIfAbsent(sceneCode, k -> new AtomicInteger(0));
        
        // Atomic increment, thread-safe
        int requestCount = requestCountAtomic.incrementAndGet();
        
        // Reset on overflow
        if (requestCount > samplingTotalSize) {
            requestCountAtomic.set(0);
            return true;
        }
        
        // Sampling decision
        return requestCount % samplingStep == 0;
    }
}

Thread-safety analysis

  • ConcurrentHashMap: fine-grained per-bin locking with CAS (segment locks prior to Java 8), supporting highly concurrent reads and writes
  • AtomicInteger: lock-free increments via CAS
  • Cache.get(key, Callable): atomic get-or-create
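
Stripped of the Guava layer, the counting scheme reduces to the following deterministic sketch (illustrative names; the real service also resets the counter at a ceiling):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class SamplingSketch {
    static final int SAMPLING_STEP = 10;  // 10:1 sampling
    static final ConcurrentHashMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    // Every SAMPLING_STEP-th call per scene returns true; computeIfAbsent
    // and incrementAndGet are each atomic, so no external locking is needed.
    static boolean isSampled(String sceneCode) {
        int n = counters.computeIfAbsent(sceneCode, k -> new AtomicInteger(0)).incrementAndGet();
        return n % SAMPLING_STEP == 0;
    }
}
```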

2.3 Local Caching (Caffeine & Guava)

Caffeine cache (second-level cache for device base info)

@Service
public class DeviceLocalCacheServiceImpl implements DeviceLocalCacheService {
    
    // Caffeine cache configuration
    private final Cache<String, DeviceBaseInfoDTO> deviceLocalCache = 
        Caffeine.newBuilder()
            .initialCapacity(100000)                  // initial capacity: 100k entries
            .maximumSize(500000)                      // at most 500k entries
            .expireAfter(new RandomExpiry(21600, 7200)) // randomized 6-8 h TTL
            .recordStats()                            // enable statistics
            .build();
    
    @Override
    public DeviceBaseInfoDTO getDeviceBaseInfo(String deviceSN) {
        return deviceLocalCache.get(deviceSN, sn -> {
            // On a cache miss, load from Redis or the DB
            return loadFromDatabase(sn);
        });
    }
    
    @Override
    public void updateDeviceBaseInfo(String deviceSN, DeviceBaseInfoDTO info) {
        deviceLocalCache.put(deviceSN, info);
    }
}

Randomized expiry (prevents cache avalanche)

public class RandomExpiry implements Expiry<String, DeviceBaseInfoDTO> {
    
    private final int baseExpireTime;      // base TTL: 6 hours (seconds)
    private final int randomExpireTime;    // random extra range: 2 hours (seconds)
    
    @Override
    public long expireAfterCreate(String key, DeviceBaseInfoDTO info, long currentTime) {
        // Independent random TTL per key
        return TimeUnit.SECONDS.toNanos(
            baseExpireTime + ThreadLocalRandom.current().nextInt(randomExpireTime)
        );
    }
    
    @Override
    public long expireAfterUpdate(String key, DeviceBaseInfoDTO info, long currentTime, long currentDuration) {
        return expireAfterCreate(key, info, currentTime);  // refresh TTL on write
    }
    
    @Override
    public long expireAfterRead(String key, DeviceBaseInfoDTO info, long currentTime, long currentDuration) {
        return currentDuration;  // reads do not extend the TTL
    }
}

Guava Cache (OTA task cache)

@Component
public class OtaTaskManageImpl {
    
    // OTA task local cache - small capacity, short TTL
    static Cache<String, OtaTask> otaTaskLocalCache = CacheBuilder.newBuilder()
        .initialCapacity(10)
        .maximumSize(20)
        .expireAfterWrite(10, TimeUnit.MINUTES)
        .build();
    
    public OtaTask getOtaTask(String taskId) {
        return otaTaskLocalCache.getIfPresent(taskId);
    }
}

Cache hit-rate statistics

// Enabled via recordStats()
CacheStats stats = deviceLocalCache.stats();
log.info("Cache hit rate: {}", stats.hitRate());  // target > 90%
log.info("Miss count: {}", stats.missCount());
log.info("Hit count: {}", stats.hitCount());

2.4 Redis Caching (Distributed Cache)

Key code location: DeviceCacheServiceImpl.java

Cache key definitions

public interface DeviceCacheKeyConstant {
    // Device shadow cache
    String DEVICE_SHADOW_KEY = "iot:device:shadow:";
    
    // Device base info cache
    String DEVICE_INFO_DEVICE_SN_KEY = "iot:device:info:sn:";
    
    // Device extension info cache
    String DEVICE_EXT_CACHE_TOC_KEY = "iot:device:ext:toc:";
    
    // Device activation status cache
    String DEVICE_ACTIVE_STATUS_KEY = "iot:device:active:";
    
    // OTA progress cache
    String DEVICE_OTA_PROGRESS_KEY = "iot:device:ota:progress:";
}

Cache operations

@Service
public class DeviceCacheServiceImpl implements DeviceCacheService {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    @Override
    public void saveDeviceShadow(String deviceId, String shadow) {
        // Cached with a 24-hour TTL
        redisTemplate.opsForValue().set(
            DEVICE_SHADOW_KEY + deviceId, 
            shadow, 
            24, TimeUnit.HOURS
        );
    }
    
    @Override
    public String getDeviceShadow(String deviceId) {
        return redisTemplate.opsForValue().get(DEVICE_SHADOW_KEY + deviceId);
    }
    
    @Override
    public void saveDeviceBaseInfo(String deviceSN, DeviceBaseInfoDTO info, long expireTime) {
        // Randomized TTL to prevent a cache avalanche
        long randomExpire = expireTime + ThreadLocalRandom.current().nextInt(7200);
        redisTemplate.opsForValue().set(
            DEVICE_INFO_DEVICE_SN_KEY + deviceSN,
            JSON.toJSONString(info),
            randomExpire,
            TimeUnit.SECONDS
        );
    }
}

Concurrent access control

@Override
public Boolean tryLockDevice(String deviceId, long expireSeconds) {
    String lockKey = "iot:lock:device:" + deviceId;
    String requestId = UUID.randomUUID().toString();
    
    // Distributed lock via SETNX; keep requestId and compare it on release
    // (atomically, e.g. with a Lua script) so only the owner can unlock
    Boolean success = redisTemplate.opsForValue()
        .setIfAbsent(lockKey, requestId, expireSeconds, TimeUnit.SECONDS);
    
    return Boolean.TRUE.equals(success);
}
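
Releasing the lock safely requires an atomic compare-and-delete keyed on requestId (in Redis this is typically a Lua script; Redisson wraps it for you). The ownership rule can be modeled in-process with ConcurrentHashMap, whose putIfAbsent and remove(key, value) are atomic, mirroring what SETNX and the Lua script guarantee server-side (illustrative helper, not the Redis code):

```java
import java.util.concurrent.ConcurrentHashMap;

public class LockOwnership {
    static final ConcurrentHashMap<String, String> locks = new ConcurrentHashMap<>();

    // Acquire: succeeds only if no one holds the key (like SETNX)
    static boolean tryLock(String key, String token) {
        return locks.putIfAbsent(key, token) == null;
    }

    // Release: succeeds only for the holder of the matching token,
    // so a client whose lock already expired cannot delete someone else's lock
    static boolean unlock(String key, String token) {
        return locks.remove(key, token);
    }
}
```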

Concurrency estimate

  • Redis QPS: 100,000+ (single node)
  • Cache hit rate: target > 90%
  • Database queries reduced by 10x or more

2.5 Database Connection Pool (Druid)

Configuration file: application.yaml

spring:
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    druid:
      initial-size: 3                    # initial connections
      min-idle: 3                        # minimum idle connections
      max-active: 20                     # maximum active connections
      max-wait: 10000                    # max wait for a connection (ms)
      
      # Idle-eviction checks
      time-between-eviction-runs-millis: 60000
      min-evictable-idle-time-millis: 60000
      max-evictable-idle-time-millis: 3000000
      
      # Health checks
      validation-query: SELECT 'x'
      test-while-idle: true
      test-on-borrow: false
      test-on-return: false
      
      # Monitoring
      pool-prepared-statements: false
      max-pool-prepared-statement-per-connection-size: 100
      
      # WallFilter SQL firewall
      filters: stat,wall,slf4j
      connection-properties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000

Druid monitoring configuration

@Configuration
public class DruidConfig {
    
    @Bean
    public ServletRegistrationBean<StatViewServlet> druidStatViewServlet() {
        StatViewServlet servlet = new StatViewServlet();
        ServletRegistrationBean<StatViewServlet> registration = 
            new ServletRegistrationBean<>(servlet, "/druid/*");
        
        // Monitoring console credentials
        registration.addInitParameter("loginUsername", "admin");
        registration.addInitParameter("loginPassword", "password");
        
        // IP allowlist
        registration.addInitParameter("allow", "127.0.0.1");
        
        return registration;
    }
    
    @Bean
    public FilterRegistrationBean<WebStatFilter> druidWebStatFilter() {
        WebStatFilter filter = new WebStatFilter();
        FilterRegistrationBean<WebStatFilter> registration = 
            new FilterRegistrationBean<>(filter);
        
        registration.addUrlPatterns("/*");
        registration.addInitParameter("exclusions", "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*");
        
        return registration;
    }
}

Concurrency estimate

  • Max concurrent queries: 20 (bounded by max-active)
  • Suggested optimization: raise to 50-100 based on actual load
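
One way to sanity-check max-active before raising it is Little's law: concurrent connections ≈ query arrival rate × mean query time. A sketch (the 4 ms mean query time is an assumption for illustration, not a measured figure):

```java
public class PoolSizing {
    // Little's law estimate: average number of in-flight queries,
    // which is the number of pooled connections they occupy.
    static double connectionsNeeded(double qps, double avgQuerySeconds) {
        return qps * avgQuerySeconds;
    }
}
```

At 5,000 QPS and 4 ms per query this gives about 20 connections, consistent with the current cap; slower queries or higher QPS would justify the suggested 50-100.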

2.6 Distributed ID Generator (Snowflake)

Key code location: IdWorker.java


@Component
public class IdWorker {
    
    // Epoch (2024-01-01 00:00:00 UTC)
    private static final long START_TIMESTAMP = 1704067200000L;
    
    // Field widths
    private static final long TIMESTAMP_BITS = 41;   // timestamp: 41 bits
    private static final long MACHINE_BITS = 5;      // machine ID: 5 bits
    private static final long DATA_CENTER_BITS = 5;  // data center: 5 bits
    private static final long SEQUENCE_BITS = 12;    // sequence: 12 bits
    
    // Maximum values
    private static final long MAX_SEQUENCE = ~(-1L << SEQUENCE_BITS);  // 4095
    
    // Shift amounts
    private static final long TIMESTAMP_SHIFT = SEQUENCE_BITS + MACHINE_BITS + DATA_CENTER_BITS;
    private static final long MACHINE_SHIFT = SEQUENCE_BITS + DATA_CENTER_BITS;
    private static final long DATA_CENTER_SHIFT = SEQUENCE_BITS;
    
    private long sequence = 0L;
    private long lastTimestamp = -1L;
    private final long machineId;
    private final long dataCenterId;
    
    public IdWorker(@Value("${id.worker.machineId:1}") long machineId,
                   @Value("${id.worker.dataCenterId:1}") long dataCenterId) {
        this.machineId = machineId;
        this.dataCenterId = dataCenterId;
    }
    
    /**
     * Generate the next ID (thread-safe)
     */
    public synchronized long nextId() {
        long timestamp = System.currentTimeMillis();
        
        // Clock-rollback guard
        if (timestamp < lastTimestamp) {
            throw new RuntimeException("clock moved backwards; refusing to generate ID");
        }
        }
        
        if (timestamp == lastTimestamp) {
            // Same millisecond: increment the sequence
            sequence = (sequence + 1) & MAX_SEQUENCE;
            
            if (sequence == 0) {
                // Sequence overflow: wait for the next millisecond
                timestamp = waitNextMillis(lastTimestamp);
            }
        } else {
            // New millisecond: reset the sequence
            sequence = 0L;
        }
        
        lastTimestamp = timestamp;
        
        // Assemble the ID
        return ((timestamp - START_TIMESTAMP) << TIMESTAMP_SHIFT)
             | (dataCenterId << DATA_CENTER_SHIFT)
             | (machineId << MACHINE_SHIFT)
             | sequence;
    }
    
    private long waitNextMillis(long lastTimestamp) {
        long timestamp = System.currentTimeMillis();
        while (timestamp <= lastTimestamp) {
            timestamp = System.currentTimeMillis();
        }
        return timestamp;
    }
}

ID structure (per the shift constants above, the 5 machine bits sit above the 5 data-center bits)

┌──────┬───────────┬──────────────┬──────────┐
│ 1bit │ 41bit     │ 5bit + 5bit  │ 12bit    │
│ sign │ timestamp │ machine + dc │ sequence │
└──────┴───────────┴──────────────┴──────────┘

Performance characteristics

  • Theoretical capacity: 4,096 IDs per millisecond per node (about 4.09 million IDs/s); real throughput is lower under synchronized contention
  • Thread safety: guaranteed by synchronized
  • Global uniqueness: timestamp + machine ID + sequence
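
The bit composition in nextId() can be inverted to decode a generated ID. This helper (illustrative, not part of IdWorker) reuses the same shift constants; note that machine sits above data center in this layout:

```java
public class SnowflakeDecode {
    // Layout matching IdWorker: 41-bit timestamp | 5-bit machine | 5-bit data center | 12-bit sequence
    static final long SEQUENCE_BITS = 12, DATA_CENTER_BITS = 5, MACHINE_BITS = 5;
    static final long DATA_CENTER_SHIFT = SEQUENCE_BITS;                 // 12
    static final long MACHINE_SHIFT = SEQUENCE_BITS + DATA_CENTER_BITS;  // 17
    static final long TIMESTAMP_SHIFT = MACHINE_SHIFT + MACHINE_BITS;    // 22

    static long sequence(long id)        { return id & ((1L << SEQUENCE_BITS) - 1); }
    static long dataCenter(long id)      { return (id >> DATA_CENTER_SHIFT) & ((1L << DATA_CENTER_BITS) - 1); }
    static long machine(long id)         { return (id >> MACHINE_SHIFT) & ((1L << MACHINE_BITS) - 1); }
    // Milliseconds since START_TIMESTAMP
    static long timestampOffset(long id) { return id >>> TIMESTAMP_SHIFT; }
}
```

Decoding is handy when debugging: given a suspicious ID from the logs, you can recover which node minted it and when.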

3. Performance Optimization Summary

3.1 Multi-Level Cache Architecture

┌──────────────────────┐
│    Client request    │
└──────────┬───────────┘
           │
           ▼
┌──────────────────────┐
│ Caffeine local cache │ ← L1: random 6-8 h TTL, ~70% hit rate
└──────────┬───────────┘
           │ miss (30%)
           ▼
┌──────────────────────┐
│ Redis shared cache   │ ← L2: 24 h TTL, ~95% hit rate
└──────────┬───────────┘
           │ miss (5%)
           ▼
┌──────────────────────┐
│    MySQL database    │ ← source of truth
└──────────────────────┘

Cache hit-rate math

  • L1 hit rate: 70%
  • L2 contribution: (1 − 70%) × 95% = 28.5%
  • Combined hit rate: 70% + 28.5% = 98.5%
  • Database queries reduced: about 67x (1 / 1.5% miss rate)
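
The arithmetic generalizes: combined hit rate = L1 + (1 − L1) × L2, and database load shrinks by 1 / (combined miss rate). As a checkable sketch:

```java
public class CacheHitMath {
    // Combined hit rate of a two-level cache: requests missing L1
    // get a second chance at L2.
    static double combinedHitRate(double l1HitRate, double l2HitRate) {
        return l1HitRate + (1 - l1HitRate) * l2HitRate;
    }

    // Factor by which database queries drop: inverse of the miss rate.
    static double dbReductionFactor(double combinedHitRate) {
        return 1.0 / (1.0 - combinedHitRate);
    }
}
```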

3.2 Sampling Control Strategy

Sampling configuration

public class SamplingConstant {
    // Counter ceiling: 1,000,000
    public static final int SAMPLING_TOTAL_SIZE = 1000000;
    
    // Default sampling step: 10 (i.e. 10:1 sampling)
    public static final int DEFAULT_SAMPLING_STEP = 10;
    
    // Scene: business processing of in-charging status
    public static final String SCENE_CODE_CHARGE_STATUS_FOR_BIZ = "SCENE_CODE_CHARGE_STATUS_FOR_BIZ";
    
    // Scene: InfluxDB writes of in-charging status
    public static final String SCENE_CODE_CHARGE_STATUS_FOR_INFLUXDB = "SCENE_CODE_CHARGE_STATUS_FOR_INFLUXDB";
}

Suggested: dynamic sampling ratios

@Service
public class SamplingServiceImpl {
    
    @Value("${sampling.step.default:10}")
    private int defaultSamplingStep;
    
    @Autowired
    private RedisTemplate<String, Integer> redisTemplate;
    
    public int getSamplingStep(String sceneCode) {
        // Fetch the sampling step from Redis so it can be tuned at runtime
        String key = "iot:sampling:step:" + sceneCode;
        Integer step = redisTemplate.opsForValue().get(key);
        
        return step != null ? step : defaultSamplingStep;
    }
}
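
The core of the suggestion is override-with-fallback: use the scene-specific value when one is configured, else the default. Modeled in-process with a map standing in for Redis (illustrative names):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SamplingStepLookup {
    static final int DEFAULT_STEP = 10;

    // Stand-in for the Redis keys "iot:sampling:step:<sceneCode>"
    static final Map<String, Integer> overrides = new ConcurrentHashMap<>();

    // Scene-specific override if present, otherwise the default step
    static int samplingStep(String sceneCode) {
        return overrides.getOrDefault(sceneCode, DEFAULT_STEP);
    }
}
```

Under load, operators can then raise a scene's step (sampling fewer messages) without a redeploy.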

3.3 Batch Operation Optimizations

Operation | Before | After | Speedup
Device insert | 1 row/SQL | 2,000 rows/SQL | 200x
InfluxDB write | 1 point/request | 2,000 points/batch | 200x
Lindorm write | 1 point/request | 100 points/batch | 100x
OTA dispatch | 1 device/request | batched + async | 50x

4. Aggregate Concurrency and Bottleneck Analysis

4.1 System-Wide Concurrency Estimates

Scenario | Inbound | After processing | Notes
Kafka message intake | 50,000 TPS | 50,000 TPS | parallel consumption across partitions
Post-sampling processing | 50,000 TPS | 5,000 TPS | 10:1 sampling
Redis cache access | 100,000 QPS | 100,000 QPS | multi-level caching
InfluxDB writes | 8,000 TPS | 8,000 TPS | batched writes
Lindorm writes | 10,000 TPS | 10,000 TPS | async batching
MySQL queries | 5,000 QPS | 5,000 QPS | pool capped at 20 connections
Batch device creation | 100,000 devices/batch | async background | API does not block

4.2 Potential Bottlenecks

Bottleneck | Current setting | Risk | Suggested fix
Database connection pool | max-active=20 | — | raise to 50-100
Thread pool size | core=10, max=20 | — | tune to 50-100 based on load
Fixed sampling ratio | fixed 10:1 | — | support dynamic configuration
No circuit breaking / degradation | — | — | introduce Sentinel
No distributed-lock framework | plain SETNX | — | introduce Redisson
No Kafka retry | no retry mechanism | — | manual ack + retry

5. Key File Inventory

File | Path | Purpose | Concurrency-related settings
ExecutorConfig.java | config/ | thread pool configuration | core=10, max=20, queue=30
KafkaConsumerConfig.java | config/ | general Kafka consumer | concurrency=2, poll=100
KafkaHighConsumerConfig.java | config/ | high-frequency Kafka consumer | concurrency=3, poll=500
KafkaBatchConsumerConfig.java | config/ | batch Kafka consumer | concurrency=2, poll=500
LindormConfig.java | config/ | Lindorm configuration | batchSize=100, maxPoint=32
InfluxDBConfig.java | config/ | InfluxDB configuration | actions=2000, flush=100ms
SamplingServiceImpl.java | service/impl/ | sampling service | samplingStep=10
DeviceLocalCacheServiceImpl.java | service/impl/ | Caffeine local cache | maxSize=500000
DeviceCacheServiceImpl.java | service/impl/ | Redis cache service | expire=24h
IdWorker.java | utils/ | distributed ID generation | ~4,096 IDs/ms theoretical
IotDeviceBatchController.java | api/ | bulk operation endpoints | async processing
OtaServiceImpl.java | service/impl/ | OTA upgrade service | batched push
Tags: SpringBoot, Java, Multithreading