场景描述:
核心代码位置:
| 文件 | 路径 | 作用 |
|---|---|---|
ThingHighFrequencyEventListener.java | infrastructure/mq/kafka/consumer/thing/event/ | 高频事件监听器 |
SamplingServiceImpl.java | service/impl/ | 采样控制服务 |
KafkaHighConsumerConfig.java | config/ | 高频消费者配置 |
实现逻辑:
// Kafka 高频消费者配置 - 3 个并发消费者,每次拉取 500 条
@Configuration
public class KafkaHighConsumerConfig {
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
maxPollHighFrequencyRecords); // 500
factory.setConcurrency(concurrency); // 3
}
// 采样控制 - 降低处理压力(默认 10:1 采样)
@Service
public class SamplingServiceImpl implements SamplingService {
public boolean isSampled(String deviceSN, String sceneCode) {
// 基于设备 ID + 场景码的原子计数
ConcurrentHashMap<String, AtomicInteger> samplingSceneMap =
samplingLocalCache.getIfPresent(deviceSN);
AtomicInteger requestCountAtomic = samplingSceneMap.get(sceneCode);
int requestCount = requestCountAtomic.incrementAndGet();
// 按采样步长命中(requestCount % samplingStep == 0)
return requestCount % samplingStep == 0;
}
}
并发量估算:
场景描述:
核心代码位置:
| 文件 | 路径 | 作用 |
|---|---|---|
IotDeviceBatchController.java | api/ | 批量操作接口 |
IotDeviceBatchServiceImpl.java | service/impl/ | 批量服务实现 |
OtaServiceImpl.java | service/impl/ | OTA 升级服务 |
异步处理实现:
@RestController
@RequestMapping("/api/device/batch")
public class IotDeviceBatchController {
@Autowired
@Qualifier("device4hExecutor")
private ThreadPoolTaskExecutor executor;
@PostMapping("/createDeviceBatch")
public GepDeviceResponse<Boolean> createDeviceBatch(
@RequestBody CreateDeviceBatchRequest request) {
// CompletableFuture 异步处理,立即返回
CompletableFuture.runAsync(() -> {
GepDeviceResponse<Boolean> response =
iotDeviceBatchService.createDeviceBatch(request);
log.info("[device4h] createDeviceBatch async response={}",
JSON.toJSONString(response));
}, executor);
return GepDeviceResponseBuilder.buildSuccess(Boolean.TRUE);
}
@PostMapping("/importDevice")
public GepDeviceResponse<Boolean> importDevice(
@RequestParam("file") MultipartFile file) {
// 批量导入异步处理
CompletableFuture.runAsync(() -> {
iotDeviceBatchService.importDevice(file);
}, executor);
return GepDeviceResponseBuilder.buildSuccess(Boolean.TRUE);
}
}
批量插入优化:
@Service
public class IotDeviceBatchServiceImpl implements IotDeviceBatchService {
// 分页处理,每页 2000 条
private static final int BATCH_SIZE = 2000;
@Override
@Transactional(rollbackFor = Exception.class)
public GepDeviceResponse<Boolean> batchAddDevice(
List<IotDevice> deviceList) {
// 分批插入,避免单次事务过大
for (int i = 0; i < deviceList.size(); i += BATCH_SIZE) {
int end = Math.min(i + BATCH_SIZE, deviceList.size());
List<IotDevice> subList = deviceList.subList(i, end);
// 批量插入
iotDeviceMapper.batchInsert(subList);
}
return GepDeviceResponseBuilder.buildSuccess(true);
}
}
并发量估算:
场景描述:
核心代码位置:
| 文件 | 路径 | 作用 |
|---|---|---|
LindormConfig.java | config/ | Lindorm 配置 |
InfluxDBConfig.java | config/ | InfluxDB 配置 |
LindormTsdbService.java | infrastructure/lindorm/ | Lindorm 写入服务 |
InfluxDbWriteService.java | infrastructure/influxdb/ | InfluxDB 写入服务 |
Lindorm 批量写入配置:
@Configuration
public class LindormConfig {
@Bean
public TsdbClient lindormTsdbClient() {
ClientOptions options = ClientOptions.builder()
.connectTimeout(Duration.ofSeconds(30))
.writeOptions(WriteOptions.newBuilder()
.batchSize(100) // 批次大小 100
.maxPointBatches(32) // 最大批次数 32
.maxWaitTimeMs(1000) // 最大等待时间 1 秒
.build())
.connectionPoolOptions(ConnectionPoolOptions.builder()
.maxIdleConnections(20) // 最大空闲连接 20
.keepAliveDuration(5) // 保活时间 5 秒
.build())
.build();
return TsdbClientFactory.createClient(options);
}
}
异步批量写入实现:
@Service
public class LindormTsdbService {
@Autowired
private TsdbClient tsdbClient;
/**
* 异步批量写入
*/
public boolean batchWriteAsync(List<Record> records) {
try {
// 异步写入,不阻塞主线程
tsdbClient.writeAsync(records).whenComplete((result, throwable) -> {
if (throwable != null) {
log.error("Lindorm 异步写入失败", throwable);
} else {
log.debug("Lindorm 写入成功,记录数:{}", records.size());
}
});
return true;
} catch (Exception e) {
log.error("Lindorm 写入异常", e);
return false;
}
}
/**
* 同步批量写入(带重试)
*/
public boolean batchWriteSync(List<Record> records) {
int retryCount = 0;
int maxRetry = 3;
while (retryCount < maxRetry) {
try {
tsdbClient.write(records).join(); // 同步等待
return true;
} catch (Exception e) {
retryCount++;
log.warn("Lindorm 写入失败,第{}次重试", retryCount, e);
if (retryCount >= maxRetry) {
log.error("Lindorm 写入失败,已达最大重试次数", e);
return false;
}
try {
Thread.sleep(100 * retryCount); // 指数退避
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
return false;
}
}
InfluxDB 批量写入配置:
# application.yaml
influxdb:
enableBatch:
actions: 2000 # 2000 条数据点一批
flushDuration: 100 # 100ms 刷新一次
okhttpClient:
poolConnections: 20 # 连接池 20 个连接
poolKeepAliveDuration: 300000 # 保活时间 5 分钟
并发量估算:
场景描述:
核心代码位置:
| 配置文件 | 路径 | 用途 |
|---|---|---|
KafkaConsumerConfig.java | config/ | 通用消费者配置 |
KafkaHighConsumerConfig.java | config/ | 高频消费者配置 |
KafkaBatchConsumerConfig.java | config/ | 批量消费者配置 |
多消费者工厂配置:
@Configuration
public class KafkaConsumerConfig {
// 通用消费者工厂 - 2 个并发
@Bean("kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency); // 2
return factory;
}
}
@Configuration
public class KafkaHighConsumerConfig {
// 高频消费者工厂 - 3 个并发,每次拉取 500 条
@Bean("highKafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> highKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(highConsumerFactory());
factory.setConcurrency(highConcurrency); // 3
factory.getContainerProperties().setPollTimeout(pollTimeout); // 3000ms
return factory;
}
}
@Configuration
public class KafkaBatchConsumerConfig {
// 批量消费者工厂 - 2 个并发,每次拉取 500 条
@Bean("batchKafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> batchKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(batchConsumerFactory());
factory.setConcurrency(batchConcurrency); // 2
return factory;
}
}
消费者配置参数对比:
| 参数 | 通用消费者 | 高频消费者 | 批量消费者 |
|---|---|---|---|
| 并发度 | 2 | 3 | 2 |
| 每次拉取记录数 | 100 | 500 | 500 |
| 轮询超时时间 | 3000ms | 3000ms | 3000ms |
| 适用场景 | 普通业务消息 | 充电状态等高频消息 | 批量事件处理 |
并发量估算:
配置文件位置:ExecutorConfig.java
@Configuration
public class ExecutorConfig {
@Bean("device4hExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心配置
executor.setCorePoolSize(10); // 核心线程数
executor.setMaxPoolSize(20); // 最大线程数
executor.setQueueCapacity(30); // 缓冲队列容量
executor.setKeepAliveSeconds(10); // 空闲线程存活时间
executor.setThreadNamePrefix("Async-Thread-Device4h-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
使用场景统计:
| 使用位置 | 调用方式 | 并发量 |
|---|---|---|
IotDeviceBatchController.createDeviceBatch() | CompletableFuture.runAsync() | 100 批次/分钟 |
IotDeviceBatchController.importDevice() | CompletableFuture.runAsync() | 50 批次/分钟 |
CacheDeviceBaseInfoJob.process() | CompletableFuture.runAsync() | 1 次/5 分钟 |
CacheDeviceActiveStatusJob.process() | CompletableFuture.runAsync() | 1 次/5 分钟 |
线程池监控建议:
// 建议添加线程池监控
executor.setThreadPoolStatsListener(stats -> {
log.info("线程池状态 - 活跃线程:{}, 队列大小:{}, 已完成任务:{}",
stats.getActiveCount(),
stats.getQueueSize(),
stats.getCompletedTaskCount());
});
ConcurrentHashMap + AtomicInteger 组合:
@Service
public class SamplingServiceImpl implements SamplingService {
// Guava Cache 嵌套 ConcurrentHashMap + AtomicInteger
static Cache<String, ConcurrentHashMap<String, AtomicInteger>> samplingLocalCache =
CacheBuilder.newBuilder()
.initialCapacity(50000)
.maximumSize(500000)
.expireAfterAccess(4, TimeUnit.HOURS)
.build();
public boolean isSampled(String deviceSN, String sceneCode) {
// 获取或创建设备的采样场景 Map
ConcurrentHashMap<String, AtomicInteger> samplingSceneMap =
samplingLocalCache.get(deviceSN, () -> new ConcurrentHashMap<>());
// 原子计数器
AtomicInteger requestCountAtomic =
samplingSceneMap.computeIfAbsent(sceneCode, k -> new AtomicInteger(0));
// 原子自增,线程安全
int requestCount = requestCountAtomic.incrementAndGet();
// 溢出重置
if (requestCount > samplingTotalSize) {
requestCountAtomic.set(0);
return true;
}
// 采样判断
return requestCount % samplingStep == 0;
}
}
线程安全性分析:
ConcurrentHashMap:分段锁,支持高并发读写AtomicInteger:CAS 原子操作,无锁自增Cache.get(key, Callable):原子性获取或创建Caffeine 缓存(设备基础信息二级缓存):
@Service
public class DeviceLocalCacheServiceImpl implements DeviceLocalCacheService {
// Caffeine 缓存配置
private final Cache<String, DeviceBaseInfoDTO> deviceLocalCache =
Caffeine.newBuilder()
.initialCapacity(100000) // 初始容量 10 万
.maximumSize(500000) // 最大 50 万条
.expireAfter(new RandomExpiry(21600, 7200)) // 随机过期 6-8 小时
.recordStats() // 开启统计
.build();
@Override
public DeviceBaseInfoDTO getDeviceBaseInfo(String deviceSN) {
return deviceLocalCache.get(deviceSN, sn -> {
// Cache Miss 时从 Redis 或 DB 加载
return loadFromDatabase(sn);
});
}
@Override
public void updateDeviceBaseInfo(String deviceSN, DeviceBaseInfoDTO info) {
deviceLocalCache.put(deviceSN, info);
}
}
随机过期策略(防止缓存雪崩):
public class RandomExpiry implements Expiry<String, DeviceBaseInfoDTO> {
private final int baseExpireTime; // 基础过期时间 6 小时
private final int randomExpireTime; // 随机范围 2 小时
@Override
public long expireAfterCreate(DeviceBaseInfoDTO info, long currentTime) {
// 每个 key 独立的随机过期时间
return TimeUnit.SECONDS.toNanos(
baseExpireTime + ThreadLocalRandom.current().nextInt(randomExpireTime)
);
}
}
Guava Cache(OTA 任务缓存):
@Component
public class OtaTaskManageImpl {
// OTA 任务本地缓存 - 小容量短过期
static Cache<String, OtaTask> otaTaskLocalCache = CacheBuilder.newBuilder()
.initialCapacity(10)
.maximumSize(20)
.expireAfterWrite(10, TimeUnit.MINUTES)
.build();
public OtaTask getOtaTask(String taskId) {
return otaTaskLocalCache.getIfPresent(taskId);
}
}
缓存命中率统计:
// 通过 recordStats() 开启统计
CacheStats stats = deviceLocalCache.stats();
log.info("缓存命中率:{}", stats.hitRate()); // 目标 > 90%
log.info("Miss 次数:{}", stats.missCount());
log.info("Hit 次数:{}", stats.hitCount());
核心代码位置:DeviceCacheServiceImpl.java
缓存 Key 定义:
public interface DeviceCacheKeyConstant {
// 设备影子缓存
String DEVICE_SHADOW_KEY = "iot:device:shadow:";
// 设备基础信息缓存
String DEVICE_INFO_DEVICE_SN_KEY = "iot:device:info:sn:";
// 设备扩展信息缓存
String DEVICE_EXT_CACHE_TOC_KEY = "iot:device:ext:toc:";
// 设备激活状态缓存
String DEVICE_ACTIVE_STATUS_KEY = "iot:device:active:";
// OTA 进度缓存
String DEVICE_OTA_PROGRESS_KEY = "iot:device:ota:progress:";
}
缓存操作方法:
@Service
public class DeviceCacheServiceImpl implements DeviceCacheService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Override
public void saveDeviceShadow(String deviceId, String shadow) {
// 带过期时间的缓存(24 小时)
redisTemplate.opsForValue().set(
DEVICE_SHADOW_KEY + deviceId,
shadow,
24, TimeUnit.HOURS
);
}
@Override
public String getDeviceShadow(String deviceId) {
return redisTemplate.opsForValue().get(DEVICE_SHADOW_KEY + deviceId);
}
@Override
public void saveDeviceBaseInfo(String deviceSN, DeviceBaseInfoDTO info, long expireTime) {
// 随机过期时间,防止缓存雪崩
long randomExpire = expireTime + ThreadLocalRandom.current().nextInt(7200);
redisTemplate.opsForValue().set(
DEVICE_INFO_DEVICE_SN_KEY + deviceSN,
JSON.toJSONString(info),
randomExpire,
TimeUnit.SECONDS
);
}
}
并发访问控制:
@Override
public Boolean tryLockDevice(String deviceId, long expireSeconds) {
String lockKey = "iot:lock:device:" + deviceId;
String requestId = UUID.randomUUID().toString();
// SETNX 分布式锁
Boolean success = redisTemplate.opsForValue()
.setIfAbsent(lockKey, requestId, expireSeconds, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
并发量估算:
配置文件位置:application.yaml
spring:
datasource:
type: com.alibaba.druid.pool.DruidDataSource
druid:
initial-size: 3 # 初始连接数
min-idle: 3 # 最小空闲连接
max-active: 20 # 最大活跃连接
max-wait: 10000 # 获取连接最大等待时间 (ms)
# 检测配置
time-between-eviction-runs-millis: 60000
min-evictable-idle-time-millis: 60000
max-evictable-idle-time-millis: 3000000
# 健康检查
validation-query: SELECT 'x'
test-while-idle: true
test-on-borrow: false
test-on-return: false
# 监控配置
pool-prepared-statements: false
max-pool-prepared-statement-per-connection-size: 100
# WallFilter 防火墙
filters: stat,wall,slf4j
connection-properties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
Druid 监控配置:
@Configuration
public class DruidConfig {
@Bean
public ServletRegistrationBean<StatViewServlet> druidStatViewServlet() {
StatViewServlet servlet = new StatViewServlet();
ServletRegistrationBean<StatViewServlet> registration =
new ServletRegistrationBean<>(servlet, "/druid/*");
// 监控页面登录账号密码
registration.addInitParameter("loginUsername", "admin");
registration.addInitParameter("loginPassword", "password");
// IP 白名单
registration.addInitParameter("allow", "127.0.0.1");
return registration;
}
@Bean
public FilterRegistrationBean<WebStatFilter> druidWebStatFilter() {
WebStatFilter filter = new WebStatFilter();
FilterRegistrationBean<WebStatFilter> registration =
new FilterRegistrationBean<>(filter);
registration.addUrlPatterns("/*");
registration.addInitParameter("exclusions", "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*");
return registration;
}
}
并发量估算:
核心代码位置:IdWorker.java
java
@Component
public class IdWorker {
// 起始时间戳(2024-01-01 00:00:00 UTC)
private static final long START_TIMESTAMP = 1704067200000L;
// 各部分位数
private static final long TIMESTAMP_BITS = 41; // 时间戳 41 位
private static final long MACHINE_BITS = 5; // 机器 ID 5 位
private static final long DATA_CENTER_BITS = 5; // 数据中心 5 位
private static final long SEQUENCE_BITS = 12; // 序列号 12 位
// 最大值
private static final long MAX_SEQUENCE = ~(-1L << SEQUENCE_BITS); // 4095
// 位移量
private static final long TIMESTAMP_SHIFT = SEQUENCE_BITS + MACHINE_BITS + DATA_CENTER_BITS;
private static final long MACHINE_SHIFT = SEQUENCE_BITS + DATA_CENTER_BITS;
private static final long DATA_CENTER_SHIFT = SEQUENCE_BITS;
private long sequence = 0L;
private long lastTimestamp = -1L;
private final long machineId;
private final long dataCenterId;
public IdWorker(@Value("${id.worker.machineId:1}") long machineId,
@Value("${id.worker.dataCenterId:1}") long dataCenterId) {
this.machineId = machineId;
this.dataCenterId = dataCenterId;
}
/**
* 生成下一个 ID(线程安全)
*/
public synchronized long nextId() {
long timestamp = System.currentTimeMillis();
// 时钟回拨处理
if (timestamp < lastTimestamp) {
throw new RuntimeException("时钟回拨异常");
}
if (timestamp == lastTimestamp) {
// 同一毫秒内,序列号自增
sequence = (sequence + 1) & MAX_SEQUENCE;
if (sequence == 0) {
// 序列号溢出,等待下一毫秒
timestamp = waitNextMillis(lastTimestamp);
}
} else {
// 不同毫秒,序列号重置
sequence = 0L;
}
lastTimestamp = timestamp;
// 组合 ID
return ((timestamp - START_TIMESTAMP) << TIMESTAMP_SHIFT)
| (dataCenterId << DATA_CENTER_SHIFT)
| (machineId << MACHINE_SHIFT)
| sequence;
}
private long waitNextMillis(long lastTimestamp) {
long timestamp = System.currentTimeMillis();
while (timestamp <= lastTimestamp) {
timestamp = System.currentTimeMillis();
}
return timestamp;
}
}
ID 结构:
┌──────┬─────────┬──────────┬─────────┐
│ 1bit │ 41bit │ 10bit │ 12bit │
│ sign │ timestamp│ machine │ sequence│
└──────┴─────────┴──────────┴─────────┘
性能指标:
synchronized 保证┌─────────────────┐
│ 客户端请求 │
└────────┬────────┘
│
▼
┌─────────────────┐
│ Caffeine 本地缓存│ ← L1: 6-8 小时随机过期,命中率~70%
└────────┬────────┘
│ Miss (30%)
▼
┌─────────────────┐
│ Redis 分布式缓存 │ ← L2: 24 小时过期,命中率~95%
└────────┬────────┘
│ Miss (5%)
▼
┌─────────────────┐
│ MySQL 数据库 │ ← 最终数据源
└─────────────────┘
缓存命中率计算:
采样配置:
public class SamplingConstant {
// 采样总数 100 万
public static final int SAMPLING_TOTAL_SIZE = 1000000;
// 默认采样步长 10(即 10:1 采样)
public static final int DEFAULT_SAMPLING_STEP = 10;
// 充电中状态业务处理采样场景
public static final String SCENE_CODE_CHARGE_STATUS_FOR_BIZ = "SCENE_CODE_CHARGE_STATUS_FOR_BIZ";
// 充电中状态 InfluxDB 写入采样场景
public static final String SCENE_CODE_CHARGE_STATUS_FOR_INFLUXDB = "SCENE_CODE_CHARGE_STATUS_FOR_INFLUXDB";
}
动态采样比例建议:
@Service
public class SamplingServiceImpl {
@Value("${sampling.step.default:10}")
private int defaultSamplingStep;
@Autowired
private RedisTemplate<String, Integer> redisTemplate;
public int getSamplingStep(String sceneCode) {
// 支持从 Redis 动态获取采样比例
String key = "iot:sampling:step:" + sceneCode;
Integer step = redisTemplate.opsForValue().get(key);
return step != null ? step : defaultSamplingStep;
}
}
| 操作类型 | 优化前 | 优化后 | 提升倍数 |
|---|---|---|---|
| 单条插入设备 | 1 条/SQL | 2000 条/SQL | 200x |
| 单条写入 InfluxDB | 1 条/请求 | 2000 条/批次 | 200x |
| 单条写入 Lindorm | 1 条/请求 | 100 条/批次 | 100x |
| 同步 OTA 下发 | 1 台/请求 | 批量异步 | 50x |
| 场景 | 输入并发 | 处理后并发 | 备注 |
|---|---|---|---|
| Kafka 消息接收 | 50,000 TPS | 50,000 TPS | 分区并行消费 |
| 采样后处理 | 50,000 TPS | 5,000 TPS | 10:1 采样 |
| Redis 缓存访问 | 100,000 QPS | 100,000 QPS | 多级缓存 |
| InfluxDB 写入 | 8,000 TPS | 8,000 TPS | 批量写入 |
| Lindorm 写入 | 10,000 TPS | 10,000 TPS | 异步批量 |
| MySQL 查询 | 5,000 QPS | 5,000 QPS | 连接池限制 20 |
| 批次创建设备 | 100,000 台/批次 | 异步后台处理 | 不阻塞接口 |
| 瓶颈点 | 当前配置 | 风险等级 | 建议优化方案 |
|---|---|---|---|
| 数据库连接池 | max-active=20 | 中 | 扩容至 50-100 |
| 线程池大小 | core=10, max=20 | 中 | 根据负载调整至 50-100 |
| 采样比例固定 | 固定 10:1 | 低 | 支持动态配置 |
| 缺少熔断降级 | 无 | 高 | 引入 Sentinel |
| 缺少分布式锁 | 无 | 高 | 引入 Redisson |
| Kafka 无重试 | 无重试机制 | 高 | 配置手动 Ack+ 重试 |
| 文件 | 路径 | 作用 | 并发相关配置 |
|---|---|---|---|
ExecutorConfig.java | config/ | 线程池配置 | core=10, max=20, queue=30 |
KafkaConsumerConfig.java | config/ | Kafka 通用消费者 | concurrency=2, poll=100 |
KafkaHighConsumerConfig.java | config/ | Kafka 高频消费者 | concurrency=3, poll=500 |
KafkaBatchConsumerConfig.java | config/ | Kafka 批量消费者 | concurrency=2, poll=500 |
LindormConfig.java | config/ | Lindorm 配置 | batchSize=100, maxPoint=32 |
InfluxDBConfig.java | config/ | InfluxDB 配置 | actions=2000, flush=100ms |
SamplingServiceImpl.java | service/impl/ | 采样服务 | samplingStep=10 |
DeviceLocalCacheServiceImpl.java | service/impl/ | Caffeine 本地缓存 | maxSize=500000 |
DeviceCacheServiceImpl.java | service/impl/ | Redis 缓存服务 | expire=24h |
IdWorker.java | utils/ | 分布式 ID 生成 | 26 万 ID/秒 |
IotDeviceBatchController.java | api/ | 批量操作接口 | 异步处理 |
OtaServiceImpl.java | service/impl/ | OTA 升级服务 | 批量推送 |