疯狂java


您现在的位置: 疯狂软件 >> 新闻资讯 >> 正文

Java中使用AMQ的简单实例


 

 
好了,接上一篇,在对JMS的概念以及AMQ的下载安装有了一定认识之后,本篇就来学习一下AMQ在Java中的Hello World级使用(下文我将着重把重点写在注释里面,请注意代码注释部分!!!)。
 
开发环境
AMQ 5.14.5 服务启动运行 
准备Java Maven项目
 
依赖jar包在apache-activemq-5.14.5根目录下即可找到,或者直接Maven依赖: 
 
org.apache.activemq 
activemq-all 
5.14.5 
 
2. 点对点模型
 
2.1 队列消息发送者
 
编写 QueueProducer.java 如下:
 
package com.jastar.activemq.queue;
 
import javax.jms.Connection; 
import javax.jms.ConnectionFactory; 
import javax.jms.DeliveryMode; 
import javax.jms.Destination; 
import javax.jms.MessageProducer; 
import javax.jms.Session; 
import javax.jms.TextMessage;
 
import org.apache.activemq.ActiveMQConnection; 
import org.apache.activemq.ActiveMQConnectionFactory;
 
/** 
* 队列消息-发送(生产)者 
*
 
 
* ClassName: QueueProducer 
*
 
 
 
* Copyright: (c)2017 JASTAR·WANG,All rights reserved. 
*
 
 
 
 
 
 
/** 默认用户名 */
public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/** 默认密码 */
public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/** 默认连接地址(格式如:tcp://IP:61616) */
public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
/** 队列名称 */
public static final String QUEUE_NAME = "hello amq";
 
// 连接工厂(在AMQ中由ActiveMQConnectionFactory实现)
private ConnectionFactory connectionFactory;
 
// 连接对象
private Connection connection;
 
// 会话对象
private Session session;
 
// 消息目的地(对于点对点模型,是Queue对象;对于发布订阅模型,是Topic对象;它们都继承或实现了该接口)
private Destination destination;
 
// 消息发送(生产)者
private MessageProducer messageProducer;
 
public static void main(String[] args) {
    QueueProducer producer = new QueueProducer();
    producer.doSend();
}
 
public void doSend() {
    try {
        /**
         * 1.创建连接工厂<br>
         * 构造函数有多个重载,默认连接本地MQ服务器,也可以手动设置用户名、密码、连接地址信息<br>
         * new ActiveMQConnectionFactory(userName, password, brokerURL)
         */
        connectionFactory = new ActiveMQConnectionFactory();
 
        /**
         * 2.创建连接
         */
        connection = connectionFactory.createConnection();
 
        /**
         * 3.启动连接
         */
        connection.start();
 
        /**
         * 4.创建会话<br>
         * param1:是否支持事务,若为true,则会忽略第二个参数,默认为SESSION_TRANSACTED<br>
         * param2:确认消息模式,若第一个参数为false时,该参数有以下几种状态<br>
         * -Session.AUTO_ACKNOWLEDGE:自动确认。客户端发送和接收消息不需要做额外的工作,即使接收端发生异常,
         * 也会被当作正常发送成功 <br>
         * -Session.CLIENT_ACKNOWLEDGE:客户端确认。客户端接收到消息后,必须调用message.
         * acknowledge() 方法给予收到反馈,JMS服务器才会把该消息当做发送成功,并删除<br>
         * -Session.DUPS_OK_ACKNOWLEDGE:副本确认。一旦接收端应用程序的方法调用从处理消息处返回,
         * 会话对象就会确认消息的接收,而且允许重复确认。
         */
        session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
 
        /**
         * 5.创建(发送)消息目的地,即队列,参数为队列名称
         */
        destination = session.createQueue(QUEUE_NAME);
 
        /**
         * 6.创建一个消息生产者,并指定目的地
         */
        messageProducer = session.createProducer(destination);
        /**
         * 其他操作: 设置生产者的生产模式,默认为持久化<br>
         * 参数有以下两种状态:<br>
         * -DeliveryMode.NON_PERSISTENT:消息不持久化,消息被消费之后或者超时之后将从队列中删除
         * -DeliveryMode.PERSISTENT:消息会持久化,即使接收端消费消息之后仍然会保存
         */
        messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 
        /**
         * 其他操作:设置消息的存活时间(单位:毫秒)
         */
        messageProducer.setTimeToLive(60000);
 
        for (int i = 0; i < 5; i++) {
            /**
             * 7.创建文本消息<br>
             * 此外,还有多种类型的消息如对象,字节……都可以通过session.createXXXMessage()方法创建
             */
            TextMessage message = session.createTextMessage("send content:"
                    + i);
 
            /**
             * 8. 发送
             */
            messageProducer.send(message);
 
        }
        System.out.println("消息发送完成!");
        /**
         * 如果有事务操作也可以提交事务
         */
        session.commit();
        /**
         * 9.关闭生产者对象(即使关闭了程序也在运行)
         */
        messageProducer.close();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        if (connection != null) {
            try {
                /**
 
                 * 10.关闭连接(将会关闭程序)
                 */
                connection.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
 
}
 
2.2 队列消息接收者
 
编写 QueueConsumer.java 其他同上,注意注释部分:
 
package com.jastar.activemq.queue;
 
import javax.jms.Connection; 
import javax.jms.ConnectionFactory; 
import javax.jms.Destination; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageListener; 
import javax.jms.Session; 
import javax.jms.TextMessage;
 
import org.apache.activemq.ActiveMQConnectionFactory;
 
/** 
* 队列消息-接收(消费)者 
*
 
 
* ClassName: QueueConsumer 
*
 
 
 
* Copyright: (c)2017 JASTAR·WANG,All rights reserved. 
*
 
 
 
 
 
 
private ConnectionFactory connectionFactory;
private Connection connection;
private Session session;
private Destination destination;
// 注意这里是消息接收(消费)者
private MessageConsumer messageConsumer;
 
public static void main(String[] args) {
    QueueConsumer consumer = new QueueConsumer();
    consumer.doReceive();
}
 
public void doReceive() {
    try {
        connectionFactory = new ActiveMQConnectionFactory();
        connection = connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue(QueueProducer.QUEUE_NAME);
 
        /**
         * 注意:这里要创建一个消息消费,并指定目的地(即消息源队列)
         */
        messageConsumer = session.createConsumer(destination);
 
        // 方式一:监听接收
        receiveByListener();
 
        // 方式二:阻塞接收
        // receiveByManual();
 
        /**
         * 注意:这里不能再关闭对象了
         */
        // messageConsumer.close();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        /**
         * 注意:这里不能再关闭Connection了
         */
        // connection.close();
    }
 
}
 
/**
 * 通过注册监听器的方式接收消息,属于被动监听
 */
private void receiveByListener() {
    try {
        messageConsumer.setMessageListener(new MessageListener() {
 
            @Override
            public void onMessage(Message message) {
                if (message instanceof TextMessage) {
                    try {
                        TextMessage msg = (TextMessage) message;
                        System.out.println("Received:“" + msg.getText()
                                + "”");
                        // 可以通过此方法反馈消息已收到
                        msg.acknowledge();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
 
            }
        });
    } catch (Exception e) {
        e.printStackTrace();
    }
}
 
/**
 * 通过手动去接收消息的方式,属于主动获取
 */
private void receiveByManual() {
    while (true) {
        try {
            /**
             * 通过receive()方法阻塞接收消息,参数为超时时间(单位:毫秒)
             */
            TextMessage message = (TextMessage) messageConsumer
                    .receive(60000);
            if (message != null) {
                System.out.println("Received:“" + message.getText() + "”");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
 
    }
}
 
3. 发布订阅模型
 
3.1 主题消息发送者
 
和点对点模型的发送者唯一不同的是——创建目的地Destination的时候是通过 session.createTopic(); 来创建,其他的使用套路如同 QueueProducer.java ,在此不再贴代码,最后我会留下示例代码地址,需要的可以去down下看看。
 
3.2 主题消息接收者
 
编写 TopicConsumer.java ,如下:
 
package com.jastar.activemq.topic;
 
import javax.jms.Connection; 
import javax.jms.ConnectionFactory; 
import javax.jms.Destination; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageListener; 
import javax.jms.Session; 
import javax.jms.TextMessage;
 
import org.apache.activemq.ActiveMQConnectionFactory;
 
/** 
* 主题消息-接收(消费)者 
*
 
 
* ClassName: TopicConsumer 
*
 
 
 
* Copyright: (c)2017 JASTAR·WANG,All rights reserved. 
*
 
 
 
 
 
 
 
 
 
 
 
public static void main(String[] args) {
    /**
     * Pub/Sub模型中,消息可被多个对象接收,不同于P2P模型
     */
    TopicConsumer consumer1 = new TopicConsumer();
    consumer1.doReceive();
    TopicConsumer consumer2 = new TopicConsumer();
    consumer2.doReceive();
}
 
public void doReceive() {
    try {
        connectionFactory = new ActiveMQConnectionFactory();
        connection = connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        destination = session.createTopic(TopicProducer.TOPIC_NAME);
        messageConsumer = session.createConsumer(destination);
 
        // 方式一:监听接收
        receiveByListener();
 
        // 方式二:阻塞接收
        // receiveByManual();
 
    } catch (Exception e) {
        e.printStackTrace();
    }
 
}
 
/**
 * 通过注册监听器的方式接收消息,属于被动监听
 */
private void receiveByListener() {
    try {
        messageConsumer.setMessageListener(new MessageListener() {
 
            @Override
            public void onMessage(Message message) {
                if (message instanceof TextMessage) {
                    try {
                        TextMessage msg = (TextMessage) message;
                        System.out.println("Received:“" + msg.getText()
                                + "”");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
 
            }
        });
    } catch (Exception e) {
        e.printStackTrace();
    }
}
 
/**
 * 通过手动去接收消息的方式,属于主动获取
 */
private void receiveByManual() {
    while (true) {
        try {
            /**
             * 通过receive()方法阻塞接收消息,参数为超时时间(单位:毫秒)
             */
            TextMessage message = (TextMessage) messageConsumer
                    .receive(60000);
            if (message != null) {
                System.out.println("Received:“" + message.getText() + "”");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
 
    }
}
 
4.最后说几句
 
以上代码全部经过我的测试,绝对保证可靠运行 
浏览器访问ActiveMQ的Web控制台(http://localhost:8161/admin),即可看到消息、队列、主题等等信息