博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
hadoop15---activemq
阅读量:5818 次
发布时间:2019-06-18

本文共 9054 字,大约阅读时间需要 30 分钟。

java JMS技术JMS是规范,activeMQ是实现。用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。它类似于JDBC,JDBC 是可以用来访问许多不同关系数据库的 API。IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ。JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JMS客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带:简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。

一个主题就是一个队列。

JMS规范体系架构JMS由以下元素组成。JMS提供者provider:连接面向消息中间件的,JMS接口的一个实现。提供者可以是Java平台的JMS实现,也可以是非Java平台的面向消息中间件的适配器。JMS客户:生产者消费者。JMS生产者:创建并发送消息的JMS客户。JMS消费者:接收消息的JMS客户。JMS消息:包括可以在JMS客户之间传递的数据的对象JMS队列:一个容纳那些被发送的等待阅读的消息的区域。与队列名字所暗示的意思不同,消息的接受顺序并不一定要与消息的发送顺序相同。一旦一个消息被阅读,该消息将被从队列中移走。JMS主题:一种支持发送消息给多个订阅者的机制。

Java消息服务应用程序结构支持两种模型

1、 点对点或叫做队列模型

在点对点或队列模型下,一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列。

2、发布者/订阅者模型

发布者/订阅者模型支持向一个特定的消息主题发布消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方。这种模式好比是匿名公告板。

一个主题就是一个队列。

这种模式被概括为:

多个消费者可以获得消息

在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription),以便客户能够订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。

 

代码演示1.下载ActiveMQ去官方网站下载:http://activemq.apache.org/安装。有web管理页面。消息服务是跨语言的。2.运行ActiveMQ解压缩apache-activemq-5.5.1-bin.zip,修改配置文件activeMQ.xml,将0.0.0.0修改为localhost。下面是不同的协议:
然后双击apache-activemq-5.5.1\bin\activemq.bat运行ActiveMQ程序。启动ActiveMQ以后,启动了一个jetty服务器,登陆:http://localhost:8161/admin/,创建一个Queue,命名为FirstQueue。activeMQ对消息记录是没有做持久化的,先开启消费者再开启生产者。kafka是做了持久化的,可以设置清除时间。activeMQ对于小的消息可以,对于大数据还是要用kafka,kafka有集群。Kafka本质没有完全实现JMS。Kafka消息是不保证顺序的。Kafka官网也说不是一个消息队列,只是一个消息缓存。Kafka是一个缓冲池。

 

常用的JMS实现开源的提供者包括:Apache ActiveMQ、JBoss 社区所研发的 HornetQ、Joram、Coridan的MantaRay、The OpenJMS Group的OpenJMS专有的提供者包括:BEA的BEA WebLogic Server JMS、TIBCO Software的EMS、GigaSpaces Technologies的GigaSpaces、Softwired 2006的iBusIONA Technologies的IONA JMS、SeeBeyond的IQManager(2005年8月被Sun Microsystems并购)、webMethods的JMS+ -my-channels的Nirvana、Sonic Software的SonicMQ、SwiftMQ的SwiftMQ、IBM的WebSphere MQ

生产者:

package cn.itcast_03_mq.topic;import java.util.Random;import javax.jms.JMSException;      public class ProducerTest {               /**         * @param args         */         public static void main(String[] args) throws JMSException, Exception {              ProducerTool producer = new ProducerTool();         Random random = new Random();        for(int i=0;i<2;i++){                        Thread.sleep(random.nextInt(1)*1000);                        producer.produceMessage("Hello, world333!--"+i);                  producer.close();  //刷新管理页面可以看到        }            }      }      package cn.itcast_03_mq.topic;import javax.jms.Connection;      import javax.jms.DeliveryMode;      import javax.jms.Destination;      import javax.jms.JMSException;      import javax.jms.MessageProducer;      import javax.jms.Session;      import javax.jms.TextMessage;           import org.apache.activemq.ActiveMQConnection;      import org.apache.activemq.ActiveMQConnectionFactory;           public class ProducerTool {            private String user = ActiveMQConnection.DEFAULT_USER;             private String password = ActiveMQConnection.DEFAULT_PASSWORD;           private String url = "tcp://192.168.88.128:61616";//ActiveMQConnection.DEFAULT_BROKER_URL;           private String subject = "eee";          private Destination destination = null;          private Connection connection = null;          private Session session = null;          private MessageProducer producer = null;    // 初始化          private void initialize() throws JMSException, Exception {              ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(                      user, password, url);              connection = connectionFactory.createConnection();              session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);              destination = session.createTopic(subject);   //主题           producer = session.createProducer(destination);              producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //发送模式         }    // 发送消息          public void produceMessage(String message) throws JMSException, Exception {              initialize();              TextMessage msg = session.createTextMessage(message);              connection.start();              System.out.println("Producer:->Sending message: " + message);              producer.send(msg);              System.out.println("Producer:->Message sent complete!");          }    // 关闭连接          public void close() throws JMSException {              System.out.println("Producer:->Closing connection");              if (producer != null)                  producer.close();              if (session != null)                  session.close();              if (connection != null)                  connection.close();          }      }

消费者:

package cn.itcast_03_mq.topic;import javax.jms.JMSException;public class ConsumerTest implements Runnable {    static Thread t1 = null;    /**     * @param args     * @throws InterruptedException     * @throws InterruptedException     * @throws JMSException     * @throws InterruptedException     */    public static void main(String[] args) throws InterruptedException {        t1 = new Thread(new ConsumerTest());        t1.setDaemon(false);        t1.start();        /**         * 如果发生异常,则重启consumer         */        /*while (true) {            System.out.println(t1.isAlive());            if (!t1.isAlive()) {                t1 = new Thread(new ConsumerTest());                t1.start();                System.out.println("重新启动");            }            Thread.sleep(5000);        }*/        // 延时500毫秒之后停止接受消息        // Thread.sleep(500);        // consumer.close();    }    public void run() {        try {            ConsumerTool consumer = new ConsumerTool();            consumer.consumeMessage();            while (ConsumerTool.isconnection) {                }        } catch (Exception e) {        }    }}package cn.itcast_03_mq.topic;import javax.jms.Connection;      import javax.jms.Destination;      import javax.jms.ExceptionListener;import javax.jms.JMSException;      import javax.jms.MessageConsumer;      import javax.jms.Session;      import javax.jms.MessageListener;      import javax.jms.Message;      import javax.jms.TextMessage;           import org.apache.activemq.ActiveMQConnection;      import org.apache.activemq.ActiveMQConnectionFactory;           public class ConsumerTool implements MessageListener,ExceptionListener {          private String user = ActiveMQConnection.DEFAULT_USER;          private String password = ActiveMQConnection.DEFAULT_PASSWORD;          private String url ="failover://tcp://192.168.88.128:61616";//ActiveMQConnection.DEFAULT_BROKER_URL;          private String subject = "eee";          private Destination destination = null;          private Connection connection = null;          private Session session = null;          private MessageConsumer consumer = null;      public static Boolean isconnection=false;    // 初始化          private void initialize() throws JMSException, Exception {              ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(                      user, password, url);              connection = connectionFactory.createConnection();              session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);              destination = session.createTopic(subject);              consumer = session.createConsumer(destination);         }               // 消费消息          public void consumeMessage() throws JMSException, Exception {              initialize();              connection.start();        consumer.setMessageListener(this);    //监听,调用onMessage方法处理。        connection.setExceptionListener(this);        isconnection=true;        System.out.println("Consumer:->Begin listening...");              // 开始监听          Message message = consumer.receive();           System.out.println("message:::"+message);    }    // 关闭连接          public void close() throws JMSException {              System.out.println("Consumer:->Closing connection");              if (consumer != null)                  consumer.close();              if (session != null)                  session.close();              if (connection != null)                  connection.close();          }    // 消息处理函数          public void onMessage(Message message) {              try {                  if (message instanceof TextMessage) {                      TextMessage txtMsg = (TextMessage) message;                      String msg = txtMsg.getText();                      System.out.println("Consumer:->Received: " + msg);                  } else {                      System.out.println("Consumer:->Received: " + message);                  }              } catch (JMSException e) {                  // TODO Auto-generated catch block                  e.printStackTrace();              }          }    public void onException(JMSException arg0) {        isconnection=false;    }      }

 

转载地址:http://tswdx.baihongyu.com/

你可能感兴趣的文章
DzzOffice共享文件夹设置介绍和示例。
查看>>
工欲善其事必先利其器---Android开发环境搭建
查看>>
java的System.getProperty()方法可以获取的值
查看>>
Spring 项目中把 SQL 语句写在 .sql 文件中
查看>>
我的友情链接
查看>>
线上问题排查思路
查看>>
plexus使用(五)-配置文件
查看>>
Keepalived+LVS-DR模式
查看>>
kvm虚拟化学习笔记(二)之linux kvm虚拟机安装
查看>>
linux目录结构详细介绍
查看>>
Git教程
查看>>
查看RedHat linux版本的三种方法
查看>>
iOS监控网络状态并实时刷新界面数据
查看>>
51博客开通
查看>>
Linux下日志系统的设计
查看>>
可用IP查询
查看>>
Linux系统分析工具之nicstat,dstat(四)
查看>>
“千亿市值”巨无霸的膨胀 腾讯靠什么撬动下一个1000亿美金?
查看>>
UML实例(五):在线购物系统设计类图
查看>>
马哥linux高薪中级-POSTFIX邮件服务(三)
查看>>