zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

Spring boot+redis实现消息发布与订阅的代码

RedisSpringBoot消息代码 实现 发布 订阅
2023-06-13 09:19:50 时间
groupId org.springframework.boot /groupId artifactId spring-boot-starter-data-redis /artifactId /dependency dependency groupId org.springframework.boot /groupId artifactId spring-boot-starter-web /artifactId /dependency dependency groupId com.alibaba /groupId artifactId fastjson /artifactId version 1.2.41 /version /dependency

二.编辑yml配置文件


server:

 port: 7888

# 日志配置

logging:

 config: classpath:log/logback.xml

 level:

 cn.com.dhcc: info

 org.springframework: info

 org.springframework.web: info

 com.alibaba.nacos.client.naming: error

spring:

 redis:

 host: localhost

 port: 6379

 password: *********

 database: 1

 jedis:

 pool:

 max-idle: 8

 max-active: 8

 max-wait: -1

 min-idle: 0

 timeout: 5000

三.配置Redis


@Configuration

public class RedisConfiguration {

 * 实例化 RedisTemplate 对象

 * @return

 @Bean("RedisTemplateS")

 public RedisTemplate String, Object functionDomainRedisTemplate(RedisConnectionFactory redisConnectionFactory) {

 RedisTemplate String, Object redisTemplate = new RedisTemplate ();

 initDomainRedisTemplate(redisTemplate, redisConnectionFactory);

 return redisTemplate;

 * 设置数据存入 redis 的序列化方式,并开启事务

 * @param redisTemplate

 * @param factory

 private void initDomainRedisTemplate(@Qualifier("RedisTemplateS") RedisTemplate String, Object redisTemplate, RedisConnectionFactory factory) {

 // 如果不配置Serializer,那么存储的时候缺省使用String,如果用User类型存储,那么会提示错误User can"t cast to

 // String!

 redisTemplate.setKeySerializer(new StringRedisSerializer());

 redisTemplate.setHashKeySerializer(new StringRedisSerializer());

 FastJsonRedisSerializer Object fastJsonRedisSerializer = new FastJsonRedisSerializer Object (Object.class);

 redisTemplate.setHashValueSerializer(fastJsonRedisSerializer);

 redisTemplate.setValueSerializer(fastJsonRedisSerializer);

 //redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());

 //redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());

 // 开启事务

 redisTemplate.setEnableTransactionSupport(true);

 redisTemplate.setConnectionFactory(factory);

 * 注入封装RedisTemplate @Title: redisUtil @return RedisUtil @date

 @Bean(name = "redisUtils")

 public RedisUtils redisUtil(@Qualifier("RedisTemplateS") RedisTemplate String, Object redisTemplate) {

 RedisUtils redisUtil = new RedisUtils();

 redisUtil.setRedisTemplate(redisTemplate);

 return redisUtil;

 }

四.编写RedisUtil消息发布方法


public class RedisUtils {

 private static final Logger log = LoggerFactory.getLogger(RedisUtils.class);

 private RedisTemplate String, Object redisTemplate;

 public void setRedisTemplate(RedisTemplate String, Object redisTemplate) {

 this.redisTemplate = redisTemplate;

 public void publish(String channal ,Object obj) {

 redisTemplate.convertAndSend(channal,obj );

}

五.配置消息监听


@Configuration

public class RedisMessageListener {

 * 创建连接工厂

 * @param connectionFactory

 * @param listenerAdapter

 * @return

 @Bean

 public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,

 MessageListenerAdapter listenerAdapter,MessageListenerAdapter listenerAdapter2){

 RedisMessageListenerContainer container = new RedisMessageListenerContainer();

 container.setConnectionFactory(connectionFactory);

 //接受消息的key

 container.addMessageListener(listenerAdapter,new PatternTopic("phone"));

 return container;

 * 绑定消息监听者和接收监听的方法

 * @param receiver

 * @return

 @Bean

 public MessageListenerAdapter listenerAdapter(ReceiverRedisMessage receiver){

 return new MessageListenerAdapter(receiver,"receiveMessage");

 * 注册订阅者

 * @param latch

 * @return

 @Bean

 ReceiverRedisMessage receiver(CountDownLatch latch) {

 return new ReceiverRedisMessage(latch);

 * 计数器,用来控制线程

 * @return

 @Bean

 public CountDownLatch latch(){

 return new CountDownLatch(1);//指定了计数的次数 1

}

六.消息订阅方法


public class ReceiverRedisMessage {

 private static final Logger log = LoggerFactory.getLogger(ReceiverRedisMessage.class);

 private CountDownLatch latch;

 @Autowired

 public ReceiverRedisMessage(CountDownLatch latch) {

 this.latch = latch;

 * 队列消息接收方法

 * @param jsonMsg

 public void receiveMessage(String jsonMsg) {

 log.info("[开始消费REDIS消息队列phone数据...]");

 try {

 log.info("监听者收到消息:{}", jsonMsg);

 JSONObject exJson = JSONObject.parseObject(jsonMsg);

 User user = JSON.toJavaObject(exJson, User.class);

 System.out.println("转化为对象 :"+user);

 log.info("[消费REDIS消息队列phone数据成功.]");

 } catch (Exception e) {

 log.error("[消费REDIS消息队列phone数据失败,失败信息:{}]", e.getMessage());

 latch.countDown();

}

七.定时消息发布测试


@EnableScheduling

@Component

public class PublisherController {

 private static final Logger log = LoggerFactory.getLogger(PublisherController.class);

 @Autowired

 private RedisUtils redisUtils;

 @Scheduled(fixedRate = 5000)

 public String pubMsg() {

 User user=new User(1, "尚***", 26,"男","陕西省xxxx市xxxxxx县");

 redisUtils.publish("phone", user);

 log.info("Publisher sendes Topic... ");

 return "success";

}

八.测试结果

九.发布对象User实体


public class User implements Serializable {

 private static final long serialVersionUID = 1L;

 private int id;

 private String name;

 private int age;

 private String sex;

 private String address;

 .....................

}

到此这篇关于Spring boot+redis实现消息发布与订阅的文章就介绍到这了,更多相关Spring boot redis消息发布与订阅内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!


我想要获取技术服务或软件
服务范围:MySQL、ORACLE、SQLSERVER、MongoDB、PostgreSQL 、程序问题
服务方式:远程服务、电话支持、现场服务,沟通指定方式服务
技术标签:数据恢复、安装配置、数据迁移、集群容灾、异常处理、其它问题

本站部分文章参考或来源于网络,如有侵权请联系站长。
数据库远程运维 Spring boot+redis实现消息发布与订阅的代码