策略模式(Strategy Pattern) + 工厂模式实现不同渠道的短信发送


策略模式(Strategy Pattern) + 工厂模式实现不同渠道的短信发送

策略接口定义:

public interface MsgSendConsumerStrategy {

    /**
     * 事件类型
     * @return
     */
    String type();

    /**
     * 单条消息处理
     * @param message
     */
    void handle(GbopMsgVO message);
}

工厂模式,集中创建策略管理器,初始化创建以及对实现策略接口的子类统一管理。

在工厂类中再对外提供一个public方法,该方法中实现调用策略接口的方法。

这样,当不同地方调用时,只需要注入工厂类,就可以调用execute方法,实际内部就根据调用方传入的参数,指向具体的子类来实现。

import com.geely.rc.modules.common.vo.GbopMsgVO;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.MapUtils;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;

@Component
@Slf4j
public class MsgSendStrategyContext implements ApplicationContextAware {

    /**
     * 上下文
     */
    private ApplicationContext applicationContext;

    private Map<String, MsgSendConsumerStrategy> consumerMap = new HashMap<>();

    @PostConstruct
    public void init() {
        Map<String, MsgSendConsumerStrategy> map = applicationContext.getBeansOfType(MsgSendConsumerStrategy.class);
        if (MapUtils.isNotEmpty(map)) {
            map.values().forEach(c -> consumerMap.put(c.type(), c));
        }
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * 执行策略
     *
     * @param type
     * @param message
     */
    public void execute(String type,  GbopMsgVO message) {
        log.info("MsgSendStrategyContext type={}", type);
        log.info("MsgSendStrategyContext message={}", message);
        MsgSendConsumerStrategy consumer = consumerMap.get(type);
        if (consumer != null) {
            consumer.handle(message);
        }
    }

}

消息体内容定义:

import lombok.Data;

import java.io.Serializable;

/**
 * 触达业务请求对象
 */
@Data
public class GbopMsgVO implements Serializable {

    private static final long serialVersionUID = 4304531296257345434L;

    /**
     * 平台/渠道id
     */
    private String channelId;

    /**
     * 场景码
     */
    private String sceneCode;

    /**
     * 投递对象id  用户账户id
     */
    private String receiverId;

    /**
     * 短信参数模板
     */
    private String sceneParams;

}

调用方A:

import com.alibaba.fastjson2.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.geely.rc.modules.common.vo.GbopMsgVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * 用户触达服务消息分发
 */
@Component
@Slf4j
public class MsgSendConsumer implements MessageListener {

    @Resource
    private MsgSendStrategyContext msgSendStrategyContext;


    @Override
    public Action consume(Message message, ConsumeContext context) {
        try {
            log.info("MsgSendConsumer message={}", message);
            String msgString = new String(message.getBody());
            msgSendStrategyContext.execute(message.getTag(), JSONObject.parseObject(msgString, GbopMsgVO.class));
            return Action.CommitMessage;
        } catch (Exception e) {
            log.error("MsgSendConsumer error", e);
            return Action.ReconsumeLater;
        }
    }

}

调用者B,kafka消息监听,上游不同品牌的用户点击发送短信,此时向kafka中推送消息,消息中包含了必要的渠道信息:

import com.alibaba.fastjson2.JSON;
import com.geely.rc.kafka.KafkaGroupConstant;
import com.geely.rc.kafka.KafkaTopicConstant;
import com.geely.rc.modules.common.utils.GbopMessageUtil;
import com.geely.rc.modules.common.vo.GbopMsgVO;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;

/**
 * @Description 触达中心消息监听器
 * @Author e-sai.wang1
 * @Date 2025/3/18 14:16
 * @Version 1.0.0
 */
@Slf4j
@Component
public class ReachMsgSendConsumer {

    @Resource
    GbopMessageUtil gbopMessageUtil;

    @KafkaListener(id = KafkaGroupConstant.RC_BASEPROVIDER_REACH_MESSAGE, topics = KafkaTopicConstant.COMMON_REACH_MESSAGE)
    public void onMessage(ConsumerRecord<String, String> data) {
        try {
            log.info("kafka MsgSendConsumer receive msg, topic:[{}], partition:[{}], key:[{}], message:[{}], offset:{}, timestamp:{}",
                    data.topic(), data.partition(), data.key(), data.value(), data.offset(), new Date(data.timestamp()));

            GbopMsgVO msg = JSON.parseObject(data.value(), GbopMsgVO.class);
            gbopMessageUtil.handle(msg);
        } catch (Exception exception) {
            log.error("kafka MsgSendConsumer handler invoke err, topic={}, exception: ", data.topic(), exception);
        }
    }
}

以上代码实现了一个策略模式的初始化器,用于管理和注册不同的消息发送策略。

代码作用分析

核心功能

在 Spring 容器启动时,自动收集并注册所有实现了 MsgSendConsumerStrategy 接口的 Bean。

@PostConstruct
public void init() {
    // 从 Spring 应用上下文中获取所有类型为 MsgSendConsumerStrategy 的 Bean
    Map<String, MsgSendConsumerStrategy> map = applicationContext.getBeansOfType(MsgSendConsumerStrategy.class);
    
    // 判断是否找到了至少一个策略实现类
    if (MapUtils.isNotEmpty(map)) {
        // 遍历所有策略实现,将它们注册到 consumerMap 中
        // key: 策略类型(通过 c.type() 方法返回)
        // value: 策略实现类的实例
        map.values().forEach(c -> consumerMap.put(c.type(), c));
    }
}

设计模式

这是典型的策略模式(Strategy Pattern) + 工厂模式的组合应用:

  1. 策略接口: MsgSendConsumerStrategy
  2. 策略管理器: MsgSendStrategyContext
  3. 具体策略: 各种实现类(如 GbopMessageUtil

工作流程

Spring 容器启动
    ↓
创建所有 Bean
    ↓
@PostConstruct 触发 init()
    ↓
扫描所有 MsgSendConsumerStrategy 实现类
    ↓
提取每个实现类的 type() 作为 key
    ↓
存入 consumerMap 缓存
    ↓
后续可通过 type 快速获取对应的策略实例

实际应用场景

根据项目中的代码,这个机制用于:

  1. 多渠道消息发送

    • GBOP 渠道(GbopMessageUtil
    • 极氪渠道(可能有其他实现类)
    • 未来可以扩展更多渠道(比如:银河渠道、领克渠道等)
  2. 动态路由

    // 假设调用方代码类似这样:
    String type = message.getChannelType(); // 例如:"GBOP", "ZEEKR"
    MsgSendConsumerStrategy strategy = consumerMap.get(type);
    strategy.handle(message); // 执行对应渠道的发送逻辑
    

优势

  1. 解耦:调用方不需要知道具体的策略实现类
  2. 可扩展:新增渠道只需实现接口并添加 @Component 注解,无需修改现有代码
  3. 自动化:Spring 启动时自动完成注册,无需手动配置
  4. 类型安全:使用统一的接口规范

潜在问题

  1. type() 返回值重复:如果两个策略实现返回相同的 type,后注册的会覆盖前者
  2. 空指针风险:如果调用时传入了不存在的 type,consumerMap.get(type) 可能返回 null
  3. 线程安全HashMap 不是线程安全的,如果在多线程环境下使用,建议改用 ConcurrentHashMap

改进思路

// ... existing code ...

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Component
@Slf4j
public class MsgSendStrategyContext implements ApplicationContextAware {

    /**
     * 上下文
     */
    private ApplicationContext applicationContext;

    // Deleted:private Map<String, MsgSendConsumerStrategy> consumerMap = new HashMap<>();
    private Map<String, MsgSendConsumerStrategy> consumerMap = new ConcurrentHashMap<>();

    @PostConstruct
    public void init() {
        Map<String, MsgSendConsumerStrategy> map = applicationContext.getBeansOfType(MsgSendConsumerStrategy.class);
        if (MapUtils.isNotEmpty(map)) {
            map.values().forEach(c -> {
                String type = c.type();
                if (consumerMap.containsKey(type)) {
                    log.warn("发现重复的策略类型:{}, 已存在策略将被覆盖", type);
                }
                consumerMap.put(type, c);
                log.info("注册消息发送策略:type={}, class={}", type, c.getClass().getSimpleName());
            });
        } else {
            log.warn("未找到任何 MsgSendConsumerStrategy 实现类");
        }
    }

    // ... existing code ...
}

这样的改进可以增加:

  • 线程安全性
  • 重复检测
  • 日志记录,便于排查问题
SpringBoot
JAVA-技能点
设计模式