文章目录
  1. 1. 一、主要应用方案
  2. 2. 二、JMS消息服务
  3. 3. 三、代码实现(广告词为空时异步添加到MQ,执行添加缓存操作)

一、主要应用方案

异步处理

场景:用户注册时,先将注册信息写入数据库,然后发送短信和邮件
方案:使用消息队列,将发送短信和邮件任务写入队列中,直接返回

应用解耦

场景:用户下单后,订单系统需要通知库存系统减库存。也就意味着订单和库存系统耦合了。
方案:用户下单后,订单系统完成持久化处理,将消息写入队列。库存系统从订阅的消息队列中获取信息进行更新

秒杀

场景:秒杀活动一般会因为流量过大导致流量暴增。
方案:服务器接收秒杀请求,写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到秒杀失败页面。

日志

方案:日志采集客户端将日志放入Kafka队列中

二、JMS消息服务

  1. P2P模式:P2P模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。
  • 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)
  • 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
  • 接收者在成功接收消息之后需向队列应答成功
  1. Pub/sub模式:多个发布者(Publisher)将消息发送到Topic,系统将这些消息传递给多个订阅者(Subscriber)。
  • 每个消息可以有多个消费者
  • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。
  • 为了消费消息,订阅者必须保持运行的状态。

三、代码实现(广告词为空时异步添加到MQ,执行添加缓存操作)

  1. 编写message实体类AdWordsSubmitMessage
  2. 添加配置文件

    1
    2
    3
    4
    ### MQ #####
    spring.activemq.broker-url=failover://(tcp://192.168.170.93:61616)?randomize=false&priorityBackup=true
    spring.activemq.in-memory=true
    spring.activemq.pool.enabled=false
  3. 调用MQ接口发送数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Component
public class MqManager {
@Autowired
private JmsTemplate jmsTemplate;
//调用MQ接口发送数据
@Async
public void sendDataToMQ(final String queueName, final String dataMessage) {
jmsTemplate.send(queueName,new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
TextMessage mm = session.createTextMessage();
mm.setText(dataMessage);
return mm;
}
});
}
}
  1. ServiceImpl中注入MQmanager,添加调用

    1
    2
    3
    4
    5
    6
    //异步发送MQ,广告词存缓存
    AdWordsSubmitMessage adm = new AdWordsSubmitMessage();
    adm.setTenantId(tenantId);
    adm.setShopId(shopId);
    adm.setJdSkuId(jdSkuId);
    mqManager.sendDataToMQ(AdWordsMqDict.AD_WORDS_SUBMIT, JSON.toJSONString(adm));
  2. 消息队列监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Component
public class OrderMessageListener {
//注入接收到消息后进行的订单操作
@Autowired
private AdWordsMessageProcessor adWordsMessageProcessor;
//监听指定消息队列
@JmsListener(destination = AdWordsMqDict.AD_WORDS_SUBMIT, concurrency = "50")
public void onMessage(String message) {
try {
logger.info("接收到消息message={}", message);
adWordsMessageProcessor.processorMessage(message);
} catch (Exception e) {
logger.error("AdWordsMessageListener.message={} error:", message,e);
throw e;
}
}
}
  1. 监听到后操作Processor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Component
public class AdWordsMessageProcessor {
@Autowired
private ItemAdWordsService itemAdWordsService;
@Resource
private CacheClient cacheFeignClient;
public void processorMessage(String message){
try {
AdWordsSubmitMessage orderSubmitMessage = JSON.parseObject(message, AdWordsSubmitMessage.class);
AdWords[] queryAdWords = itemAdWordsService.queryAdWords(orderSubmitMessage.getTenantId(),
orderSubmitMessage.getShopId(), orderSubmitMessage.getJdSkuId());
if (queryAdWords != null && queryAdWords.length > 0) {
String queryAdWordsStr = JSON.toJSONString(queryAdWords);
try {
//写缓存
cacheFeignClient.saveBytes2Cache(orderSubmitMessage.getTenantId(), getRedisKey(orderSubmitMessage.getTenantId(),
orderSubmitMessage.getShopId(), orderSubmitMessage.getJdSkuId()), queryAdWordsStr.getBytes("UTF-8"), 0);
} catch (Exception e) {
log.error("广告词存缓存失败 jdskuid:", orderSubmitMessage.getJdSkuId());
}
}
} catch (Exception e) {
log.error("处理广告词存缓存消息异常,message={}", message, e);
throw e;
}
}
}
}
文章目录
  1. 1. 一、主要应用方案
  2. 2. 二、JMS消息服务
  3. 3. 三、代码实现(广告词为空时异步添加到MQ,执行添加缓存操作)