Kafka推送消息,实现异步处理


发送kafka消息,异步创建充电订单:

由别的消费者消费创建订单的消息,执行充电订单的创建逻辑。不影响启动充电的主流程,避免阻塞启动充电的命令下发。

	// 推送kafka消息,异步创建充电订单
	private boolean doCreateChargeOrder(CreateChargeOrderRequest createChargeOrderRequest) {
        String orderId = createChargeOrderRequest.getOrderId();
        try {
            kafkaProducer.asyncSendMsg(Topic.GEP_SUEZ_SERVICE_START_CHARGE_ORDER, orderId, JSON.toJSONString(createChargeOrderRequest));
            return true;
        } catch (Exception e) {
            log.error("createChargeOrder sendMessage exception,request={}", JSON.toJSONString(createChargeOrderRequest));
            return false;
        }
    }


    /**
     * producer发送消息,ack为 1
     * @param topic
     * @param key
     * @param message
     */
    public void asyncSendMsg(String topic, String key, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, message);
        asyncProcessResult(topic, key, message, future);
    }


	    /**
     * 统一的异步处理结果
     *
     * @param future
     */
    private void asyncProcessResult(String topic, String key, String message,
                                    ListenableFuture<SendResult<String, String>> future) {

        final String traceId = MDC.get(SysConstant.TRACE_ID);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                try {
                    MDC.put(SysConstant.TRACE_ID, traceId);
                    if (result.getRecordMetadata() != null) {
                        log.info("kafka msg send success, topic={}, key={}, message={}", topic, key, message);
                    } else {
                        log.error("kafka msg send error, topic={}, key={}, message={}", topic, key, message);
                    }
                } finally {
                    MDC.remove(SysConstant.TRACE_ID);
                }
            }

            @Override
            public void onFailure(Throwable ex) {
                try {
                    MDC.put(SysConstant.TRACE_ID, traceId);
                    log.error("kafka msg send error, topic={}, key={}, message={}", topic, key, message, ex);
                } finally {
                    MDC.remove(SysConstant.TRACE_ID);
                }
            }
        });
    }
	

image-20260421160233376

基于当前的异步 Kafka 消息创建订单的设计,我来分析可能出现的问题:

核心问题分析

1. 订单创建失败无法感知(最严重)

// 第 777-786 行
private boolean doCreateChargeOrder(CreateChargeOrderRequest createChargeOrderRequest) {
    String orderId = createChargeOrderRequest.getOrderId();
    try {
        kafkaProducer.asyncSendMsg(Topic.GEP_SUEZ_SERVICE_START_CHARGE_ORDER, 
                                   orderId, 
                                   JSON.toJSONString(createChargeOrderRequest));
        return true;  // ⚠️ 只表示消息发送成功,不代表订单创建成功
    } catch (Exception e) {
        log.error("createChargeOrder sendMessage exception", e);
        return false;  // ⚠️ 只能捕获发送异常,无法感知消费端失败
    }
}

问题场景:

  • ✅ Kafka 消息发送成功 → 返回 true
  • ❌ 消费者处理失败(数据库异常、ES 写入失败)→ 上游完全不知道
  • ❌ 消费者反序列化失败 → 订单静默丢失
  • ❌ 消费者业务逻辑异常 → 无告警、无回滚

后果:

用户看到:启充成功(orderId 已返回)
实际情况:订单未创建
影响:
  - 用户无法查询充电记录
  - 无法计费
  - 财务对账不一致
  - 客诉难以排查

2. 指令下发与订单创建的数据不一致

// 第 171-174 行
boolean createOrderResult = doCreateChargeOrder(createChargeOrderRequest);
if (createOrderResult) {
    // 只要 Kafka 发送成功就下发指令
    gepResponse = doStartCharge(createChargeOrderRequest);  // ⚠️ 也发 Kafka
    ...
}

时序问题:

T1: 发送订单创建消息到 Kafka
T2: 发送启充指令消息到 Kafka  
T3: 启充指令消费者先执行 → 设备开始充电
T4: 订单消费者后执行 → 订单才创建

问题:
  - 设备已开始充电,但订单还不存在
  - 如果 T4 失败,充电在进行但没有订单
  - 充电数据上报时找不到对应订单

3. 分布式事务缺失

当前涉及多个独立操作:

1. Redis 锁控制
2. Kafka 消息 1:创建订单
3. Kafka 消息 2:下发启充指令
4. Redis 缓存:设置启动中状态

没有保证原子性:

  • 订单创建成功,启充指令发送失败 → 有订单无充电
  • 启充指令成功,订单创建失败 → 有充电无订单 ⚠️
  • 两者都成功,但缓存设置失败 → 用户可以重复启充

4. 消息可靠性问题

kafkaProducer.asyncSendMsg(...)  // 异步发送

潜在风险:

  • 消息丢失:异步发送未配置 ACK,broker 接收失败也无法感知
  • 消息重复:网络重试可能导致重复消费
  • 消息乱序:虽然有相同的 key,但不能绝对保证顺序
  • 积压问题:消费者处理慢导致消息积压,订单延迟创建

5. 错误处理和补偿机制缺失

// 第 180-183 行
} else {
    return GepSuezResponseBuilder.buildFailed(
        H_CHARGE_CREATE_ORDER_ERROR.getCode(),
        H_CHARGE_CREATE_ORDER_ERROR.getName()
    );
}

问题:

  • 只有 Kafka 发送失败才会进入此分支
  • Kafka 发送成功但消费失败不会触发任何补偿
  • 没有重试机制
  • 没有死信队列处理
  • 没有人工介入入口

6. 幂等性问题

场景:

用户点击"启动充电"
  ↓
网络超时,用户再次点击
  ↓
生成两个不同的 orderId
  ↓
发送两条订单创建消息
  ↓
可能创建两个订单(如果消费者不做幂等)

当前防护:

  • ✅ 有 Redis 分布式锁(防止同一设备并发)
  • ✅ 有频率控制缓存(checkFrequency
  • ❌ 但锁的粒度是 providerNo + equipmentId,不同用户可能被阻塞

7. 可观测性问题

log.error("createChargeOrder sendMessage exception,request={}", ...);

监控盲区:

  • 只记录了发送异常
  • 无法追踪:消息是否被消费、消费结果如何
  • 无法关联:订单 ID 在整个链路的流转情况
  • 缺少指标:订单创建成功率、平均耗时、失败原因分布

改进建议

方案 1:同步创建订单(推荐短期方案)

private boolean doCreateChargeOrder(CreateChargeOrderRequest request) {
    try {
        // 直接调用订单服务,同步等待结果
        GepResponse<Boolean> response = gepOrderSuezClient.createChargeEsOrder(
            convertToEsOrderRequest(request)
        );
        
        if (response.failed()) {
            log.error("创建订单失败, orderId={}, error={}", 
                     request.getOrderId(), response.getMessage());
            return false;
        }
        
        return response.getData();
    } catch (Exception e) {
        log.error("创建订单异常, orderId={}", request.getOrderId(), e);
        return false;
    }
}

优点:

  • 立即知道订单创建结果
  • 失败可以及时返回用户
  • 避免后续流程继续执行

缺点:

  • 增加响应时间
  • 耦合订单服务

方案 2:异步 + 确认机制(推荐长期方案)

// 1. 发送消息时携带回调 ID
String callbackId = idService.defaultIdWithSnowFlake();
createChargeOrderRequest.setCallbackId(callbackId);

// 2. 将请求暂存到 Redis(带过期时间)
redisCacheService.putValue(
    "charge_order_pending:" + callbackId,
    createChargeOrderRequest,
    30, TimeUnit.SECONDS
);

// 3. 发送 Kafka 消息
kafkaProducer.asyncSendMsg(...);

// 4. 监听订单创建结果 Topic
@KafkaListener(topics = "ORDER_CREATE_RESULT")
public void handleOrderCreateResult(OrderCreateResult result) {
    if (result.isSuccess()) {
        // 继续下发启充指令
        doStartCharge(result.getRequest());
    } else {
        // 清理缓存,记录失败日志
        redisCacheService.deleteValue("charge_order_pending:" + result.getCallbackId());
        alertService.sendAlert("订单创建失败: " + result.getOrderId());
    }
}

方案 3:本地消息表 + 定时补偿

@Transactional
public boolean doCreateChargeOrder(CreateChargeOrderRequest request) {
    // 1. 本地事务:保存消息到本地表
    LocalMessage message = new LocalMessage();
    message.setMsgId(request.getOrderId());
    message.setTopic(Topic.GEP_SUEZ_SERVICE_START_CHARGE_ORDER);
    message.setContent(JSON.toJSONString(request));
    message.setStatus("PENDING");
    localMessageMapper.insert(message);
    
    // 2. 发送 Kafka 消息
    try {
        kafkaProducer.asyncSendMsg(...);
        message.setStatus("SENT");
    } catch (Exception e) {
        message.setStatus("FAILED");
        throw e;
    }
    localMessageMapper.update(message);
    
    return true;
}

// 3. 定时任务扫描未确认的消息
@Scheduled(fixedDelay = 5000)
public void compensatePendingMessages() {
    List<LocalMessage> pendingList = localMessageMapper.selectPending();
    pendingList.forEach(msg -> {
        // 重新发送或标记为失败
    });
}

方案 4:增强监控和告警

// 添加链路追踪
MDC.put("traceId", MDCUtil.generateTraceId());
MDC.put("orderId", createChargeOrderRequest.getOrderId());

try {
    kafkaProducer.asyncSendMsg(...);
    
    // 记录指标
    metricService.increment("charge.order.create.sent");
    
} catch (Exception e) {
    metricService.increment("charge.order.create.failed");
    alertService.sendAlert("订单消息发送失败", e);
    throw e;
} finally {
    MDC.clear();
}

// 消费者端也要埋点
@KafkaListener(...)
public void consume(String message) {
    long startTime = System.currentTimeMillis();
    try {
        // 处理逻辑
        metricService.increment("charge.order.create.consumed.success");
    } catch (Exception e) {
        metricService.increment("charge.order.create.consumed.failed");
        alertService.sendAlert("订单消费失败", e);
        throw e;
    } finally {
        long cost = System.currentTimeMillis() - startTime;
        metricService.recordTime("charge.order.create.cost", cost);
    }
}

问题优先级排序

优先级问题影响建议方案
P0订单创建失败无法感知财务损失、客诉方案 1(同步调用)
P0订单与指令不一致数据混乱方案 2(确认机制)
P1缺少补偿机制故障恢复困难方案 3(本地消息表)
P1可观测性不足问题定位困难方案 4(监控告警)
P2幂等性不完善可能重复订单加强消费者幂等校验
P2消息可靠性可能丢失配置 ACK + 重试

最佳实践建议

短期内(1-2周):

  1. 改为同步创建订单(方案 1)
  2. 增加完整的日志和监控(方案 4)
  3. 消费者增加幂等校验

中长期(1-2月):

  1. 实现异步确认机制(方案 2)
  2. 引入本地消息表做补偿(方案 3)
  3. 建立完善的告警体系

架构层面:

  • 考虑使用支持事务消息的 MQ(如 RocketMQ)
  • 引入 Saga 或 TCC 分布式事务框架
  • 建立统一的消息追踪平台

这样的设计才能保证在高性能的同时,不牺牲数据一致性和可靠性。

SpringBoot
Kafka