JMS学习二(简单的ActiveMQ实例)

📅 2026/7/4 18:25:20
JMS学习二(简单的ActiveMQ实例)
下载安装ActiveMQ服务下载地址当然可以去官网下载ActiveMQActiveMQ安装很简单下载解压后到bin目录就有win32 和win64两个目录按照自己的系统进入后就有activemq.bat来启动ActiveMQ服务一、点对点消息模型实例1使用queue作为目的之1、消息发送端package mqtest1; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer { public static void main(String[] args) { int i 0; //链接工厂 ActiveMQConnectionFactory connectionFactory null; //链接对象 Connection connection null; //会话 Session session null; //队列目的地、生产者发送消息的目的地 Queue queue null; //消息生产者 MessageProducer producer null; connectionFactory new ActiveMQConnectionFactory(admin,admin,tcp://192.168.1.120:61616); try { connection connectionFactory.createConnection(); connection.start(); //第一个参数是否开启事务 true开启 ,false不开启事务如果开启记得手动提交 //参数二表示的是签收模式一般使用的有自动签收和客户端自己确认签收 session connection.createSession(true, Session.AUTO_ACKNOWLEDGE); queue session.createQueue(test_queue); //为队列创建消息生产者 producer session.createProducer(queue); //消息是否为持久性的这个不设置也是可以的默认是持久的 //producer.setDeliveryMode(DeliveryMode.PERSISTENT); //消息设置为持久的发送后及时服务关闭了再次开启消息也不会丢失。 //producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //发送后如果服务关闭再次开启则消息会丢失。 while (true){ //创建消息 TextMessage message session.createTextMessage(); message.setText(测试队列消息i); //发送消息到目的地 producer.send(message); i; if(i10) { break; } } session.commit(); System.out.println(呵呵消息发送结束); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { //释放资源 //producer.close(); //session.close(); //connection.close(); } } }2、消息消费端package mqtest1; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Receive { public static void main(String[] args) { // 链接工厂 ActiveMQConnectionFactory connectionFactory null; // 链接对象 Connection connection null; // 会话 Session session null; // 队列目的地消费者消费消息的地方 Queue queue null; // 消息消费者 MessageConsumer consumer null; connectionFactory new ActiveMQConnectionFactory(admin, admin, tcp://192.168.1.120:61616); try { connection connectionFactory.createConnection(); connection.start(); // 创建session是的true 和false session connection.createSession(false, Session.AUTO_ACKNOWLEDGE); queue session.createQueue(test_queue); // 队列目的地消费者消费消息的地方 consumer session.createConsumer(queue); // 消息消费者 // Message message consumer.receive(); //同步方式接收 consumer.setMessageListener(new MessageListener() { Override public void onMessage(Message message) { TextMessage textMessage (TextMessage) message; try { String value textMessage.getText(); System.out.println(value: value); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }2点对点模型Destination作为目的地1、消息发送端package mq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; public class TestMQ { public static void main(String[] args) { int i 0; //链接工厂 ConnectionFactory connectionFactory null; // 链接对象 Connection connection null; // 会话对象 Session session null; // 目的地 Destination destination null; // 消息生产者 MessageProducer producer null; connectionFactory new ActiveMQConnectionFactory(admin,admin,tcp://192.168.1.120:61616); try { connection connectionFactory.createConnection(); connection.start(); //第一个参数是否开启事务 true开启 ,false不开启事务如果开启记得手动提交 //参数二表示的是签收模式一般使用的有自动签收和客户端自己确认签收 session connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination session.createQueue(test-queue); //为目的地创建消息生产者 producer session.createProducer(destination); //消息是否为持久性的这个不设置也是可以的默认是持久的 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); while(true) { TestBean tbean new TestBean(); tbean.setAge(25); tbean.setName(hellojava i); producer.send(session.createObjectMessage(tbean)); i; if( i10) { break; } } System.out.println(呵呵消息已发送); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { try { producer.close(); session.close(); connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }2、消息消费端package mq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import javax.jms.Queue; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; public class AcceptMq { public static void main(String[] args) { ConnectionFactory connectionFactory; // Connection JMS 客户端到JMS Provider 的连接 Connection connection null; // Session 一个发送或接收消息的线程 Session session null; // Destination 消息的目的地;消息发送给谁. Destination destination null; // 消费者消息接收者 //MessageConsumer consumer null; connectionFactory new ActiveMQConnectionFactory(admin, admin, tcp://192.168.1.120:61616); try { //通过工厂创建链接 connection connectionFactory.createConnection(); //启动链接 connection.start(); //创建会话 session connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); //消息目的地 destination session.createQueue(test-queue); //消息消费者 MessageConsumer consumer session.createConsumer(destination); //同步方式接受信息,如果还没有获取到则会阻塞直到接收到信息 /*Message messages consumer.receive(); TestBean value (TestBean)((ObjectMessage)messages).getObject(); String name value.getName();*/ consumer.setMessageListener(new MessageListener(){ Override public void onMessage(Message message){ try { TestBean tbean (TestBean)((ObjectMessage)message).getObject(); System.out.println(tbean: tbean); if(null ! message) { System.out.println(收到信息1 tbean.getName()); } } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }3、bean 类package mq; import java.io.Serializable; public class TestBean implements Serializable{ private int age; private String name; public TestBean() {}; public TestBean(int age, String name) { this.age age; this.name name; } public int getAge() { return age; } public void setAge(int age) { this.age age; } public String getName() { return name; } public void setName(String name) { this.name name; } }二、发布/订阅消息模型实例1、消息发布端package mq; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; public class PSMQ { public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory factory new ActiveMQConnectionFactory(admin,admin,tcp://192.168.1.101:61616); Connection connection factory.createConnection(); connection.start(); Session session connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建话题 Topic topic session.createTopic(myTopic.messages); //为话题创建消息生产者 MessageProducer producer session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); while(true) { TextMessage message session.createTextMessage(); message.setText(message_ System.currentTimeMillis()); producer.send(message); System.out.println(Sent message: message.getText()); } } }2、消息订阅端package mq; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.MessageListener; import org.apache.activemq.ActiveMQConnectionFactory; public class PSAccept { public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory factory new ActiveMQConnectionFactory(admin,admin,tcp://192.168.1.101:61616); Connection connection factory.createConnection(); connection.start(); Session session connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建话题 Topic topic session.createTopic(myTopic.messages); //为话题创建消费者 MessageConsumer consumer session.createConsumer(topic); consumer.setMessageListener(new MessageListener() { Override public void onMessage(Message message) { TextMessage tm (TextMessage) message; try { System.out.println(Received message: tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } }点对点消息模型和发布/订阅消息模型两种方式其实不同的就是使用队列、还是使用话题创建目的地不同其他的都一样。connectionFactory new ActiveMQConnectionFactory(admin,admin,tcp://192.168.1.120:61616);其中第一个admin是用户名第二个是密码而第三个参数就是协议ipport(端口)这几个参数两个客户端都是一样的不然消费端就获取不到了……在消息消费者中我们接收消息有两种方式即同步接收和异步接收同步接受就是使用receive()方法来接受而异步就是设置一个监听对象。说到密码我们顺便来看看ActiveMQ访问密码的设置三、ActiveMQ访问密码设置在ActiveMQ的conf目录的activemq.xml中添加账号密码plugins simpleAuthenticationPlugin users authenticationUser usernamewhd password123 groupsusers,admins/ /users /simpleAuthenticationPlugin /pluginsactivemq.xml中添加位置ok这样我们对这个ActiveMQ设置了一个用户名密码所以在创建链接的时候要修改admin这个默认的用户名密码为修改后的用户名密码。connectionFactory new ActiveMQConnectionFactory(whd, 123,tcp://192.168.0.104:61616);这样我们就能正常的向服务器发送消息而消费端也能从服务商消费消息了……差点忘了还有一个ActiveMQ管理页面地址http://127.0.0.1:8161/admin/ 访问这个地址登陆管理页面默认用户名密码都是admingithub源码地址