ActiveMQ 发布订阅模式中经常使用的工具类
2023-09-11 14:14:42 时间
package com.jms;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.clapper.util.logging.Logger;

import com.pzoom.dsa.common.util.Log;
import com.pzoom.dsa.nerd.mysql.DBQueryHelper;

/**
 * Static helper around a single ActiveMQ connection and session for sending
 * and receiving text messages on named queues.
 *
 * <p>The connection and session are created once in the static initializer.
 * Producers and consumers are created lazily per queue name and cached.
 *
 * <p>NOTE(review): all operations share one {@link Session}. The JMS
 * specification describes a session as a single-threaded context, so only the
 * lookup-or-create of producers/consumers is serialized here; callers should
 * not send and receive concurrently from several threads without confirming
 * this is acceptable for their broker/client version.
 */
public class Jms {

    /**
     * Broker address used for the shared connection.
     * NOTE(review): {@code wireFormat.maxInactivityDuration=0} is an ActiveMQ
     * transport option; presumably it disables the inactivity monitor - confirm
     * against the ActiveMQ documentation.
     */
    private static final String BROKER_URL =
            "tcp://10.100.100.100:61616?wireFormat.maxInactivityDuration=0";

    /** How long {@link #getMessage(String)} waits for a message, in milliseconds. */
    private static final long RECEIVE_TIMEOUT_MS = 10000L;

    static ConnectionFactory connectionFactory;
    static Connection connection = null;
    static Session session;

    /** Cache of producers keyed by queue name. */
    static Map<String, MessageProducer> sendQueues = new ConcurrentHashMap<String, MessageProducer>();

    /** Cache of consumers keyed by queue name. */
    static Map<String, MessageConsumer> getQueues = new ConcurrentHashMap<String, MessageConsumer>();

    static Log log = Log.getLogger(Jms.class);

    static {
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                BROKER_URL);
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            // Non-transacted session; Session.AUTO_ACKNOWLEDGE has the value 1
            // that was previously passed as a bare literal.
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        } catch (Exception e) {
            // Class loading must not fail here; later calls report the missing
            // session through requireSession().
            e.printStackTrace();
        }
    }

    /**
     * Returns the shared session, failing with a descriptive error when the
     * static initializer could not create it.
     *
     * @return the shared, non-null session
     * @throws IllegalStateException if the session was never created
     */
    private static Session requireSession() {
        Session current = session;
        if (current == null) {
            throw new IllegalStateException(
                    "JMS session is not available; connecting to " + BROKER_URL + " failed");
        }
        return current;
    }

    /**
     * Returns the cached producer for the given queue, creating and caching it
     * on first use. Synchronized so that two threads cannot both miss the cache
     * and create duplicate producers for the same queue.
     *
     * @param name queue name
     * @return the producer, or {@code null} if creating it failed with a
     *         {@link JMSException} (the exception is printed)
     * @throws IllegalStateException if the shared session was never created
     */
    static synchronized MessageProducer getMessageProducer(String name) {
        MessageProducer cached = sendQueues.get(name);
        if (cached != null) {
            return cached;
        }
        Session current = requireSession();
        try {
            Destination destination = current.createQueue(name);
            MessageProducer producer = current.createProducer(destination);
            sendQueues.put(name, producer);
            return producer;
        } catch (JMSException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Returns the cached consumer for the given queue, creating and caching it
     * on first use. Synchronized so that two threads cannot both miss the cache
     * and create duplicate consumers for the same queue.
     *
     * @param name queue name
     * @return the consumer, or {@code null} if creating it failed with a
     *         {@link JMSException} (the exception is printed)
     * @throws IllegalStateException if the shared session was never created
     */
    static synchronized MessageConsumer getMessageConsumer(String name) {
        MessageConsumer cached = getQueues.get(name);
        if (cached != null) {
            return cached;
        }
        Session current = requireSession();
        try {
            Destination destination = current.createQueue(name);
            MessageConsumer consumer = current.createConsumer(destination);
            getQueues.put(name, consumer);
            return consumer;
        } catch (JMSException e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Sends a text message to the given queue. A {@link JMSException} raised
     * while creating or sending the message is printed and otherwise ignored.
     *
     * @param queue queue name
     * @param text  message body
     * @throws IllegalStateException if the shared session was never created or
     *                               no producer could be created for the queue
     */
    public static void sendMessage(String queue, String text) {
        try {
            TextMessage message = requireSession().createTextMessage(text);
            MessageProducer producer = getMessageProducer(queue);
            if (producer == null) {
                throw new IllegalStateException("No JMS producer available for queue " + queue);
            }
            producer.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * Receives the next message from the given queue, waiting up to
     * {@value #RECEIVE_TIMEOUT_MS} milliseconds.
     *
     * <p>The received message is cast to {@link TextMessage}; a message of any
     * other type results in a {@link ClassCastException}.
     *
     * @param queue queue name
     * @return the message text, or {@code null} if no message arrived within
     *         the timeout or a {@link JMSException} occurred (the exception is
     *         printed)
     * @throws IllegalStateException if the shared session was never created or
     *                               no consumer could be created for the queue
     */
    public static String getMessage(String queue) {
        try {
            MessageConsumer consumer = getMessageConsumer(queue);
            if (consumer == null) {
                throw new IllegalStateException("No JMS consumer available for queue " + queue);
            }
            TextMessage message = (TextMessage) consumer.receive(RECEIVE_TIMEOUT_MS);
            if (message != null) {
                return message.getText();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Closes the shared session and connection. Each close is attempted
     * independently, so a failure closing the session does not prevent closing
     * the connection. The producer and consumer caches are cleared because
     * their entries belong to the session being closed.
     */
    public static synchronized void close() {
        sendQueues.clear();
        getQueues.clear();
        if (session != null) {
            try {
                session.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
相关文章
- 基于dalvik模式下的Xposed Hook开发的某加固脱壳工具
- 谷歌站长工具(Google Search Console)
- Sentinella 自动管理计算机模式的小工具
- linux系统监控工具汇总及几个小脚本 , 系统初始化脚本
- EXCEL,熟悉又不熟悉的项目管理工具
- 网站截图工具EyeWitness
- Linux split文件切分工具的使用
- 函数式编程工具:filter和reduce
- fmt - 简易的文本格式优化工具 simple optimal text formatter
- 短链接转换工具
- QT工具——windeployqt(部署)
- 打包工具pyinstaller
- 小技巧 之 一步轻松批量修改文件名后缀(不用额外应用或工具)
- 在智慧城市建设中 计算机模拟是一个强大的工具