|
| 1 | +--- |
| 2 | +title: ActiveMQ |
| 3 | +date: 2017/8/18 |
| 4 | +categories: |
| 5 | +- javaweb |
| 6 | +tags: |
| 7 | +- java |
| 8 | +- javaweb |
| 9 | +- 分布式 |
| 10 | +- mq |
| 11 | +- jms |
| 12 | +--- |
| 13 | + |
| 14 | +# ActiveMQ |
| 15 | + |
| 16 | +<!-- TOC depthFrom:2 depthTo:3 --> |
| 17 | + |
| 18 | +- [1. JMS 基本概念](#1-jms-基本概念) |
| 19 | + - [1.1. 消息模型](#11-消息模型) |
| 20 | + - [1.2. JMS 编程模型](#12-jms-编程模型) |
| 21 | +- [2. 安装](#2-安装) |
| 22 | +- [3. 项目中的应用](#3-项目中的应用) |
| 23 | +- [4. 资源](#4-资源) |
| 24 | + |
| 25 | +<!-- /TOC --> |
| 26 | + |
| 27 | +## 1. JMS 基本概念 |
| 28 | + |
| 29 | +`JMS` 即 **Java 消息服务(Java Message Service)API**,是一个 Java 平台中关于面向消息中间件的 API。它用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java 消息服务是一个与具体平台无关的 API,绝大多数 MOM 提供商都对 JMS 提供支持。 |
| 30 | + |
| 31 | +### 1.1. 消息模型 |
| 32 | + |
| 33 | +JMS 有两种消息模型: |
| 34 | + |
| 35 | +- Point-to-Point(P2P) |
| 36 | +- Publish/Subscribe(Pub/Sub) |
| 37 | + |
| 38 | +#### P2P 的特点 |
| 39 | + |
| 40 | + |
| 41 | + |
| 42 | +在点对点的消息系统中,消息分发给一个单独的使用者。点对点消息往往与队列 `javax.jms.Queue` 相关联。 |
| 43 | + |
| 44 | +每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)。 |
| 45 | + |
| 46 | +发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列。 |
| 47 | + |
| 48 | +接收者在成功接收消息之后需向队列应答成功。 |
| 49 | + |
| 50 | +如果你希望发送的每个消息都应该被成功处理的话,那么你需要 P2P 模式。 |
| 51 | + |
| 52 | +#### Pub/Sub 的特点 |
| 53 | + |
| 54 | + |
| 55 | + |
| 56 | +发布/订阅消息系统支持一个事件驱动模型,消息生产者和消费者都参与消息的传递。生产者发布事件,而使用者订阅感兴趣的事件,并使用事件。该类型消息一般与特定的主题 `javax.jms.Topic` 关联。 |
| 57 | + |
| 58 | +每个消息可以有多个消费者。 |
| 59 | + |
| 60 | +发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。 |
| 61 | + |
| 62 | +为了缓和这样严格的时间相关性,JMS 允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。 |
| 63 | + |
| 64 | +如果你希望发送的消息可以不被做任何处理、或者被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用 Pub/Sub 模型。 |
| 65 | + |
| 66 | +### 1.2. JMS 编程模型 |
| 67 | + |
| 68 | + |
| 69 | + |
| 70 | +#### ConnectionFactory |
| 71 | + |
| 72 | +创建 `Connection` 对象的工厂,针对两种不同的 jms 消息模型,分别有 `QueueConnectionFactory` 和`TopicConnectionFactory` 两种。可以通过 JNDI 来查找 `ConnectionFactory` 对象。 |
| 73 | + |
| 74 | +#### Connection |
| 75 | + |
| 76 | +`Connection` 表示在客户端和 JMS 系统之间建立的链接(对 TCP/IP socket 的包装)。`Connection` 可以产生一个或多个`Session`。跟 `ConnectionFactory` 一样,`Connection` 也有两种类型:`QueueConnection` 和 `TopicConnection`。 |
| 77 | + |
| 78 | +#### Destination |
| 79 | + |
| 80 | +`Destination` 是一个包装了消息目标标识符的被管对象。消息目标是指消息发布和接收的地点,或者是队列 `Queue` ,或者是主题 `Topic` 。JMS 管理员创建这些对象,然后用户通过 JNDI 发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的 `Queue`,以及发布者/订阅者模型的 `Topic`。 |
| 81 | + |
| 82 | +#### Session |
| 83 | + |
| 84 | +`Session` 表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消息,生产者来发送消息,消费者来接收消息。同样,`Session` 也分 `QueueSession` 和 `TopicSession`。 |
| 85 | + |
| 86 | +#### MessageConsumer |
| 87 | + |
| 88 | +`MessageConsumer` 由 `Session` 创建,并用于将消息发送到 `Destination`。消费者可以同步地(阻塞模式),或(非阻塞)接收 `Queue` 和 `Topic` 类型的消息。同样,消息生产者分两种类型:`QueueSender` 和`TopicPublisher`。 |
| 89 | + |
| 90 | +#### MessageProducer |
| 91 | + |
| 92 | +`MessageProducer` 由 `Session` 创建,用于接收被发送到 `Destination` 的消息。`MessageProducer` 有两种类型:`QueueReceiver` 和 `TopicSubscriber`。可分别通过 `session` 的 `createReceiver(Queue)` 或 `createSubscriber(Topic)` 来创建。当然,也可以 `session` 的 `creatDurableSubscriber` 方法来创建持久化的订阅者。 |
| 93 | + |
| 94 | +#### Message |
| 95 | + |
| 96 | +是在消费者和生产者之间传送的对象,也就是说从一个应用程序传送到另一个应用程序。一个消息有三个主要部分: |
| 97 | + |
| 98 | +- 消息头(必须):包含用于识别和为消息寻找路由的操作设置。 |
| 99 | +- 一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。 |
| 100 | +- 一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。 |
| 101 | + |
| 102 | +消息接口非常灵活,并提供了许多方式来定制消息的内容。 |
| 103 | + |
| 104 | +| Common | Point-to-Point | Publish-Subscribe | |
| 105 | +| ----------------- | --------------------------- | ---------------------- | |
| 106 | +| ConnectionFactory | QueueConnectionFactory | TopicConnectionFactory | |
| 107 | +| Connection | QueueConnection | TopicConnection | |
| 108 | +| Destination | Queue | Topic | |
| 109 | +| Session | QueueSession | TopicSession | |
| 110 | +| MessageProducer | QueueSender | TopicPublisher | |
| 111 | +| MessageSender | QueueReceiver, QueueBrowser | TopicSubscriber | |
| 112 | + |
| 113 | +## 2. 安装 |
| 114 | + |
| 115 | +**安装条件** |
| 116 | + |
| 117 | +JDK1.7 及以上版本 |
| 118 | + |
| 119 | +本地配置了 **JAVA_HOME** 环境变量。 |
| 120 | + |
| 121 | +**下载** |
| 122 | + |
| 123 | +支持 Windows/Unix/Linux/Cygwin |
| 124 | + |
| 125 | +[ActiveMQ 官方下载地址](http://activemq.apache.org/download.html) |
| 126 | + |
| 127 | +**Windows 下运行** |
| 128 | + |
| 129 | +1. 解压压缩包到本地; |
| 130 | +2. 打开控制台,进入安装目录的 `bin` 目录下; |
| 131 | + |
| 132 | +``` |
| 133 | +cd [activemq_install_dir] |
| 134 | +``` |
| 135 | + |
| 136 | +3. 执行 `activemq start` 来启动 ActiveMQ |
| 137 | + |
| 138 | +``` |
| 139 | +bin\activemq start |
| 140 | +``` |
| 141 | + |
| 142 | +**测试安装是否成功** |
| 143 | + |
| 144 | +1. ActiveMQ 默认监听端口为 61616 |
| 145 | + |
| 146 | +``` |
| 147 | +netstat -an|find “61616” |
| 148 | +``` |
| 149 | + |
| 150 | +2. 访问 http://127.0.0.1:8161/admin/ |
| 151 | + |
| 152 | +3. 输入用户名、密码 |
| 153 | + Login: admin |
| 154 | + Passwort: admin |
| 155 | + |
| 156 | +4. 点击导航栏的 Queues 菜单 |
| 157 | + |
| 158 | +5. 添加一个队列(queue) |
| 159 | + |
| 160 | +## 3. 项目中的应用 |
| 161 | + |
| 162 | +**引入依赖** |
| 163 | + |
| 164 | +```xml |
| 165 | +<dependency> |
| 166 | + <groupId>org.apache.activemq</groupId> |
| 167 | + <artifactId>activemq-all</artifactId> |
| 168 | + <version>5.14.1</version> |
| 169 | +</dependency> |
| 170 | +``` |
| 171 | + |
| 172 | +**Sender.java** |
| 173 | + |
| 174 | +```java |
| 175 | +public class Sender { |
| 176 | + private static final int SEND_NUMBER = 4; |
| 177 | + |
| 178 | + public static void main(String[] args) { |
| 179 | + // ConnectionFactory :连接工厂,JMS 用它创建连接 |
| 180 | + ConnectionFactory connectionFactory; |
| 181 | + // Connection :JMS 客户端到JMS Provider 的连接 |
| 182 | + Connection connection = null; |
| 183 | + // Session: 一个发送或接收消息的线程 |
| 184 | + Session session; |
| 185 | + // Destination :消息的目的地;消息发送给谁. |
| 186 | + Destination destination; |
| 187 | + // MessageProducer:消息发送者 |
| 188 | + MessageProducer producer; |
| 189 | + // TextMessage message; |
| 190 | + // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar |
| 191 | + connectionFactory = new ActiveMQConnectionFactory( |
| 192 | + ActiveMQConnection.DEFAULT_USER, |
| 193 | + ActiveMQConnection.DEFAULT_PASSWORD, |
| 194 | + "tcp://localhost:61616"); |
| 195 | + try { |
| 196 | + // 构造从工厂得到连接对象 |
| 197 | + connection = connectionFactory.createConnection(); |
| 198 | + // 启动 |
| 199 | + connection.start(); |
| 200 | + // 获取操作连接 |
| 201 | + session = connection.createSession(Boolean.TRUE, |
| 202 | + Session.AUTO_ACKNOWLEDGE); |
| 203 | + // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 |
| 204 | + destination = session.createQueue("FirstQueue"); |
| 205 | + // 得到消息生成者【发送者】 |
| 206 | + producer = session.createProducer(destination); |
| 207 | + // 设置不持久化,此处学习,实际根据项目决定 |
| 208 | + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); |
| 209 | + // 构造消息,此处写死,项目就是参数,或者方法获取 |
| 210 | + sendMessage(session, producer); |
| 211 | + session.commit(); |
| 212 | + } catch (Exception e) { |
| 213 | + e.printStackTrace(); |
| 214 | + } finally { |
| 215 | + try { |
| 216 | + if (null != connection) |
| 217 | + connection.close(); |
| 218 | + } catch (Throwable ignore) { |
| 219 | + } |
| 220 | + } |
| 221 | + } |
| 222 | + |
| 223 | + public static void sendMessage(Session session, MessageProducer producer) |
| 224 | + throws Exception { |
| 225 | + for (int i = 1; i <= SEND_NUMBER; i++) { |
| 226 | + TextMessage message = session |
| 227 | + .createTextMessage("ActiveMq 发送的消息" + i); |
| 228 | + // 发送消息到目的地方 |
| 229 | + System.out.println("发送消息:" + "ActiveMq 发送的消息" + i); |
| 230 | + producer.send(message); |
| 231 | + } |
| 232 | + } |
| 233 | +} |
| 234 | +``` |
| 235 | + |
| 236 | +**Receiver.java** |
| 237 | + |
| 238 | +```java |
| 239 | +public class Receiver { |
| 240 | + public static void main(String[] args) { |
| 241 | + // ConnectionFactory :连接工厂,JMS 用它创建连接 |
| 242 | + ConnectionFactory connectionFactory; |
| 243 | + // Connection :JMS 客户端到JMS Provider 的连接 |
| 244 | + Connection connection = null; |
| 245 | + // Session: 一个发送或接收消息的线程 |
| 246 | + Session session; |
| 247 | + // Destination :消息的目的地;消息发送给谁. |
| 248 | + Destination destination; |
| 249 | + // 消费者,消息接收者 |
| 250 | + MessageConsumer consumer; |
| 251 | + connectionFactory = new ActiveMQConnectionFactory( |
| 252 | + ActiveMQConnection.DEFAULT_USER, |
| 253 | + ActiveMQConnection.DEFAULT_PASSWORD, |
| 254 | + "tcp://localhost:61616"); |
| 255 | + try { |
| 256 | + // 构造从工厂得到连接对象 |
| 257 | + connection = connectionFactory.createConnection(); |
| 258 | + // 启动 |
| 259 | + connection.start(); |
| 260 | + // 获取操作连接 |
| 261 | + session = connection.createSession(Boolean.FALSE, |
| 262 | + Session.AUTO_ACKNOWLEDGE); |
| 263 | + // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 |
| 264 | + destination = session.createQueue("FirstQueue"); |
| 265 | + consumer = session.createConsumer(destination); |
| 266 | + while (true) { |
| 267 | + //设置接收者接收消息的时间,为了便于测试,这里谁定为100s |
| 268 | + TextMessage message = (TextMessage) consumer.receive(100000); |
| 269 | + if (null != message) { |
| 270 | + System.out.println("收到消息" + message.getText()); |
| 271 | + } else { |
| 272 | + break; |
| 273 | + } |
| 274 | + } |
| 275 | + } catch (Exception e) { |
| 276 | + e.printStackTrace(); |
| 277 | + } finally { |
| 278 | + try { |
| 279 | + if (null != connection) |
| 280 | + connection.close(); |
| 281 | + } catch (Throwable ignore) { |
| 282 | + } |
| 283 | + } |
| 284 | + } |
| 285 | +} |
| 286 | +``` |
| 287 | + |
| 288 | +**运行** |
| 289 | + |
| 290 | +先运行 Receiver.java 进行消息监听,再运行 Send.java 发送消息。 |
| 291 | + |
| 292 | +**输出** |
| 293 | + |
| 294 | +Send 的输出内容 |
| 295 | + |
| 296 | +``` |
| 297 | +发送消息:Activemq 发送消息0 |
| 298 | +发送消息:Activemq 发送消息1 |
| 299 | +发送消息:Activemq 发送消息2 |
| 300 | +发送消息:Activemq 发送消息3 |
| 301 | +``` |
| 302 | + |
| 303 | +Receiver 的输出内容 |
| 304 | + |
| 305 | +``` |
| 306 | +收到消息ActiveMQ 发送消息0 |
| 307 | +收到消息ActiveMQ 发送消息1 |
| 308 | +收到消息ActiveMQ 发送消息2 |
| 309 | +收到消息ActiveMQ 发送消息3 |
| 310 | +``` |
| 311 | + |
| 312 | +## 4. 资源 |
| 313 | + |
| 314 | +- [ActiveMQ 官网](http://activemq.apache.org/) |
| 315 | +- [oracle 官方的 jms 介绍](https://docs.oracle.com/cd/E19575-01/819-3669/6n5sg7cgq/index.html) |
0 commit comments