混用 简单工厂模式 + 策略模式,对接收的kafka消息进行不同的业务处理


混用 简单工厂模式 + 策略模式,对接收的kafka消息进行不同的业务处理

工厂代码

import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.Objects;

@Component
public class UserDataHandleFactory implements InitializingBean {
    @Resource
    private List<UserMsgDataHandle> handleList;

    private final Map<String, UserMsgDataHandle> handleMap = Maps.newHashMap();

    public static final String NO_HANDLE_ERROR_MESSAGE = "user-system的数据处理者为空";

    @Override
    public void afterPropertiesSet() {
        handleList.forEach(item -> {
            UserDataAnnotation userDataAnnotation = AnnotationUtils.findAnnotation(item.getClass(), UserDataAnnotation.class);
            if (Objects.isNull(userDataAnnotation) && StringUtils.isNotEmpty(userDataAnnotation.tableName())) {
                handleMap.put(userDataAnnotation.tableName(), item);
            }
        });
    }

    public UserMsgDataHandle create(String name) {
        UserMsgDataHandle handler = handleMap.get(name);
        if (handler == null) {
            throw new RuntimeException(NO_HANDLE_ERROR_MESSAGE);
        } else {
            return handler;
        }
    }
}

工厂中实现创建的处理器-父类

父类中定义 通用的抽象方法 handleMag

public abstract class UserMsgDataHandle<T> {
    protected static final Integer BATCH_INSERT_COUNT = 1000;

    protected abstract List<T> handleMag(String payload);
}

具体的 处理器实现子类

对父类方法 handleMag 进行实现

@UserDataAnnotation(tableName = "student")
public class StudentMsgDataHandle extends UserMsgDataHandle<StudentEntity>{

    @Override
    protected List<StudentEntity> handleMag(String payload) {
        Log.info("Student 表,kafka消息头:【student】接受消息成功,消息:{}", payload);
        return null;
    }
}

自定义的注解类:在处理器实现子类中使用,给每个具体子类定义标识。

方便在工厂类中,通过反射获取子类标识,进而根据标识创建具体的子类处理器对象。

import org.springframework.stereotype.Component;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface UserDataAnnotation {
    String tableName() default "";

    String fileName() default "";
}

消费者代码

1.根据kafka约定的topic,接收到消息 message

2.获取 header 里面的信息:Table-Name

3.根据 Table-Name 创建不同的子类处理器对象 userMsgDataHandle

4.根据创建的对象userMsgDataHandle,调用 子类中的实现方法handleMag 处理 message。

5.子类处理器对象中实现 handleMag 方法,处理具体业务逻辑

@Slf4j
@Component
public class UserConsumer {

    /**
     * 订阅的kafka消息头中的key
     * 比如订阅了来自 A 方的 topic: A_COMMON
     * topic 中有一对键值对 Table-Name:TEST_TABLE
     */
    private static final String USER_MSG_HEADER_KEY = "Table-Name";

    @Resource
    protected UserDataHandleFactory userDataHandleFactory;

    @StreamListener(target = MsgSink.userMsgInput)
    public void handleUserMsg(Message<String> message, @Header Map<String, Object> header) {
        log.info("kafka-start handleMsg { header: {} }", header);
        try {
            // 处理消息
            String payload = message.getPayload();
            // 根据消息 header 里的key Table-Name,获取推送的数据类型 比如推送的数据表示哪个表的数据
            String tableName = new String((byte[]) header.get(USER_MSG_HEADER_KEY));
            log.info("topic_partitionId_offset:{}_{}_{}, table:{}, message: {}", header.get(KafkaHeaders.RECEIVED_TOPIC),
                    header.get(KafkaHeaders.RECEIVED_PARTITION_ID), header.get(KafkaHeaders.OFFSET), tableName, payload);
            UserMsgDataHandle userMsgDataHandle = null;
            try {
                userMsgDataHandle = userDataHandleFactory.create(tableName);
                userMsgDataHandle.handleMag(payload);
            } catch (Exception e) {
                // 这里 空处理器异常放行 不处理
                if (!StringUtils.contains(e.getMessage(), userDataHandleFactory.NO_HANDLE_ERROR_MESSAGE)) {
                    throw e;
                }
            }
            Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
            log.info("acknowledgment: {}", acknowledgment);
            if (Objects.nonNull(acknowledgment)) {
                // kafka手动确认处理
                acknowledgment.acknowledge();
            }

        } catch (Exception e) {
            log.error("USER_SYSTEM-用户中心异常:" + "consumer 本应用 from User-System: mes error ", e);
        } finally {
            MDC.remove("apiCode");
            MDC.remove("apiVer");
            MDC.remove("deploymentUnitNo");
            MDC.remove("serviceName");
            MDC.remove("serviceVer");
            MDC.remove("globalSerNo");
            MDC.remove("txnSerNo");
            MDC.remove("parentTxnSerNo");
        }


    }

}

代码分析-文字描述版

主要使用了两种设计模式:

1. 策略模式 (Strategy Pattern)

  • 体现点

    UserMsgDataHandle userMsgDataHandle = userDataHandleFactory.create(tableName);
    userMsgDataHandle.handleMag(payload);
    
  • 分析

    • 通过 UserDataHandleFactory 工厂根据 tableName 动态创建不同的 UserMsgDataHandle 实现类。
    • UserMsgDataHandle 接口定义了统一的消息处理方法 handleMag(),不同表名对应不同的具体策略实现。
    • 运行时根据表名动态切换处理策略(如 TEST_TABLE 可能对应不同的处理逻辑)。
  • 符合策略模式特征

    • 分离变化点(不同表的处理逻辑)与稳定点(消息消费流程)。
    • 通过接口抽象策略行为,支持运行时替换策略。

2. 工厂方法模式 (Factory Method Pattern)

  • 体现点

    userDataHandleFactory.create(tableName);
    
  • 分析

    • UserDataHandleFactory 作为工厂,封装了具体策略对象的创建逻辑。
    • 根据输入参数 tableName 创建对应的 UserMsgDataHandle 实例。
    • 工厂隔离了策略对象的创建细节,客户端只需关注接口调用。
  • 异常处理佐证

    if (!StringUtils.contains(e.getMessage(), userDataHandleFactory.NO_HANDLE_ERROR_MESSAGE)) {
                        throw e;
                    }
    

    表明工厂在找不到匹配策略时会抛出特定异常(如 "No handler for table: XXX")。


设计模式协作关系

  1. 工厂方法模式 负责创建具体策略对象。
  2. 策略模式 负责执行具体算法逻辑。
  3. 两者结合实现 "开闭原则"
    • 新增表处理只需添加新策略类 + 扩展工厂映射
    • 无需修改 UserConsumer 主流程

其他设计要点

  1. 松耦合设计
    • 消费逻辑与具体处理策略解耦
    • 符合 "面向接口编程" 原则
  2. 防御性编程
    • 空策略异常处理(NO_HANDLE_ERROR_MESSAGE
    • Kafka ACK 的空指针检查
  3. 可扩展性
    • 新增表类型只需实现 UserMsgDataHandle
    • 工厂添加新映射关系

总结

该代码核心采用 策略模式+工厂方法模式 的组合,实现了消息处理的动态路由和扩展性,是典型的解耦设计实践。

JAVA-技能点
中间件
Kafka
设计模式