策略接口定义:
public interface MsgSendConsumerStrategy {
/**
* 事件类型
* @return
*/
String type();
/**
* 单条消息处理
* @param message
*/
void handle(GbopMsgVO message);
}
工厂模式,集中创建策略管理器,初始化创建以及对实现策略接口的子类统一管理。
在工厂类中再对外提供一个public方法,该方法中实现调用策略接口的方法。
这样,当不同地方调用时,只需要注入工厂类,就可以调用execute方法,实际内部就根据调用方传入的参数,指向具体的子类来实现。
import com.geely.rc.modules.common.vo.GbopMsgVO;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.MapUtils;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;
@Component
@Slf4j
public class MsgSendStrategyContext implements ApplicationContextAware {
/**
* 上下文
*/
private ApplicationContext applicationContext;
private Map<String, MsgSendConsumerStrategy> consumerMap = new HashMap<>();
@PostConstruct
public void init() {
Map<String, MsgSendConsumerStrategy> map = applicationContext.getBeansOfType(MsgSendConsumerStrategy.class);
if (MapUtils.isNotEmpty(map)) {
map.values().forEach(c -> consumerMap.put(c.type(), c));
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
/**
* 执行策略
*
* @param type
* @param message
*/
public void execute(String type, GbopMsgVO message) {
log.info("MsgSendStrategyContext type={}", type);
log.info("MsgSendStrategyContext message={}", message);
MsgSendConsumerStrategy consumer = consumerMap.get(type);
if (consumer != null) {
consumer.handle(message);
}
}
}
消息体内容定义:
import lombok.Data;
import java.io.Serializable;
/**
* 触达业务请求对象
*/
@Data
public class GbopMsgVO implements Serializable {
private static final long serialVersionUID = 4304531296257345434L;
/**
* 平台/渠道id
*/
private String channelId;
/**
* 场景码
*/
private String sceneCode;
/**
* 投递对象id 用户账户id
*/
private String receiverId;
/**
* 短信参数模板
*/
private String sceneParams;
}
调用方A:
import com.alibaba.fastjson2.JSONObject;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.geely.rc.modules.common.vo.GbopMsgVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 用户触达服务消息分发
*/
@Component
@Slf4j
public class MsgSendConsumer implements MessageListener {
@Resource
private MsgSendStrategyContext msgSendStrategyContext;
@Override
public Action consume(Message message, ConsumeContext context) {
try {
log.info("MsgSendConsumer message={}", message);
String msgString = new String(message.getBody());
msgSendStrategyContext.execute(message.getTag(), JSONObject.parseObject(msgString, GbopMsgVO.class));
return Action.CommitMessage;
} catch (Exception e) {
log.error("MsgSendConsumer error", e);
return Action.ReconsumeLater;
}
}
}
调用者B,kafka消息监听,上游不同品牌的用户点击发送短信,此时向kafka中推送消息,消息中包含了必要的渠道信息:
import com.alibaba.fastjson2.JSON;
import com.geely.rc.kafka.KafkaGroupConstant;
import com.geely.rc.kafka.KafkaTopicConstant;
import com.geely.rc.modules.common.utils.GbopMessageUtil;
import com.geely.rc.modules.common.vo.GbopMsgVO;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
/**
* @Description 触达中心消息监听器
* @Author e-sai.wang1
* @Date 2025/3/18 14:16
* @Version 1.0.0
*/
@Slf4j
@Component
public class ReachMsgSendConsumer {
@Resource
GbopMessageUtil gbopMessageUtil;
@KafkaListener(id = KafkaGroupConstant.RC_BASEPROVIDER_REACH_MESSAGE, topics = KafkaTopicConstant.COMMON_REACH_MESSAGE)
public void onMessage(ConsumerRecord<String, String> data) {
try {
log.info("kafka MsgSendConsumer receive msg, topic:[{}], partition:[{}], key:[{}], message:[{}], offset:{}, timestamp:{}",
data.topic(), data.partition(), data.key(), data.value(), data.offset(), new Date(data.timestamp()));
GbopMsgVO msg = JSON.parseObject(data.value(), GbopMsgVO.class);
gbopMessageUtil.handle(msg);
} catch (Exception exception) {
log.error("kafka MsgSendConsumer handler invoke err, topic={}, exception: ", data.topic(), exception);
}
}
}
以上代码实现了一个策略模式的初始化器,用于管理和注册不同的消息发送策略。
在 Spring 容器启动时,自动收集并注册所有实现了 MsgSendConsumerStrategy 接口的 Bean。
@PostConstruct
public void init() {
// 从 Spring 应用上下文中获取所有类型为 MsgSendConsumerStrategy 的 Bean
Map<String, MsgSendConsumerStrategy> map = applicationContext.getBeansOfType(MsgSendConsumerStrategy.class);
// 判断是否找到了至少一个策略实现类
if (MapUtils.isNotEmpty(map)) {
// 遍历所有策略实现,将它们注册到 consumerMap 中
// key: 策略类型(通过 c.type() 方法返回)
// value: 策略实现类的实例
map.values().forEach(c -> consumerMap.put(c.type(), c));
}
}
这是典型的策略模式(Strategy Pattern) + 工厂模式的组合应用:
MsgSendConsumerStrategyMsgSendStrategyContextGbopMessageUtil)Spring 容器启动
↓
创建所有 Bean
↓
@PostConstruct 触发 init()
↓
扫描所有 MsgSendConsumerStrategy 实现类
↓
提取每个实现类的 type() 作为 key
↓
存入 consumerMap 缓存
↓
后续可通过 type 快速获取对应的策略实例
根据项目中的代码,这个机制用于:
多渠道消息发送:
GbopMessageUtil)动态路由:
// 假设调用方代码类似这样:
String type = message.getChannelType(); // 例如:"GBOP", "ZEEKR"
MsgSendConsumerStrategy strategy = consumerMap.get(type);
strategy.handle(message); // 执行对应渠道的发送逻辑
@Component 注解,无需修改现有代码consumerMap.get(type) 可能返回 nullHashMap 不是线程安全的,如果在多线程环境下使用,建议改用 ConcurrentHashMap// ... existing code ...
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
@Slf4j
public class MsgSendStrategyContext implements ApplicationContextAware {
/**
* 上下文
*/
private ApplicationContext applicationContext;
// Deleted:private Map<String, MsgSendConsumerStrategy> consumerMap = new HashMap<>();
private Map<String, MsgSendConsumerStrategy> consumerMap = new ConcurrentHashMap<>();
@PostConstruct
public void init() {
Map<String, MsgSendConsumerStrategy> map = applicationContext.getBeansOfType(MsgSendConsumerStrategy.class);
if (MapUtils.isNotEmpty(map)) {
map.values().forEach(c -> {
String type = c.type();
if (consumerMap.containsKey(type)) {
log.warn("发现重复的策略类型:{}, 已存在策略将被覆盖", type);
}
consumerMap.put(type, c);
log.info("注册消息发送策略:type={}, class={}", type, c.getClass().getSimpleName());
});
} else {
log.warn("未找到任何 MsgSendConsumerStrategy 实现类");
}
}
// ... existing code ...
}
这样的改进可以增加: