在处理kafka接收上游数据时,采用 策略模式**+****工厂方法模式,在**工厂中根据表名动态创建实现类,在子类中实现具体策略。在统一处理消息时,根据不同的业务进行逻辑处理,对代码完成解耦设计实现。
混用 简单工厂模式 + 策略模式,对接收的kafka消息进行不同的业务处理
工厂代码
工厂类中定义具体要创建哪个子类-策略
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@Component
public class UserDataHandleFactory implements InitializingBean {
@Resource
private List<UserMsgDataHandle> handleList;
private final Map<String, UserMsgDataHandle> handleMap = Maps.newHashMap();
public static final String NO_HANDLE_ERROR_MESSAGE = "user-system的数据处理者为空";
@Override
public void afterPropertiesSet() {
handleList.forEach(item -> {
UserDataAnnotation userDataAnnotation = AnnotationUtils.findAnnotation(item.getClass(), UserDataAnnotation.class);
if (Objects.isNull(userDataAnnotation) && StringUtils.isNotEmpty(userDataAnnotation.tableName())) {
handleMap.put(userDataAnnotation.tableName(), item);
}
});
}
public UserMsgDataHandle create(String name) {
UserMsgDataHandle handler = handleMap.get(name);
if (handler == null) {
throw new RuntimeException(NO_HANDLE_ERROR_MESSAGE);
} else {
return handler;
}
}
}
我们有一个工厂类UserDataHandleFactory,它实现了InitializingBean接口,用于在Spring容器初始化时进行一些操作。
这个工厂的主要作用是根据表名(tableName)获取对应的消息处理器(UserMsgDataHandle)。
代码解析:
类注解:@Component,表明这是一个Spring组件,会被Spring容器管理。
字段:
- handleList:通过@Resource注入一个UserMsgDataHandle类型的List。Spring会自动将所有实现UserMsgDataHandle接口的Bean注入到这个List中。
- handleMap:一个HashMap,用于存储表名与对应处理器的映射关系。
- NO_HANDLE_ERROR_MESSAGE:常量,表示没有找到处理器时的错误信息。
- afterPropertiesSet方法:实现了InitializingBean接口的方法,在Bean的属性设置完成后被Spring容器调用。
在这个方法中,遍历handleList,对每个处理器(item)做如下操作:
- 通过AnnotationUtils.findAnnotation方法,查找该处理器类上的注解@UserDataAnnotation。
- 判断:如果注解不存在(Objects.isNull(userDataAnnotation))或者注解存在但表名(userDataAnnotation.tableName())为空,则跳过(注意:这里条件写的是“与”,即两个条件都满足才跳过?实际上,这里逻辑应该是:如果注解为空,或者注解不为空但表名为空,则跳过。但代码中条件写的是:注解为空 且 表名不为空,这个条件永远不可能为真,因为如果注解为空,那么userDataAnnotation就是null,调用userDataAnnotation.tableName()会报空指针异常。所以这里代码有误!)
- create方法:根据传入的表名(name)从handleMap中获取对应的处理器。如果找不到,则抛出异常;否则返回处理器。
问题:
在afterPropertiesSet方法中,条件判断有逻辑错误,并且存在潜在的空指针异常。
修正:
我们想要的是:如果注解不为空,并且注解中的表名不为空,则将该处理器注册到handleMap中。
因此,代码应该修改为:
handleList.forEach(item -> {
UserDataAnnotation userDataAnnotation = AnnotationUtils.findAnnotation(item.getClass(), UserDataAnnotation.class);
if (userDataAnnotation != null && StringUtils.isNotEmpty(userDataAnnotation.tableName())) {
handleMap.put(userDataAnnotation.tableName(), item);
}
});
这样,只有带有@UserDataAnnotation注解且tableName不为空的处理器才会被注册。
另外,注意:如果两个处理器注解了相同的tableName,那么后处理的会覆盖先处理的。所以需要确保tableName唯一。
设计模式:
这里使用了工厂模式(Factory Pattern),根据表名创建对应的处理器(实际上是获取已创建的Bean)。
同时,利用了Spring的依赖注入和初始化回调机制。
总结:
UserDataHandleFactory在Spring初始化完成后,会收集所有带有@UserDataAnnotation注解且指定了表名的处理器,并建立表名到处理器的映射。
然后,通过create方法,根据表名获取对应的处理器。
工厂中实现创建的处理器-父类
父类中定义 通用的抽象方法 handleMag
public abstract class UserMsgDataHandle<T> {
protected static final Integer BATCH_INSERT_COUNT = 1000;
protected abstract List<T> handleMag(String payload);
}
具体的 处理器实现子类
对父类方法 handleMag 进行实现
@UserDataAnnotation(tableName = "student")
public class StudentMsgDataHandle extends UserMsgDataHandle<StudentEntity>{
@Override
protected List<StudentEntity> handleMag(String payload) {
Log.info("Student 表,kafka消息头:【student】接受消息成功,消息:{}", payload);
return null;
}
}
自定义的注解类:在处理器实现子类中使用,给每个具体子类定义标识。
方便在工厂类中,通过反射获取子类标识,进而根据标识创建具体的子类处理器对象。
import org.springframework.stereotype.Component;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface UserDataAnnotation {
String tableName() default "";
String fileName() default "";
}
消费者代码
1.根据kafka约定的topic,接收到消息 message
2.获取 header 里面的信息:Table-Name
3.根据 Table-Name 创建不同的子类处理器对象 userMsgDataHandle
4.根据创建的对象userMsgDataHandle,调用 子类中的实现方法handleMag 处理 message。
5.子类处理器对象中实现 handleMag 方法,处理具体业务逻辑
@Slf4j
@Component
public class UserConsumer {
/**
* 订阅的kafka消息头中的key
* 比如订阅了来自 A 方的 topic: A_COMMON
* topic 中有一对键值对 Table-Name:TEST_TABLE
*/
private static final String USER_MSG_HEADER_KEY = "Table-Name";
@Resource
protected UserDataHandleFactory userDataHandleFactory;
@StreamListener(target = MsgSink.userMsgInput)
public void handleUserMsg(Message<String> message, @Header Map<String, Object> header) {
log.info("kafka-start handleMsg { header: {} }", header);
try {
// 处理消息
String payload = message.getPayload();
// 根据消息 header 里的key Table-Name,获取推送的数据类型 比如推送的数据表示哪个表的数据
String tableName = new String((byte[]) header.get(USER_MSG_HEADER_KEY));
log.info("topic_partitionId_offset:{}_{}_{}, table:{}, message: {}", header.get(KafkaHeaders.RECEIVED_TOPIC),
header.get(KafkaHeaders.RECEIVED_PARTITION_ID), header.get(KafkaHeaders.OFFSET), tableName, payload);
UserMsgDataHandle userMsgDataHandle = null;
try {
userMsgDataHandle = userDataHandleFactory.create(tableName);
userMsgDataHandle.handleMag(payload);
} catch (Exception e) {
// 这里 空处理器异常放行 不处理
if (!StringUtils.contains(e.getMessage(), userDataHandleFactory.NO_HANDLE_ERROR_MESSAGE)) {
throw e;
}
}
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
log.info("acknowledgment: {}", acknowledgment);
if (Objects.nonNull(acknowledgment)) {
// kafka手动确认处理
acknowledgment.acknowledge();
}
} catch (Exception e) {
log.error("USER_SYSTEM-用户中心异常:" + "consumer 本应用 from User-System: mes error ", e);
} finally {
MDC.remove("apiCode");
MDC.remove("apiVer");
MDC.remove("deploymentUnitNo");
MDC.remove("serviceName");
MDC.remove("serviceVer");
MDC.remove("globalSerNo");
MDC.remove("txnSerNo");
MDC.remove("parentTxnSerNo");
}
}
}
理解 @StreamListener(target = MsgSink.userMsgInput) 的使用,结合 Spring Cloud Stream 的核心机制来解析。
这段代码本质上是通过 Spring Cloud Stream 框架实现对 Kafka 消息的消费,而 @StreamListener 是该框架中用于绑定消息监听逻辑的核心注解。
1.背景:Spring Cloud Stream 简介
Spring Cloud Stream 是一个用于构建消息驱动微服务的框架,它通过 抽象绑定器(Binder) 屏蔽了不同消息中间件(如 Kafka、RabbitMQ)的底层差异,让开发者可以用统一的 API 处理消息收发,无需关心具体中间件的细节。
其核心概念包括:
2.
@StreamListener注解的作用
@StreamListener 是 Spring Cloud Stream 中用于标记 消息消费方法 的注解,其核心功能是:
将当前方法注册为指定输入通道的消息处理器,当该通道接收到消息时,自动调用该方法处理消息。
在代码中,@StreamListener(target = MsgSink.userMsgInput) 表示:
当前 handleUserMsg 方法会监听 MsgSink.userMsgInput 这个输入通道,当该通道有消息到达时,方法会被自动触发。
target = MsgSink.userMsgInput的具体含义
MsgSink:通道定义接口MsgSink 是一个 输入通道定义接口,通常由开发者定义,用于声明应用需要监听的输入通道。其内部通过 @Input 注解标记输入通道的名称,例如:
public interface MsgSink {
// 定义输入通道名称为 "userMsgInput"
String userMsgInput = "userMsgInput";
@Input(userMsgInput) // 标记为输入通道
SubscribableChannel userMsgInput();
}
这里的 userMsgInput 是通道的逻辑名称,后续通过配置可将其绑定到具体的 Kafka Topic。
MsgSink.userMsgInput 这个逻辑通道需要通过配置文件(如 application.yml)与实际的 Kafka Topic 绑定,例如:
spring:
cloud:
stream:
kafka:
binder:
brokers: 127.0.0.1:9092 # Kafka 服务地址
bindings:
userMsgInput: # 与 MsgSink.userMsgInput 对应
destination: USER_TOPIC # 绑定到 Kafka 的 USER_TOPIC 主题
group: user-consumer-group # 消费者组
consumer:
auto-offset-reset: earliest # 偏移量重置策略
ack-mode: manual # 手动确认消息(与代码中 acknowledgment.acknowledge() 对应)
通过上述配置,MsgSink.userMsgInput 通道就与 Kafka 的 USER_TOPIC 主题关联起来了。当 USER_TOPIC 有新消息时,消息会被投递到 userMsgInput 通道,进而触发 handleUserMsg 方法。
handleUserMsg 方法的参数和逻辑进一步体现了消息消费的细节:
Message<String> message:Spring Cloud Stream 会自动将消息包装为 Message 对象,其中 message.getPayload() 可获取消息体( payload ),message.getHeaders() 可获取消息头。@Header Map<String, Object> header:用于直接获取消息头信息(如 Kafka 主题、分区、偏移量等元数据)。Table-Name 确定数据类型 → 通过 userDataHandleFactory 获取对应的处理器 → 处理消息 → 手动确认消息(acknowledgment.acknowledge()),确保消息被正确消费后再提交偏移量。@StreamListener(target = MsgSink.userMsgInput) 的核心作用是:
将 handleUserMsg 方法与 MsgSink.userMsgInput 输入通道绑定,当该通道(已通过配置关联到具体 Kafka Topic)接收到消息时,自动触发方法执行消息处理逻辑。
这种方式的优势在于:
主要使用了两种设计模式:
体现点:
UserMsgDataHandle userMsgDataHandle = userDataHandleFactory.create(tableName);
userMsgDataHandle.handleMag(payload);
分析:
UserDataHandleFactory 工厂根据 tableName 动态创建不同的 UserMsgDataHandle 实现类。UserMsgDataHandle 接口定义了统一的消息处理方法 handleMag(),不同表名对应不同的具体策略实现。TEST_TABLE 可能对应不同的处理逻辑)。符合策略模式特征:
体现点:
userDataHandleFactory.create(tableName);
分析:
UserDataHandleFactory 作为工厂,封装了具体策略对象的创建逻辑。tableName 创建对应的 UserMsgDataHandle 实例。异常处理佐证:
if (!StringUtils.contains(e.getMessage(), userDataHandleFactory.NO_HANDLE_ERROR_MESSAGE)) {
throw e;
}
表明工厂在找不到匹配策略时会抛出特定异常(如 "No handler for table: XXX")。
graph LR
Client[UserConsumer] --> Factory[UserDataHandleFactory]
Client --> Strategy[UserMsgDataHandle]
Factory --> StrategyA[ConcreteStrategyA]
Factory --> StrategyB[ConcreteStrategyB]
StrategyA -->|implements| Strategy
StrategyB -->|implements| Strategy

UserConsumer 主流程NO_HANDLE_ERROR_MESSAGE)UserMsgDataHandle总结:
该代码核心采用 策略模式+工厂方法模式 的组合,实现了消息处理的动态路由和扩展性,是典型的解耦设计实践。