博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java之JMS
阅读量:7005 次
发布时间:2019-06-27

本文共 8384 字,大约阅读时间需要 27 分钟。

  一、简介:JMS即(Java Message Service)应用程序接口,是一个中关于面向(MOM)的,用于在两个应用程序之间,或中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

  二、JMS对象模型包含如下几个要素: 

  1)连接工厂。连接工厂(ConnectionFactory)是由管理员创建,并绑定到 树中。客户端使用JNDI查找连接工厂,然后利用连接工厂创建一个JMS连接。
  2)JMS连接。JMS连接(Connection)表示JMS客户端和服务器端之间的一个活动的连接,是由客户端通过调用连接工厂的方法建立的。
  3)JMS会话。JMS会话(Session)表示JMS客户与JMS服务器之间的会话状态。JMS会话建立在JMS连接上,表示客户与服务器之间的一个会话线程。
  4)JMS目的。JMS目的(Destination),又称为 ,是实际的消息源。
  5)JMS生产者和消费者。生产者(Message Producer)和消费者(Message Consumer)对象由Session对象创建,用于发送和接收消息。
  6)JMS消息通常有两种类型:
    ① 点对点(Point-to-Point)。在点对点的消息系统中,消息分发给一个单独的使用者。点对点消息往往与队列(javax.jms.Queue)相关联。
    ② 发布/订阅(Publish/Subscribe)。发布/订阅消息系统支持一个事件驱动模型,消息生产者和消费者都参与消息的传递。生产者发布事件,而使用者订阅感兴趣的事件,并使用事件。该类型消息一般与特定的主题(javax.jms.Topic)关联。
  三、JMS的五种消息
  StreamMessage -- Java原始值的数据流
  MapMessage--一套名称-值对
  TextMessage--一个字符串对象
  ObjectMessage--一个序列化的 Java对象
  BytesMessage--一个未解释字节的数据流
  四、常用应用类
  1)ConnectionFactory 接口(连接工厂):用户用来创建到JMS提供者的连接的被管对象。JMS客户通过可移植的接口访问连接,这样当下层的实现改变时,代码不需要进行修改。管理员在JNDI名字空间中配置连接工厂,这样,JMS客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或者主题连接工厂。
  2)Connection 接口(连接):连接代表了应用程序和 之间的通信链路。在获得了连接工厂后,就可以创建一个与JMS提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题到目标。
  3)Destination 接口(目标):目标是一个包装了消息目标 的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。JMS管理员创建这些对象,然后用户通过JNDI发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题。
  4)Session 接口(会话):表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持 。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消息,生产者来发送消息,消费者来接收消息。
  5)MessageConsumer 接口(消息消费者):由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或(非阻塞)接收队列和主题类型的消息。
  6)MessageProducer 接口(消息生产者):由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个通用的发送者,在发送消息时指定目标。
  7)Message 接口(消息):是在消费者和生产者之间传送的对象,也就是说从一个应用程序传送到另一个应用程序。一个消息有三个主要部分:
    a、消息头(必须):包含用于识别和为消息寻找路由的操作设置。
    b、一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。
    c、一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。
    d、消息接口非常灵活,并提供了许多方式来定制消息的内容。
  五、这里主要讲解activemq的实现topic和queue的消息传输机制
  1)首先还是需要中间件(不管是单独应用部署,还是嵌入式的服务)
    服务启动可以参考:
  2)activemq工厂连接模式 
     //初始化工厂        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();        //设置链接地址        activeMQConnectionFactory.setBrokerURL("tcp://localhost:61616");        //创建链接        Connection connection = activeMQConnectionFactory.createConnection();        //启动服务        connection.start();
 

  3)我这里简单写了一个activemq的具体实现过程,供参考

  a、目录结构和依赖包

    
org.apache.activemq
activemq-all
5.15.4

  b、核心功能,消息处理中心

package com.pinnet.center;import com.pinnet.consumer.queue.Queue;import com.pinnet.consumer.topic.Topic1;import com.pinnet.consumer.topic.Topic2;import org.apache.activemq.ActiveMQConnectionFactory;import org.apache.activemq.command.ActiveMQObjectMessage;import javax.jms.*;import java.io.Serializable;public class MessageCenter {    private static Session session;    public static void init() throws JMSException {        initConnection();        initRegister();    }    private static void initConnection() throws JMSException {        //初始化工厂        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();        //设置链接地址        activeMQConnectionFactory.setBrokerURL("tcp://localhost:61616");        //创建链接        Connection connection = activeMQConnectionFactory.createConnection();        //创建会话        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        //启动服务        connection.start();    }    /**     * 用于注册对应消息队列     * @throws JMSException     */    public static void initRegister() throws JMSException {        registerQueue(QueueType.QUEUE, new Queue());        registerTopic(TopicType.TOPIC, new Topic1());        registerTopic(TopicType.TOPIC, new Topic2());    }    /**     * 注册topic监听     * @param topicType     * @param messageListener     * @throws JMSException     */    public static void registerTopic(TopicType topicType, MessageListener messageListener) throws JMSException {        //将会话转成topic        TopicSession topicSession = (TopicSession) session;        //创建订阅者        TopicSubscriber topicSubscriber = topicSession.createSubscriber(topicSession.createTopic(topicType.name()));        //设置监听        topicSubscriber.setMessageListener(messageListener);    }    /**     * 注册queue监听     * @param queueType     * @param messageListener     * @throws JMSException     */    public static void registerQueue(QueueType queueType, MessageListener messageListener) throws JMSException {        //将会话转成queue        QueueSession queueSession = (QueueSession) session;        //创建接收者        QueueReceiver queueReceiver = queueSession.createReceiver(queueSession.createQueue(queueType.name()));        //设置监听        queueReceiver.setMessageListener(messageListener);    }    /**     * 发送topic消息     * @param topicType     * @param serializable     * @throws JMSException     */    public static void sendMessageTopic(TopicType topicType, Serializable serializable) throws JMSException {        TopicSession topicSession = (TopicSession) session;        //创建发布者        TopicPublisher topicPublisher = topicSession.createPublisher(topicSession.createTopic(topicType.name()));        //这里数据发布形式采用ObjectMessage        ActiveMQObjectMessage activeMQObjectMessage = new ActiveMQObjectMessage();        activeMQObjectMessage.setObject(serializable);        //发布消息        topicPublisher.publish(activeMQObjectMessage);    }    /**     * 发送queue消息     * @param queueType     * @param serializable     * @throws JMSException     */    public static void sendMessageQueue(QueueType queueType, Serializable serializable) throws JMSException {        QueueSession queueSession = (QueueSession) session;        //创建发送者        QueueSender queueSender = queueSession.createSender(queueSession.createQueue(queueType.name()));        ActiveMQObjectMessage activeMQObjectMessage = new ActiveMQObjectMessage();        activeMQObjectMessage.setObject(serializable);        //发送消息        queueSender.send(activeMQObjectMessage);    }    //使用枚举的目的是更好的管理    public enum TopicType {        TOPIC    }    public enum QueueType {        QUEUE    }}

  c、消息处理抽象类

package com.pinnet.consumer;import javax.jms.JMSException;import javax.jms.Message;import javax.jms.MessageListener;import javax.jms.ObjectMessage;import java.io.Serializable;/** * 抽象类的目的是,让编写者,只管处理消息,不用管理中间过程 */public abstract class MessageConsumer implements MessageListener {    public void onMessage(Message message) {        try {            ObjectMessage objectMessage = (ObjectMessage) message;            handleMessage(objectMessage.getObject());        } catch (JMSException e) {            e.printStackTrace();        }    }    //实现类处理对应消息    public abstract void handleMessage (Serializable serializable);}

  d、2个实现类型

package com.pinnet.consumer.queue;import com.pinnet.consumer.MessageConsumer;import java.io.Serializable;public class Queue extends MessageConsumer {    /**     * 消息处理     * @param serializable     */    public void handleMessage(Serializable serializable) {        System.out.println(serializable+"queue");    }}
package com.pinnet.consumer.topic;import com.pinnet.consumer.MessageConsumer;import java.io.Serializable;public class Topic1 extends MessageConsumer {    /**     * 消息处理     * @param serializable     */    public void handleMessage(Serializable serializable) {        System.out.println(serializable+"topic1");    }}
package com.pinnet.consumer.topic;import com.pinnet.consumer.MessageConsumer;import java.io.Serializable;public class Topic2 extends MessageConsumer {    /**     * 消息处理     * @param serializable     */    public void handleMessage(Serializable serializable) {        System.out.println(serializable+"topic2");    }}

  e、测试:

package com.pinnet;import com.pinnet.center.MessageCenter;import javax.jms.JMSException;public class Main {    public static void main(String[] args) throws JMSException, InterruptedException {        MessageCenter.init();        while (true) {            MessageCenter.sendMessageQueue(MessageCenter.QueueType.QUEUE, "queue");            MessageCenter.sendMessageTopic(MessageCenter.TopicType.TOPIC, "topic");            Thread.sleep(5000);        }    }}

  

  六、我这里是采用纯代码的方式,在spring中jmsTemplate的应用,就像session一样,就可以。

    springmvc的配置方式参考:

    配置方式也可以采用activemq的工厂连接模式一样实现

   七、例子源码:

转载地址:http://plytl.baihongyu.com/

你可能感兴趣的文章
关于Java中异常的总结
查看>>
算法练习
查看>>
容器化应用: Minishift 搭建镜像仓库的可视化管理控制台
查看>>
Canvas 涂鸦
查看>>
webpack模块化原理-Code Splitting
查看>>
如何从两个List中筛选出相同的值
查看>>
.NET Core 2将Visual Basic带到了Linux和macOS平台
查看>>
前端程序员需要掌握的几个专业“词语”
查看>>
PHP最佳实践之数据库
查看>>
【拾遗补缺】java ArrayList的不当使用导致的ConcurrentModificationException问题
查看>>
js 正则表达式
查看>>
使用vue和d3.js实现的dialog,messagebox,confirm,alert弹框
查看>>
Jackson 使用 defaultTyping 实现通用的序列化和反序列化
查看>>
React 更新视图过程
查看>>
第k个排列
查看>>
js数值精度
查看>>
JavaScript 中 apply 、call 的详解
查看>>
设计模式系列·王小二需求历险记(二)
查看>>
百度前端学院学习:动态数据绑定(二)
查看>>
从Python2到Python3:超百万行代码迁移实践
查看>>