Java Seckill System in Practice ~ Handling Timed-Out Unpaid Orders with a RabbitMQ Dead Letter Queue

Summary:

This post is the tenth article in the "Java Seckill System in Practice" series. In it, we use RabbitMQ's dead letter queue to handle the case where a user has successfully created an order in a seckill but has not paid for it, and we see what the dead letter queue can do in a real business scenario.

Content:

Debug gave a brief introduction to the message middleware RabbitMQ in the previous chapter, so I won't repeat it here. In this article, we use RabbitMQ's dead letter queue to meet the following business requirement: "After a user succeeds in the seckill and an order record is created, the user should in theory go on to pay. Some users, however, keep putting off the payment, for reasons unknown."

You have probably run into this scenario on shopping platforms: after you choose a product, add it to the cart, and click to check out, a countdown appears, reminding you to complete the payment within the specified time; otherwise the order becomes invalid.

The traditional way to handle this kind of business logic is the "timer" approach: poll at regular intervals for orders that have exceeded the specified time, and then apply a series of measures (for example, sending users an SMS to remind them that a long-unpaid order is about to expire). In this seckill system, we will instead use RabbitMQ's dead letter queue to "invalidate" such orders.
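For comparison, the core of the timer approach can be sketched in a few lines of plain Java. This is only an illustration: the `ExpiredOrderScanner` class, its `Order` type, and the status value 1 for "paid" are made up for the sketch, and a real job would load the orders from the database on a schedule rather than receive them as a list.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of the timer approach: each polling run scans the orders and picks the overdue unpaid ones. */
final class ExpiredOrderScanner {

    /** Hypothetical order snapshot; status 0 means created but not yet paid, as in this series. */
    static final class Order {
        final String code;
        final int status;
        final Instant createTime;

        Order(String code, int status, Instant createTime) {
            this.code = code;
            this.status = status;
            this.createTime = createTime;
        }
    }

    /** Returns the codes of unpaid orders whose payment window (createTime + ttl) ended before now. */
    static List<String> findExpired(List<Order> orders, Instant now, Duration ttl) {
        List<String> expired = new ArrayList<>();
        for (Order order : orders) {
            boolean unpaid = order.status == 0;
            boolean overdue = order.createTime.plus(ttl).isBefore(now);
            if (unpaid && overdue) {
                expired.add(order.code);
            }
        }
        return expired;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-01-01T00:01:00Z");
        List<Order> orders = Arrays.asList(
                new Order("A", 0, now.minusSeconds(30)),  // unpaid and 30 s old: expired
                new Order("B", 1, now.minusSeconds(30)),  // already paid (1 is a made-up status): ignored
                new Order("C", 0, now.minusSeconds(5)));  // unpaid but only 5 s old: still valid
        System.out.println(findExpired(orders, now, Duration.ofSeconds(20))); // prints [A]
    }
}
```

Every polling run has to look at all candidate orders, and an order is only noticed at the next run after it expires, which is the cost the message-based approach tries to avoid.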

"Dead letter queue", as the name suggests, is a special queue that can hold messages for a certain period before they are processed. Compared with an "ordinary queue", messages entering the dead letter queue are not processed immediately but only after waiting for a set time. An ordinary queue cannot do this: once a message enters it, the message is immediately picked up and consumed by the corresponding consumer. The following figure shows the basic message model of an ordinary queue:


The "dead letter queue" is more complicated to set up and use. Normally it is made up of three core components: a dead letter exchange, a dead letter routing key, and a TTL (the message's time to live, which is optional). The dead letter queue itself is bound to a "basic exchange + basic routing key" on the producer side, so the producer first sends the message to the message model formed by that binding, and the message thereby enters the dead letter queue indirectly. Once the TTL has elapsed, the message "dies" and moves on to its next stop: the message model formed by the "dead letter exchange + dead letter routing key" binding on the consumer side, where the real queue is consumed. As shown below:
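Separately from the figure, the hand-off can be mimicked with a small in-memory model in plain Java. This is a sketch only and does not talk to RabbitMQ: the `DeadLetterFlowModel` class and its method names are invented for illustration, and time is passed in explicitly so the behavior is easy to follow.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** In-memory toy model (not RabbitMQ) of: producer -> dead letter queue with TTL -> real queue. */
final class DeadLetterFlowModel {

    /** A message together with the absolute time at which its TTL runs out. */
    private static final class Pending {
        final String body;
        final long expiresAtMillis;

        Pending(String body, long expiresAtMillis) {
            this.body = body;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Deque<Pending> deadLetterQueue = new ArrayDeque<>(); // no consumer listens here
    private final List<String> realQueue = new ArrayList<>();          // the consumer listens here

    /** Producer side: the message enters the dead letter queue and starts waiting. */
    void publish(String body, long nowMillis, long ttlMillis) {
        deadLetterQueue.addLast(new Pending(body, nowMillis + ttlMillis));
    }

    /** Broker side: move messages from the head of the queue to the real queue once their TTL has elapsed. */
    void tick(long nowMillis) {
        while (!deadLetterQueue.isEmpty() && deadLetterQueue.peekFirst().expiresAtMillis <= nowMillis) {
            realQueue.add(deadLetterQueue.pollFirst().body);
        }
    }

    int waiting() {
        return deadLetterQueue.size();
    }

    List<String> delivered() {
        return realQueue;
    }

    public static void main(String[] args) {
        DeadLetterFlowModel model = new DeadLetterFlowModel();
        model.publish("order-1", 0, 20_000);
        model.tick(10_000);
        System.out.println(model.waiting() + " " + model.delivered()); // prints 1 []
        model.tick(20_000);
        System.out.println(model.waiting() + " " + model.delivered()); // prints 0 [order-1]
    }
}
```

The point of the model is that nothing consumes from the first queue: a message only becomes visible to the consumer after it has expired and been moved on.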

Below, we build the dead letter queue's message model in actual code and apply it to the functional module of the seckill system described above.

(1) First, create the message model of the dead letter queue in the RabbitmqConfig configuration class. The complete source code is as follows:

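// Dead letter queue: holds messages until their TTL expires, then forwards them via the dead letter exchange and routing key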

@Bean
public Queue successKillDeadQueue(){
    Map<String, Object> argsMap= Maps.newHashMap();
    argsMap.put("x-dead-letter-exchange",env.getProperty("mq.kill.item.success.kill.dead.exchange"));
    argsMap.put("x-dead-letter-routing-key",env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
    return new Queue(env.getProperty("mq.kill.item.success.kill.dead.queue"),true,false,false,argsMap);
}

// Basic exchange that the producer publishes to
@Bean
public TopicExchange successKillDeadProdExchange(){
    return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"),true,false);
}
// Binding: basic exchange + basic routing key -> dead letter queue
@Bean
public Binding successKillDeadProdBinding(){
    return BindingBuilder.bind(successKillDeadQueue()).to(successKillDeadProdExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"));
}
// Real queue that the consumer listens on
@Bean
public Queue successKillRealQueue(){
    return new Queue(env.getProperty("mq.kill.item.success.kill.dead.real.queue"),true);
}
// Dead letter exchange
@Bean
public TopicExchange successKillDeadExchange(){
    return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.exchange"),true,false);
}
// Binding: dead letter exchange + dead letter routing key -> real queue
@Bean
public Binding successKillDeadBinding(){
    return BindingBuilder.bind(successKillRealQueue()).to(successKillDeadExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
}
 

The properties read through the env environment object are configured in the application.properties file, with the following values:

# Dead letter queue message model for orders created by a successful seckill
mq.kill.item.success.kill.dead.queue=${mq.env}.kill.item.success.kill.dead.queue
mq.kill.item.success.kill.dead.exchange=${mq.env}.kill.item.success.kill.dead.exchange
mq.kill.item.success.kill.dead.routing.key=${mq.env}.kill.item.success.kill.dead.routing.key

mq.kill.item.success.kill.dead.real.queue=${mq.env}.kill.item.success.kill.dead.real.queue
mq.kill.item.success.kill.dead.prod.exchange=${mq.env}.kill.item.success.kill.dead.prod.exchange
mq.kill.item.success.kill.dead.prod.routing.key=${mq.env}.kill.item.success.kill.dead.prod.routing.key

# Message TTL in ms (20000 ms = 20 s)
mq.kill.item.success.kill.expire=20000
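Each name above starts with the `${mq.env}` placeholder, which Spring fills in from the `mq.env` property when the value is read. The sketch below only mimics that substitution in plain Java to show the resulting names; the `PlaceholderDemo` class is invented for illustration, and `test` is an assumed value for `mq.env`, not taken from the project.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Mimics ${name} placeholder substitution to show how the queue and exchange names expand. */
final class PlaceholderDemo {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Replaces every ${name} in value with props.get(name); unknown placeholders are left untouched. */
    static String resolve(String value, Map<String, String> props) {
        Matcher matcher = PLACEHOLDER.matcher(value);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String replacement = props.getOrDefault(matcher.group(1), matcher.group(0));
            matcher.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("mq.env", "test"); // assumed value, for illustration only
        System.out.println(resolve("${mq.env}.kill.item.success.kill.dead.queue", props));
        // prints test.kill.item.success.kill.dead.queue
    }
}
```

Prefixing every name with the environment keeps queues and exchanges from different environments apart on a shared broker.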
 


(2) With the message model in place, we need to add the "send a message into the dead letter queue" feature to RabbitSenderService, the general-purpose RabbitMQ message-sending service class. In this method we set the message's time to live (TTL) from the configured property mq.kill.item.success.kill.expire, which is 20 s. The complete source code is as follows:

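// After a successful seckill creates an order, send it to the dead letter queue so that it expires if left unpaid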
public void sendKillSuccessOrderExpireMsg(final String orderCode){
    try {
        if (StringUtils.isNotBlank(orderCode)){
            KillSuccessUserInfo info=itemKillSuccessMapper.selectByCode(orderCode);
            if (info!=null){
                rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                rabbitTemplate.setExchange(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"));
                rabbitTemplate.setRoutingKey(env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"));
                rabbitTemplate.convertAndSend(info, new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        MessageProperties mp=message.getMessageProperties();
                        mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        mp.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,KillSuccessUserInfo.class);

                        // Set the message TTL (20 s here)
                        mp.setExpiration(env.getProperty("mq.kill.item.success.kill.expire"));
                        return message;
                    }
                });
            }
        }
    }catch (Exception e){
        log.error("Seckill order created - failed to send the expiry message to the dead letter queue, orderCode={}",orderCode,e);
    }
}
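RabbitMQ expects the per-message expiration as a string containing a number of milliseconds, which is why the configured value "20000" can be passed straight to setExpiration. Note that env.getProperty returns null for a missing key, so a typo in the property name would go unnoticed at this point. The helper below is a hypothetical sketch (the `MessageTtl` class is not part of the project) showing one way to validate and convert the value.

```java
import java.time.Duration;

/** Hypothetical helper for converting between a Duration and the expiration string (milliseconds). */
final class MessageTtl {

    /** Formats a TTL as the millisecond string used for the per-message expiration property. */
    static String toExpiration(Duration ttl) {
        if (ttl.isNegative()) {
            throw new IllegalArgumentException("TTL must not be negative: " + ttl);
        }
        return String.valueOf(ttl.toMillis());
    }

    /** Parses a configured value such as "20000"; rejects null, non-numeric, and negative input. */
    static Duration parseExpiration(String configured) {
        if (configured == null) {
            throw new IllegalArgumentException("TTL property is missing");
        }
        long millis = Long.parseLong(configured.trim()); // throws NumberFormatException on bad input
        if (millis < 0) {
            throw new IllegalArgumentException("TTL must not be negative: " + configured);
        }
        return Duration.ofMillis(millis);
    }

    public static void main(String[] args) {
        System.out.println(toExpiration(Duration.ofSeconds(20)));    // prints 20000
        System.out.println(parseExpiration("20000").getSeconds());   // prints 20
    }
}
```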
 

As the code for "sending a message into the dead letter queue" shows, the message first enters the dead letter queue's message model through the "basic exchange + basic routing key" binding. When the message's TTL expires, it leaves the dead letter queue (it is "freed") and moves on to the next stop: the real queue's message model bound by the "dead letter exchange + dead letter routing key", where it is finally picked up and consumed by the consumer.

At this point, you can run the whole project in an external Tomcat server, open the RabbitMQ management console, and find the dead letter queue to view its details, as shown in the following figure:


(3) Finally, the messages in the "real queue" need to be listened for and processed in RabbitReceiverService, the general-purpose RabbitMQ message-listening service class. Here we invalidate the order (provided it has not been paid). The complete source code is as follows:

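// Listen on the real queue for orders whose payment window has expired, and invalidate those still unpaid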
@RabbitListener(queues = {"${mq.kill.item.success.kill.dead.real.queue}"},containerFactory = "singleListenerContainer")
public void consumeExpireOrder(KillSuccessUserInfo info){
    try {
        log.info("Seckill order payment window expired - message received: {}",info);

        if (info!=null){
            ItemKillSuccess entity=itemKillSuccessMapper.selectByPrimaryKey(info.getCode());
            if (entity!=null && entity.getStatus().intValue()==0){
                itemKillSuccessMapper.expireOrder(info.getCode());
            }
        }
    }catch (Exception e){
        log.error("Seckill order payment window expired - failed to process the message",e);
    }
}
 

The update that invalidates the order record is performed by itemKillSuccessMapper.expireOrder(info.getCode());, and the corresponding SQL mapping is as follows:

<!-- Invalidate an order that is still unpaid -->
<update id="expireOrder">
  UPDATE item_kill_success
  SET status = -1
  WHERE code = #{code} AND status = 0
</update>
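The `AND status = 0` condition means the update only touches orders that are still unpaid, so a paid order is left alone and a message delivered twice does no harm. The in-memory sketch below illustrates that guard with a compare-and-set on a map. The `OrderExpiryModel` class is hypothetical, and the status value 1 for "paid" is an assumption made for the example (0 and -1 come from the code above).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory stand-in for the guarded UPDATE: only an unpaid order (status 0) can become expired (-1). */
final class OrderExpiryModel {

    static final int UNPAID = 0;
    static final int EXPIRED = -1;

    private final Map<String, Integer> statusByCode = new ConcurrentHashMap<>();

    void put(String code, int status) {
        statusByCode.put(code, status);
    }

    Integer status(String code) {
        return statusByCode.get(code);
    }

    /** Mirrors "SET status = -1 WHERE code = ? AND status = 0" and returns the number of affected rows. */
    int expireOrder(String code) {
        return statusByCode.replace(code, UNPAID, EXPIRED) ? 1 : 0;
    }

    public static void main(String[] args) {
        OrderExpiryModel orders = new OrderExpiryModel();
        orders.put("A", UNPAID);
        orders.put("B", 1); // 1 stands for "paid" in this sketch only
        System.out.println(orders.expireOrder("A")); // prints 1
        System.out.println(orders.expireOrder("A")); // prints 0
        System.out.println(orders.expireOrder("B")); // prints 0
    }
}
```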
 


(4) That completes the code for the RabbitMQ dead letter queue message model. The last step is to call it at the moment the user successfully creates an order in the seckill, sending a message into the dead letter queue. The calling code is as follows:

/**
 * Shared method: record the order created by a successful seckill, then send the follow-up messages
 * @param kill
 * @param userId
 * @throws Exception
 */
private void commonRecordKillSuccessInfo(ItemKill kill, Integer userId) throws Exception{
    // Build the order record for the successful seckill

    ItemKillSuccess entity=new ItemKillSuccess();
    String orderNo=String.valueOf(snowFlake.nextId());

    //entity.setCode(RandomUtil.generateOrderCode());  // alternative order-code generator, not used
    entity.setCode(orderNo); // order number generated by the snowflake algorithm
    entity.setItemId(kill.getItemId());
    entity.setKillId(kill.getId());
    entity.setUserId(userId.toString());
    entity.setStatus(SysConstant.OrderStatus.SuccessNotPayed.getCode().byteValue());
    entity.setCreateTime(DateTime.now().toDate());
    // Check once more that this user has no order for this seckill before inserting
    if (itemKillSuccessMapper.countByKillUserId(kill.getId(),userId) <= 0){
        int res=itemKillSuccessMapper.insertSelective(entity);

        if (res>0){
            // Send the asynchronous email notification (RabbitMQ + mail)
            rabbitSenderService.sendKillSuccessEmailMsg(orderNo);

            // Send to the dead letter queue so the order is invalidated if it is still unpaid when the TTL expires
            rabbitSenderService.sendKillSuccessOrderExpireMsg(orderNo);
        }
    }
}
 


Finally, run a self-test: click the "Buy" button. After the user's seckill succeeds, a message is sent into the dead letter queue (you can see one message in the Ready state in the RabbitMQ management console). After waiting 20 s, you can see the message move to the real queue, where it is picked up and consumed by the real consumer, as shown below:

That wraps up this article's introduction to the RabbitMQ dead letter queue and its practical application. This approach handles "timed-out unpaid orders" very flexibly, and the whole process is automatic and natural, with no need to trigger it manually by clicking a button. Of course, nothing is perfect, and the dead letter queue is no exception. In a later article, we will look at the flaws of this approach and how to address them.

Supplement:

1. The overall build and hands-on coding of the seckill system are now complete. The full source code repository can be downloaded here: gitee.com/steadyjack/... Remember to Fork and Star!

2. Finally, follow Debug's technical WeChat official account: