混用 简单工厂模式 + 策略模式,对接收的kafka消息进行不同的业务处理
工厂代码
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@Component
public class UserDataHandleFactory implements InitializingBean {
@Resource
private List<UserMsgDataHandle> handleList;
private final Map<String, UserMsgDataHandle> handleMap = Maps.newHashMap();
public static final String NO_HANDLE_ERROR_MESSAGE = "user-system的数据处理者为空";
@Override
public void afterPropertiesSet() {
handleList.forEach(item -> {
UserDataAnnotation userDataAnnotation = AnnotationUtils.findAnnotation(item.getClass(), UserDataAnnotation.class);
if (Objects.isNull(userDataAnnotation) && StringUtils.isNotEmpty(userDataAnnotation.tableName())) {
handleMap.put(userDataAnnotation.tableName(), item);
}
});
}
public UserMsgDataHandle create(String name) {
UserMsgDataHandle handler = handleMap.get(name);
if (handler == null) {
throw new RuntimeException(NO_HANDLE_ERROR_MESSAGE);
} else {
return handler;
}
}
}
工厂中实现创建的处理器-父类
父类中定义 通用的抽象方法 handleMag
public abstract class UserMsgDataHandle<T> {
protected static final Integer BATCH_INSERT_COUNT = 1000;
protected abstract List<T> handleMag(String payload);
}
具体的 处理器实现子类
对父类方法 handleMag 进行实现
@UserDataAnnotation(tableName = "student")
public class StudentMsgDataHandle extends UserMsgDataHandle<StudentEntity>{
@Override
protected List<StudentEntity> handleMag(String payload) {
Log.info("Student 表,kafka消息头:【student】接受消息成功,消息:{}", payload);
return null;
}
}
自定义的注解类:在处理器实现子类中使用,给每个具体子类定义标识。
方便在工厂类中,通过反射获取子类标识,进而根据标识创建具体的子类处理器对象。
import org.springframework.stereotype.Component;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface UserDataAnnotation {
String tableName() default "";
String fileName() default "";
}
消费者代码
1.根据kafka约定的topic,接收到消息 message
2.获取 header 里面的信息:Table-Name
3.根据 Table-Name 创建不同的子类处理器对象 userMsgDataHandle
4.根据创建的对象userMsgDataHandle,调用 子类中的实现方法handleMag 处理 message。
5.子类处理器对象中实现 handleMag 方法,处理具体业务逻辑
@Slf4j
@Component
public class UserConsumer {
/**
* 订阅的kafka消息头中的key
* 比如订阅了来自 A 方的 topic: A_COMMON
* topic 中有一对键值对 Table-Name:TEST_TABLE
*/
private static final String USER_MSG_HEADER_KEY = "Table-Name";
@Resource
protected UserDataHandleFactory userDataHandleFactory;
@StreamListener(target = MsgSink.userMsgInput)
public void handleUserMsg(Message<String> message, @Header Map<String, Object> header) {
log.info("kafka-start handleMsg { header: {} }", header);
try {
// 处理消息
String payload = message.getPayload();
// 根据消息 header 里的key Table-Name,获取推送的数据类型 比如推送的数据表示哪个表的数据
String tableName = new String((byte[]) header.get(USER_MSG_HEADER_KEY));
log.info("topic_partitionId_offset:{}_{}_{}, table:{}, message: {}", header.get(KafkaHeaders.RECEIVED_TOPIC),
header.get(KafkaHeaders.RECEIVED_PARTITION_ID), header.get(KafkaHeaders.OFFSET), tableName, payload);
UserMsgDataHandle userMsgDataHandle = null;
try {
userMsgDataHandle = userDataHandleFactory.create(tableName);
userMsgDataHandle.handleMag(payload);
} catch (Exception e) {
// 这里 空处理器异常放行 不处理
if (!StringUtils.contains(e.getMessage(), userDataHandleFactory.NO_HANDLE_ERROR_MESSAGE)) {
throw e;
}
}
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
log.info("acknowledgment: {}", acknowledgment);
if (Objects.nonNull(acknowledgment)) {
// kafka手动确认处理
acknowledgment.acknowledge();
}
} catch (Exception e) {
log.error("USER_SYSTEM-用户中心异常:" + "consumer 本应用 from User-System: mes error ", e);
} finally {
MDC.remove("apiCode");
MDC.remove("apiVer");
MDC.remove("deploymentUnitNo");
MDC.remove("serviceName");
MDC.remove("serviceVer");
MDC.remove("globalSerNo");
MDC.remove("txnSerNo");
MDC.remove("parentTxnSerNo");
}
}
}
主要使用了两种设计模式:
体现点:
UserMsgDataHandle userMsgDataHandle = userDataHandleFactory.create(tableName);
userMsgDataHandle.handleMag(payload);
分析:
UserDataHandleFactory
工厂根据 tableName
动态创建不同的 UserMsgDataHandle
实现类。UserMsgDataHandle
接口定义了统一的消息处理方法 handleMag()
,不同表名对应不同的具体策略实现。TEST_TABLE
可能对应不同的处理逻辑)。符合策略模式特征:
体现点:
userDataHandleFactory.create(tableName);
分析:
UserDataHandleFactory
作为工厂,封装了具体策略对象的创建逻辑。tableName
创建对应的 UserMsgDataHandle
实例。异常处理佐证:
if (!StringUtils.contains(e.getMessage(), userDataHandleFactory.NO_HANDLE_ERROR_MESSAGE)) {
throw e;
}
表明工厂在找不到匹配策略时会抛出特定异常(如 "No handler for table: XXX")。
UserConsumer
主流程NO_HANDLE_ERROR_MESSAGE
)UserMsgDataHandle
总结:
该代码核心采用 策略模式+工厂方法模式 的组合,实现了消息处理的动态路由和扩展性,是典型的解耦设计实践。