由别的消费者消费创建订单的消息,执行充电订单的创建逻辑。不影响启动充电的主流程,避免阻塞启动充电的命令下发。
// 推送kafka消息,异步创建充电订单
private boolean doCreateChargeOrder(CreateChargeOrderRequest createChargeOrderRequest) {
String orderId = createChargeOrderRequest.getOrderId();
try {
kafkaProducer.asyncSendMsg(Topic.GEP_SUEZ_SERVICE_START_CHARGE_ORDER, orderId, JSON.toJSONString(createChargeOrderRequest));
return true;
} catch (Exception e) {
log.error("createChargeOrder sendMessage exception,request={}", JSON.toJSONString(createChargeOrderRequest));
return false;
}
}
/**
* producer发送消息,ack为 1
* @param topic
* @param key
* @param message
*/
public void asyncSendMsg(String topic, String key, String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, message);
asyncProcessResult(topic, key, message, future);
}
/**
* 统一的异步处理结果
*
* @param future
*/
private void asyncProcessResult(String topic, String key, String message,
ListenableFuture<SendResult<String, String>> future) {
final String traceId = MDC.get(SysConstant.TRACE_ID);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
try {
MDC.put(SysConstant.TRACE_ID, traceId);
if (result.getRecordMetadata() != null) {
log.info("kafka msg send success, topic={}, key={}, message={}", topic, key, message);
} else {
log.error("kafka msg send error, topic={}, key={}, message={}", topic, key, message);
}
} finally {
MDC.remove(SysConstant.TRACE_ID);
}
}
@Override
public void onFailure(Throwable ex) {
try {
MDC.put(SysConstant.TRACE_ID, traceId);
log.error("kafka msg send error, topic={}, key={}, message={}", topic, key, message, ex);
} finally {
MDC.remove(SysConstant.TRACE_ID);
}
}
});
}

基于当前的异步 Kafka 消息创建订单的设计,我来分析可能出现的问题:
// 第 777-786 行
private boolean doCreateChargeOrder(CreateChargeOrderRequest createChargeOrderRequest) {
String orderId = createChargeOrderRequest.getOrderId();
try {
kafkaProducer.asyncSendMsg(Topic.GEP_SUEZ_SERVICE_START_CHARGE_ORDER,
orderId,
JSON.toJSONString(createChargeOrderRequest));
return true; // ⚠️ 只表示消息发送成功,不代表订单创建成功
} catch (Exception e) {
log.error("createChargeOrder sendMessage exception", e);
return false; // ⚠️ 只能捕获发送异常,无法感知消费端失败
}
}
问题场景:
true后果:
用户看到:启充成功(orderId 已返回)
实际情况:订单未创建
影响:
- 用户无法查询充电记录
- 无法计费
- 财务对账不一致
- 客诉难以排查
// 第 171-174 行
boolean createOrderResult = doCreateChargeOrder(createChargeOrderRequest);
if (createOrderResult) {
// 只要 Kafka 发送成功就下发指令
gepResponse = doStartCharge(createChargeOrderRequest); // ⚠️ 也发 Kafka
...
}
时序问题:
T1: 发送订单创建消息到 Kafka
T2: 发送启充指令消息到 Kafka
T3: 启充指令消费者先执行 → 设备开始充电
T4: 订单消费者后执行 → 订单才创建
问题:
- 设备已开始充电,但订单还不存在
- 如果 T4 失败,充电在进行但没有订单
- 充电数据上报时找不到对应订单
当前涉及多个独立操作:
1. Redis 锁控制
2. Kafka 消息 1:创建订单
3. Kafka 消息 2:下发启充指令
4. Redis 缓存:设置启动中状态
没有保证原子性:
kafkaProducer.asyncSendMsg(...) // 异步发送
潜在风险:
// 第 180-183 行
} else {
return GepSuezResponseBuilder.buildFailed(
H_CHARGE_CREATE_ORDER_ERROR.getCode(),
H_CHARGE_CREATE_ORDER_ERROR.getName()
);
}
问题:
场景:
用户点击"启动充电"
↓
网络超时,用户再次点击
↓
生成两个不同的 orderId
↓
发送两条订单创建消息
↓
可能创建两个订单(如果消费者不做幂等)
当前防护:
checkFrequency)providerNo + equipmentId,不同用户可能被阻塞log.error("createChargeOrder sendMessage exception,request={}", ...);
监控盲区:
private boolean doCreateChargeOrder(CreateChargeOrderRequest request) {
try {
// 直接调用订单服务,同步等待结果
GepResponse<Boolean> response = gepOrderSuezClient.createChargeEsOrder(
convertToEsOrderRequest(request)
);
if (response.failed()) {
log.error("创建订单失败, orderId={}, error={}",
request.getOrderId(), response.getMessage());
return false;
}
return response.getData();
} catch (Exception e) {
log.error("创建订单异常, orderId={}", request.getOrderId(), e);
return false;
}
}
优点:
缺点:
// 1. 发送消息时携带回调 ID
String callbackId = idService.defaultIdWithSnowFlake();
createChargeOrderRequest.setCallbackId(callbackId);
// 2. 将请求暂存到 Redis(带过期时间)
redisCacheService.putValue(
"charge_order_pending:" + callbackId,
createChargeOrderRequest,
30, TimeUnit.SECONDS
);
// 3. 发送 Kafka 消息
kafkaProducer.asyncSendMsg(...);
// 4. 监听订单创建结果 Topic
@KafkaListener(topics = "ORDER_CREATE_RESULT")
public void handleOrderCreateResult(OrderCreateResult result) {
if (result.isSuccess()) {
// 继续下发启充指令
doStartCharge(result.getRequest());
} else {
// 清理缓存,记录失败日志
redisCacheService.deleteValue("charge_order_pending:" + result.getCallbackId());
alertService.sendAlert("订单创建失败: " + result.getOrderId());
}
}
@Transactional
public boolean doCreateChargeOrder(CreateChargeOrderRequest request) {
// 1. 本地事务:保存消息到本地表
LocalMessage message = new LocalMessage();
message.setMsgId(request.getOrderId());
message.setTopic(Topic.GEP_SUEZ_SERVICE_START_CHARGE_ORDER);
message.setContent(JSON.toJSONString(request));
message.setStatus("PENDING");
localMessageMapper.insert(message);
// 2. 发送 Kafka 消息
try {
kafkaProducer.asyncSendMsg(...);
message.setStatus("SENT");
} catch (Exception e) {
message.setStatus("FAILED");
throw e;
}
localMessageMapper.update(message);
return true;
}
// 3. 定时任务扫描未确认的消息
@Scheduled(fixedDelay = 5000)
public void compensatePendingMessages() {
List<LocalMessage> pendingList = localMessageMapper.selectPending();
pendingList.forEach(msg -> {
// 重新发送或标记为失败
});
}
// 添加链路追踪
MDC.put("traceId", MDCUtil.generateTraceId());
MDC.put("orderId", createChargeOrderRequest.getOrderId());
try {
kafkaProducer.asyncSendMsg(...);
// 记录指标
metricService.increment("charge.order.create.sent");
} catch (Exception e) {
metricService.increment("charge.order.create.failed");
alertService.sendAlert("订单消息发送失败", e);
throw e;
} finally {
MDC.clear();
}
// 消费者端也要埋点
@KafkaListener(...)
public void consume(String message) {
long startTime = System.currentTimeMillis();
try {
// 处理逻辑
metricService.increment("charge.order.create.consumed.success");
} catch (Exception e) {
metricService.increment("charge.order.create.consumed.failed");
alertService.sendAlert("订单消费失败", e);
throw e;
} finally {
long cost = System.currentTimeMillis() - startTime;
metricService.recordTime("charge.order.create.cost", cost);
}
}
| 优先级 | 问题 | 影响 | 建议方案 |
|---|---|---|---|
| P0 | 订单创建失败无法感知 | 财务损失、客诉 | 方案 1(同步调用) |
| P0 | 订单与指令不一致 | 数据混乱 | 方案 2(确认机制) |
| P1 | 缺少补偿机制 | 故障恢复困难 | 方案 3(本地消息表) |
| P1 | 可观测性不足 | 问题定位困难 | 方案 4(监控告警) |
| P2 | 幂等性不完善 | 可能重复订单 | 加强消费者幂等校验 |
| P2 | 消息可靠性 | 可能丢失 | 配置 ACK + 重试 |
短期内(1-2周):
中长期(1-2月):
架构层面:
这样的设计才能保证在高性能的同时,不牺牲数据一致性和可靠性。