利用rabbitmq异步实现来提升程序处理性能
2023-03-31 10:46:10 时间
利用rabbitmq提高付款交易处理性能
近期交易系统出款交易量猛增,从skywalking监控平台查看服务的调用链路(Trace),发现在调用外部三方http接口会耗时将近一半。鉴于出款交易在业务上是异步处理的,所以,商定考虑将调用外部接口的部分改为异步实现。
异步实现,一种方案是线程池。弊端是,线程池是在应用节点内部,集群部署环境下,并不利于多节点的均衡处理。再者,单节点故障时,消息就会丢失,这个比较要命,还要考虑补偿。
最好的方案是借助消息中间件,我们使用rabbitmq。
zhenghe-channel应用是springboot项目,异步改为使用rabbitmq来处理,zhenghe-channel既是生产者,又是消费者。
SpringBoot工程如何使用Rabbitmq
我们的springboot项目使用rabbitmq,通常是这样子的。一个是标记了@Configuration注解的RabbitConfig类,通过@Bean注解声明broker(exchange、queue,以及binding)。
package com.emax.channel.provider.config; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Bean; import org.springframework.amqp.core.*; /** * 调用服务商api下发 mq异步实现 * @date 2022-9-20 21:22 */ @Configuration @Slf4j public class LevyPayApiInvokerMqConfig { @Value("${mq-brokers.levy_pay_api_invoker_queue:levy_pay_api_invoker_queue}") private String queueName; @Value("${mq-brokers.levy_pay_api_invoker_exchange:levy_pay_api_invoker_exchange}") private String exchangeName; @Bean public Queue levyPayApiQueue() { return new Queue(queueName, true); } @Bean DirectExchange levyPayApiExchange() { return new DirectExchange(exchangeName, true, false); } @Bean Binding levyPayApiBinding(Queue levyPayApiQueue, DirectExchange levyPayApiExchange) { return BindingBuilder.bind(levyPayApiQueue).to(levyPayApiExchange).withQueueName(); } }
生产者端,不外乎调用RabbitTemplate#convertAndSend方法。
package com.emax.channel.provider.modules.mq; import org.springframework.stereotype.Component; import org.springframework.amqp.core.Binding; import org.springframework.amqp.rabbit.core.RabbitTemplate; /** * 调用服务商api下发 mq异步实现 * @date 2022-9-20 21:22 */ @Component @Slf4j public class LevyPayApiInvokerProducer { /** * @see com.emax.channel.provider.config.LevyPayApiInvokerMqConfig#levyPayApiBinding(Queue, DirectExchange) */ @Autowired private Binding levyPayApiBinding; @Autowired private RabbitTemplate rabbitTemplate; public void gotoPay(LevyPaymentFlow levyPaymentFlow, LevyMerchantRelationDTO levyMerchantRelation) { log.info("httpInvokeLevyApi 调用服务商api下发 mq异步实现 消息入队... Object[] objects = {levyPaymentFlow, levyMerchantRelation, Thread.currentThread().getName()}; rabbitTemplate.convertAndSend(exchangeName, queueName, objects); } }
消费端,则是使用Listener监听队列消息,进行消费。
package com.emax.channel.provider.modules.mq; import org.springframework.stereotype.Component; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; /** * 调用服务商api下发 mq异步实现 * @date 2022-9-20 21:22 */ @Component @Slf4j public class LevyPayApiInvokerMqConsumer { @RabbitHandler @RabbitListener(queues = "${mq-brokers.levy_pay_api_invoker_queue}") public void onMessage(Object[] objects) throws Exception { LevyPaymentFlow levyPaymentFlow = (LevyPaymentFlow) objects[0]; LevyMerchantRelationDTO levyMerchantRelation = (LevyMerchantRelationDTO) objects[1]; Thread.currentThread().setName(String.valueOf(objects[2]).concat("_mq")); log.info("httpInvokeLevyApi 调用服务商api下发 mq异步实现 消息出队... long nowTime = System.currentTimeMillis(); // 执行逻辑 try { // redisLimiter.limitWait("httpInvokeLevyApi", 5, 1); levyPaymentAsyncService.gotoPay(levyPaymentFlow, levyMerchantRelation); } finally { log.info("httpInvokeLevyApi 调用服务商api下发 mq异步实现 duration={}", System.currentTimeMillis() - nowTime); } } }
Rabbitmq代码可读、可维护性挺高
zhenghe-channel项目工程庞大,package包和类文件很多,将broker的声明和使用分放在不同的类里,不易读。 这次呢,为了代码易读和易维护,我将声明broker的Bean和生产者代码、消费者代码写在一个类里。必须棒棒哒~(。≧3≦)ノ⌒☆
package com.emax.channel.provider.modules.mq; import com.emax.channel.provider.modules.levypaymentflow.entity.LevyPaymentFlow; import com.emax.channel.provider.modules.levypaymentflow.provider.LevyPaymentAsyncService; import com.emax.channel.rpcapi.levymerchantrelation.dto.LevyMerchantRelationDTO; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.stereotype.Component; /** * 调用服务商api下发 mq异步实现 * @date 2022-9-20 21:22 */ @Configuration @Component @Slf4j public class LevyPayApiInvokerMqBroker { @Value("${mq-brokers.levy_pay_api_invoker_queue:levy_pay_api_invoker_queue}") private String queueName; @Value("${mq-brokers.levy_pay_api_invoker_exchange:levy_pay_api_invoker_exchange}") private String exchangeName; @Bean public Queue levyPayApiQueue() { return new Queue(queueName, true); } @Bean DirectExchange levyPayApiExchange() { return new DirectExchange(exchangeName, true, false); } @Bean Binding bindingLevyApiExchange(Queue levyPayApiQueue, DirectExchange levyPayApiExchange) { return BindingBuilder.bind(levyPayApiQueue).to(levyPayApiExchange).with(queueName); } @Autowired private LevyPaymentAsyncService levyPaymentAsyncService; @Autowired private RabbitTemplate rabbitTemplate; public void gotoPay(LevyPaymentFlow levyPaymentFlow, LevyMerchantRelationDTO levyMerchantRelation) { log.info("httpInvokeLevyApi 调用服务商api下发 mq异步实现 消息入队...; Object[] objects = {levyPaymentFlow, levyMerchantRelation, Thread.currentThread().getName()}; rabbitTemplate.convertAndSend(exchangeName, queueName, objects); } @RabbitHandler @RabbitListener(queues = "${mq-brokers.levy_pay_api_invoker_queue}") public void onMessage(Object[] objects) throws Exception { LevyPaymentFlow levyPaymentFlow = (LevyPaymentFlow) objects[0]; LevyMerchantRelationDTO levyMerchantRelation = (LevyMerchantRelationDTO) objects[1]; Thread.currentThread().setName(String.valueOf(objects[2]).concat("_mq")); log.info("httpInvokeLevyApi 调用服务商api下发 mq异步实现 消息出队...; long nowTime = System.currentTimeMillis(); // 执行逻辑 try { // redisLimiter.limitWait("httpInvokeLevyApi", 5, 1); levyPaymentAsyncService.gotoPay(levyPaymentFlow, levyMerchantRelation); } finally { log.info("httpInvokeLevyApi 调用服务商api下发 mq异步实现 duration={}", System.currentTimeMillis() - nowTime); } } }
相关文章
- Typescript代码整洁之道
- 2021年Web开发的7大趋势
- GitHub发布2020年度报告:开发者数量超5600万
- 面试官:关于Spring就问这13个
- 电脑狂、理论家、情报员……你是哪种类型的软件工程师?
- Socket粘包问题的3种解决方案,哪一种更优秀!
- 实践: Jenkins Core Api & Job DSL创建项目
- 5分钟带你快速了解ServiceMesh的前世今生
- 学习算法必备:时间复杂度与空间复杂度,你了解多少
- Zookeeper和Eureka有哪些区别?
- Try..Catch 不能捕获的错误有哪些?注意事项又有哪些?
- 搭建Sonarqube 代码质量扫描环境
- 如何实现 ASP.NET Core WebApi 的版本化
- 这样调优:让你的 IDEA 快到飞起来,效率真高!
- 机器编程驾到,会让2700万程序员丢掉饭碗吗?
- 偷师 Next.js:我学到的 6 个设计技巧
- 关于动态规划,你该了解这些!
- 真正影响DevOps/DevSecOps应用的趋势是什么?
- 谁说明天上线,这货压根不知道开发流程!
- 三万字带你彻底吃透MyBatis源码!!