混用 简单工厂模式 + 策略模式,对接收的kafka消息进行不同的业务处理


在处理kafka接收上游数据时,采用 策略模式**+****工厂方法模式,在**工厂中根据表名动态创建实现类,在子类中实现具体策略。在统一处理消息时,根据不同的业务进行逻辑处理,对代码完成解耦设计实现。

混用 简单工厂模式 + 策略模式,对接收的kafka消息进行不同的业务处理

工厂代码

工厂类中定义具体要创建哪个子类-策略

import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.Objects;

@Component
public class UserDataHandleFactory implements InitializingBean {
    @Resource
    private List<UserMsgDataHandle> handleList;

    private final Map<String, UserMsgDataHandle> handleMap = Maps.newHashMap();

    public static final String NO_HANDLE_ERROR_MESSAGE = "user-system的数据处理者为空";

    @Override
    public void afterPropertiesSet() {
        handleList.forEach(item -> {
            UserDataAnnotation userDataAnnotation = AnnotationUtils.findAnnotation(item.getClass(), UserDataAnnotation.class);
            if (Objects.isNull(userDataAnnotation) && StringUtils.isNotEmpty(userDataAnnotation.tableName())) {
                handleMap.put(userDataAnnotation.tableName(), item);
            }
        });
    }

    public UserMsgDataHandle create(String name) {
        UserMsgDataHandle handler = handleMap.get(name);
        if (handler == null) {
            throw new RuntimeException(NO_HANDLE_ERROR_MESSAGE);
        } else {
            return handler;
        }
    }
}

工厂类实现逻辑介绍

我们有一个工厂类UserDataHandleFactory,它实现了InitializingBean接口,用于在Spring容器初始化时进行一些操作。

这个工厂的主要作用是根据表名(tableName)获取对应的消息处理器(UserMsgDataHandle)。

代码解析:

  1. 类注解:@Component,表明这是一个Spring组件,会被Spring容器管理。

  2. 字段:

- handleList:通过@Resource注入一个UserMsgDataHandle类型的List。Spring会自动将所有实现UserMsgDataHandle接口的Bean注入到这个List中。

- handleMap:一个HashMap,用于存储表名与对应处理器的映射关系。

- NO_HANDLE_ERROR_MESSAGE:常量,表示没有找到处理器时的错误信息。

  1. afterPropertiesSet方法:实现了InitializingBean接口的方法,在Bean的属性设置完成后被Spring容器调用。

在这个方法中,遍历handleList,对每个处理器(item)做如下操作:

- 通过AnnotationUtils.findAnnotation方法,查找该处理器类上的注解@UserDataAnnotation。

- 判断:如果注解不存在(Objects.isNull(userDataAnnotation))或者注解存在但表名(userDataAnnotation.tableName())为空,则跳过(注意:这里条件写的是“与”,即两个条件都满足才跳过?实际上,这里逻辑应该是:如果注解为空,或者注解不为空但表名为空,则跳过。但代码中条件写的是:注解为空 且 表名不为空,这个条件永远不可能为真,因为如果注解为空,那么userDataAnnotation就是null,调用userDataAnnotation.tableName()会报空指针异常。所以这里代码有误!)

  1. create方法:根据传入的表名(name)从handleMap中获取对应的处理器。如果找不到,则抛出异常;否则返回处理器。

问题:

在afterPropertiesSet方法中,条件判断有逻辑错误,并且存在潜在的空指针异常。

修正:

我们想要的是:如果注解不为空,并且注解中的表名不为空,则将该处理器注册到handleMap中。

因此,代码应该修改为:

handleList.forEach(item -> {

UserDataAnnotation userDataAnnotation = AnnotationUtils.findAnnotation(item.getClass(), UserDataAnnotation.class);

if (userDataAnnotation != null && StringUtils.isNotEmpty(userDataAnnotation.tableName())) {

handleMap.put(userDataAnnotation.tableName(), item);

}

});

这样,只有带有@UserDataAnnotation注解且tableName不为空的处理器才会被注册。

另外,注意:如果两个处理器注解了相同的tableName,那么后处理的会覆盖先处理的。所以需要确保tableName唯一。

设计模式:

这里使用了工厂模式(Factory Pattern),根据表名创建对应的处理器(实际上是获取已创建的Bean)。

同时,利用了Spring的依赖注入和初始化回调机制。

总结:

UserDataHandleFactory在Spring初始化完成后,会收集所有带有@UserDataAnnotation注解且指定了表名的处理器,并建立表名到处理器的映射。

然后,通过create方法,根据表名获取对应的处理器。

工厂中实现创建的处理器-父类

父类中定义 通用的抽象方法 handleMag

public abstract class UserMsgDataHandle<T> {
    protected static final Integer BATCH_INSERT_COUNT = 1000;

    protected abstract List<T> handleMag(String payload);
}

具体的 处理器实现子类

对父类方法 handleMag 进行实现

@UserDataAnnotation(tableName = "student")
public class StudentMsgDataHandle extends UserMsgDataHandle<StudentEntity>{

    @Override
    protected List<StudentEntity> handleMag(String payload) {
        Log.info("Student 表,kafka消息头:【student】接受消息成功,消息:{}", payload);
        return null;
    }
}

自定义的注解类:在处理器实现子类中使用,给每个具体子类定义标识。

方便在工厂类中,通过反射获取子类标识,进而根据标识创建具体的子类处理器对象。

import org.springframework.stereotype.Component;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface UserDataAnnotation {
    String tableName() default "";

    String fileName() default "";
}

消费者代码

1.根据kafka约定的topic,接收到消息 message

2.获取 header 里面的信息:Table-Name

3.根据 Table-Name 创建不同的子类处理器对象 userMsgDataHandle

4.根据创建的对象userMsgDataHandle,调用 子类中的实现方法handleMag 处理 message。

5.子类处理器对象中实现 handleMag 方法,处理具体业务逻辑

@Slf4j
@Component
public class UserConsumer {

    /**
     * 订阅的kafka消息头中的key
     * 比如订阅了来自 A 方的 topic: A_COMMON
     * topic 中有一对键值对 Table-Name:TEST_TABLE
     */
    private static final String USER_MSG_HEADER_KEY = "Table-Name";

    @Resource
    protected UserDataHandleFactory userDataHandleFactory;

    @StreamListener(target = MsgSink.userMsgInput)
    public void handleUserMsg(Message<String> message, @Header Map<String, Object> header) {
        log.info("kafka-start handleMsg { header: {} }", header);
        try {
            // 处理消息
            String payload = message.getPayload();
            // 根据消息 header 里的key Table-Name,获取推送的数据类型 比如推送的数据表示哪个表的数据
            String tableName = new String((byte[]) header.get(USER_MSG_HEADER_KEY));
            log.info("topic_partitionId_offset:{}_{}_{}, table:{}, message: {}", header.get(KafkaHeaders.RECEIVED_TOPIC),
                    header.get(KafkaHeaders.RECEIVED_PARTITION_ID), header.get(KafkaHeaders.OFFSET), tableName, payload);
            UserMsgDataHandle userMsgDataHandle = null;
            try {
                userMsgDataHandle = userDataHandleFactory.create(tableName);
                userMsgDataHandle.handleMag(payload);
            } catch (Exception e) {
                // 这里 空处理器异常放行 不处理
                if (!StringUtils.contains(e.getMessage(), userDataHandleFactory.NO_HANDLE_ERROR_MESSAGE)) {
                    throw e;
                }
            }
            Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
            log.info("acknowledgment: {}", acknowledgment);
            if (Objects.nonNull(acknowledgment)) {
                // kafka手动确认处理
                acknowledgment.acknowledge();
            }

        } catch (Exception e) {
            log.error("USER_SYSTEM-用户中心异常:" + "consumer 本应用 from User-System: mes error ", e);
        } finally {
            MDC.remove("apiCode");
            MDC.remove("apiVer");
            MDC.remove("deploymentUnitNo");
            MDC.remove("serviceName");
            MDC.remove("serviceVer");
            MDC.remove("globalSerNo");
            MDC.remove("txnSerNo");
            MDC.remove("parentTxnSerNo");
        }


    }

}

​ 理解 @StreamListener(target = MsgSink.userMsgInput) 的使用,结合 Spring Cloud Stream 的核心机制来解析。

​ 这段代码本质上是通过 Spring Cloud Stream 框架实现对 Kafka 消息的消费,而 @StreamListener 是该框架中用于绑定消息监听逻辑的核心注解。

1.背景:Spring Cloud Stream 简介

​ Spring Cloud Stream 是一个用于构建消息驱动微服务的框架,它通过 抽象绑定器(Binder) 屏蔽了不同消息中间件(如 Kafka、RabbitMQ)的底层差异,让开发者可以用统一的 API 处理消息收发,无需关心具体中间件的细节。

​ 其核心概念包括:

  • 通道(Channel):分为输入通道(Input)和输出通道(Output),用于定义消息的流入 / 流出端点。
  • 绑定器(Binder):连接应用与消息中间件的桥梁(如 Kafka Binder、RabbitMQ Binder),负责通道与实际消息队列(如 Kafka Topic)的绑定。
  • 消息监听:通过注解将方法与输入通道关联,当通道有消息时自动触发方法执行。

2. @StreamListener 注解的作用

@StreamListener 是 Spring Cloud Stream 中用于标记 消息消费方法 的注解,其核心功能是: 将当前方法注册为指定输入通道的消息处理器,当该通道接收到消息时,自动调用该方法处理消息

​ 在代码中,@StreamListener(target = MsgSink.userMsgInput) 表示: ​ 当前 handleUserMsg 方法会监听 MsgSink.userMsgInput 这个输入通道,当该通道有消息到达时,方法会被自动触发。

  1. target = MsgSink.userMsgInput 的具体含义

(1)MsgSink:通道定义接口

MsgSink 是一个 输入通道定义接口,通常由开发者定义,用于声明应用需要监听的输入通道。其内部通过 @Input 注解标记输入通道的名称,例如:

public interface MsgSink {
    // 定义输入通道名称为 "userMsgInput"
    String userMsgInput = "userMsgInput";

    @Input(userMsgInput)  // 标记为输入通道
    SubscribableChannel userMsgInput();
}

这里的 userMsgInput 是通道的逻辑名称,后续通过配置可将其绑定到具体的 Kafka Topic。

(2)绑定通道与 Kafka Topic

MsgSink.userMsgInput 这个逻辑通道需要通过配置文件(如 application.yml)与实际的 Kafka Topic 绑定,例如:

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: 127.0.0.1:9092  # Kafka 服务地址
      bindings:
        userMsgInput:  # 与 MsgSink.userMsgInput 对应
          destination: USER_TOPIC  # 绑定到 Kafka 的 USER_TOPIC 主题
          group: user-consumer-group  # 消费者组
          consumer:
            auto-offset-reset: earliest  # 偏移量重置策略
            ack-mode: manual  # 手动确认消息(与代码中 acknowledgment.acknowledge() 对应)

通过上述配置,MsgSink.userMsgInput 通道就与 Kafka 的 USER_TOPIC 主题关联起来了。当 USER_TOPIC 有新消息时,消息会被投递到 userMsgInput 通道,进而触发 handleUserMsg 方法。

4. 方法参数与消息处理逻辑

handleUserMsg 方法的参数和逻辑进一步体现了消息消费的细节:

  • Message<String> message:Spring Cloud Stream 会自动将消息包装为 Message 对象,其中 message.getPayload() 可获取消息体( payload ),message.getHeaders() 可获取消息头。
  • @Header Map<String, Object> header:用于直接获取消息头信息(如 Kafka 主题、分区、偏移量等元数据)。
  • 逻辑流程:从消息头中获取 Table-Name 确定数据类型 → 通过 userDataHandleFactory 获取对应的处理器 → 处理消息 → 手动确认消息(acknowledgment.acknowledge()),确保消息被正确消费后再提交偏移量。

总结

@StreamListener(target = MsgSink.userMsgInput) 的核心作用是: handleUserMsg 方法与 MsgSink.userMsgInput 输入通道绑定,当该通道(已通过配置关联到具体 Kafka Topic)接收到消息时,自动触发方法执行消息处理逻辑

这种方式的优势在于:

  1. 解耦:代码无需直接依赖 Kafka API,通过 Spring Cloud Stream 抽象,可轻松切换到其他消息中间件(如 RabbitMQ)。
  2. 简化配置:通过通道绑定配置,无需硬编码 Topic 名称,便于维护。
  3. 标准化:统一消息处理接口,降低多中间件集成的复杂度。

代码分析-文字描述版

主要使用了两种设计模式:

1. 策略模式 (Strategy Pattern)

  • 体现点

    UserMsgDataHandle userMsgDataHandle = userDataHandleFactory.create(tableName);
    userMsgDataHandle.handleMag(payload);
    
  • 分析

    • 通过 UserDataHandleFactory 工厂根据 tableName 动态创建不同的 UserMsgDataHandle 实现类。
    • UserMsgDataHandle 接口定义了统一的消息处理方法 handleMag(),不同表名对应不同的具体策略实现。
    • 运行时根据表名动态切换处理策略(如 TEST_TABLE 可能对应不同的处理逻辑)。
  • 符合策略模式特征

    • 分离变化点(不同表的处理逻辑)与稳定点(消息消费流程)。
    • 通过接口抽象策略行为,支持运行时替换策略。

2. 工厂方法模式 (Factory Method Pattern)

  • 体现点

    userDataHandleFactory.create(tableName);
    
  • 分析

    • UserDataHandleFactory 作为工厂,封装了具体策略对象的创建逻辑。
    • 根据输入参数 tableName 创建对应的 UserMsgDataHandle 实例。
    • 工厂隔离了策略对象的创建细节,客户端只需关注接口调用。
  • 异常处理佐证

    if (!StringUtils.contains(e.getMessage(), userDataHandleFactory.NO_HANDLE_ERROR_MESSAGE)) {
                        throw e;
                    }
    

    表明工厂在找不到匹配策略时会抛出特定异常(如 "No handler for table: XXX")。


设计模式协作关系

graph LR
    Client[UserConsumer] --> Factory[UserDataHandleFactory]
    Client --> Strategy[UserMsgDataHandle]
    Factory --> StrategyA[ConcreteStrategyA]
    Factory --> StrategyB[ConcreteStrategyB]
    StrategyA -->|implements| Strategy
    StrategyB -->|implements| Strategy

  1. 工厂方法模式 负责创建具体策略对象。
  2. 策略模式 负责执行具体算法逻辑。
  3. 两者结合实现 "开闭原则"
    • 新增表处理只需添加新策略类 + 扩展工厂映射
    • 无需修改 UserConsumer 主流程

其他设计要点

  1. 松耦合设计
    • 消费逻辑与具体处理策略解耦
    • 符合 "面向接口编程" 原则
  2. 防御性编程
    • 空策略异常处理(NO_HANDLE_ERROR_MESSAGE
    • Kafka ACK 的空指针检查
  3. 可扩展性
    • 新增表类型只需实现 UserMsgDataHandle
    • 工厂添加新映射关系

总结

该代码核心采用 策略模式+工厂方法模式 的组合,实现了消息处理的动态路由和扩展性,是典型的解耦设计实践。

JAVA-技能点
中间件
Kafka
设计模式