Skip to content

Commit 38c8272

Browse files
committed
📝 Writing docs.
1 parent 76f56f3 commit 38c8272

2 files changed

Lines changed: 316 additions & 1 deletion

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
- [x] [分布式消息队列](docs/distributed/mq/分布式消息队列.md)
4343
- [x] [Kafka](docs/distributed/mq/kafka.md) - 分布式的、可水平扩展的、基于发布/订阅模式的、支持容错的消息系统。
4444
- [ ] RocketMQ
45-
- [ ] ActiveMQ
45+
- [x] [ActiveMQ](docs/distributed/mq/ActiveMQ.md)
4646
- [ ] RabbitMQ
4747
- 分布式搜索引擎
4848
- [ ] ElasticSearch

docs/distributed/mq/ActiveMQ.md

Lines changed: 315 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,315 @@
1+
---
2+
title: ActiveMQ
3+
date: 2017/8/18
4+
categories:
5+
- javaweb
6+
tags:
7+
- java
8+
- javaweb
9+
- 分布式
10+
- mq
11+
- jms
12+
---
13+
14+
# ActiveMQ
15+
16+
<!-- TOC depthFrom:2 depthTo:3 -->
17+
18+
- [1. JMS 基本概念](#1-jms-基本概念)
19+
- [1.1. 消息模型](#11-消息模型)
20+
- [1.2. JMS 编程模型](#12-jms-编程模型)
21+
- [2. 安装](#2-安装)
22+
- [3. 项目中的应用](#3-项目中的应用)
23+
- [4. 资源](#4-资源)
24+
25+
<!-- /TOC -->
26+
27+
## 1. JMS 基本概念
28+
29+
`JMS`**Java 消息服务(Java Message Service)API**,是一个 Java 平台中关于面向消息中间件的 API。它用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java 消息服务是一个与具体平台无关的 API,绝大多数 MOM 提供商都对 JMS 提供支持。
30+
31+
### 1.1. 消息模型
32+
33+
JMS 有两种消息模型:
34+
35+
- Point-to-Point(P2P)
36+
- Publish/Subscribe(Pub/Sub)
37+
38+
#### P2P 的特点
39+
40+
![jms-pointToPoint.gif](http://oyz7npk35.bkt.clouddn.com//image/java/libs/activemq/jms-pointToPoint.gif)
41+
42+
在点对点的消息系统中,消息分发给一个单独的使用者。点对点消息往往与队列 `javax.jms.Queue` 相关联。
43+
44+
每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)。
45+
46+
发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列。
47+
48+
接收者在成功接收消息之后需向队列应答成功。
49+
50+
如果你希望发送的每个消息都应该被成功处理的话,那么你需要 P2P 模式。
51+
52+
#### Pub/Sub 的特点
53+
54+
![jms-publishSubscribe.gif](http://oyz7npk35.bkt.clouddn.com//image/java/libs/activemq/jms-publishSubscribe.gif)
55+
56+
发布/订阅消息系统支持一个事件驱动模型,消息生产者和消费者都参与消息的传递。生产者发布事件,而使用者订阅感兴趣的事件,并使用事件。该类型消息一般与特定的主题 `javax.jms.Topic` 关联。
57+
58+
每个消息可以有多个消费者。
59+
60+
发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。
61+
62+
为了缓和这样严格的时间相关性,JMS 允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
63+
64+
如果你希望发送的消息可以不被做任何处理、或者被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用 Pub/Sub 模型。
65+
66+
### 1.2. JMS 编程模型
67+
68+
![jms-publishSubscribe.gif](http://oyz7npk35.bkt.clouddn.com//image/java/libs/activemq/jms-publishSubscribe.gif)
69+
70+
#### ConnectionFactory
71+
72+
创建 `Connection` 对象的工厂,针对两种不同的 jms 消息模型,分别有 `QueueConnectionFactory``TopicConnectionFactory` 两种。可以通过 JNDI 来查找 `ConnectionFactory` 对象。
73+
74+
#### Connection
75+
76+
`Connection` 表示在客户端和 JMS 系统之间建立的链接(对 TCP/IP socket 的包装)。`Connection` 可以产生一个或多个`Session`。跟 `ConnectionFactory` 一样,`Connection` 也有两种类型:`QueueConnection``TopicConnection`
77+
78+
#### Destination
79+
80+
`Destination` 是一个包装了消息目标标识符的被管对象。消息目标是指消息发布和接收的地点,或者是队列 `Queue` ,或者是主题 `Topic` 。JMS 管理员创建这些对象,然后用户通过 JNDI 发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的 `Queue`,以及发布者/订阅者模型的 `Topic`
81+
82+
#### Session
83+
84+
`Session` 表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消息,生产者来发送消息,消费者来接收消息。同样,`Session` 也分 `QueueSession``TopicSession`
85+
86+
#### MessageConsumer
87+
88+
`MessageConsumer``Session` 创建,并用于将消息发送到 `Destination`。消费者可以同步地(阻塞模式),或(非阻塞)接收 `Queue``Topic` 类型的消息。同样,消息生产者分两种类型:`QueueSender``TopicPublisher`
89+
90+
#### MessageProducer
91+
92+
`MessageProducer``Session` 创建,用于接收被发送到 `Destination` 的消息。`MessageProducer` 有两种类型:`QueueReceiver``TopicSubscriber`。可分别通过 `session``createReceiver(Queue)``createSubscriber(Topic)` 来创建。当然,也可以 `session``creatDurableSubscriber` 方法来创建持久化的订阅者。
93+
94+
#### Message
95+
96+
是在消费者和生产者之间传送的对象,也就是说从一个应用程序传送到另一个应用程序。一个消息有三个主要部分:
97+
98+
- 消息头(必须):包含用于识别和为消息寻找路由的操作设置。
99+
- 一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。
100+
- 一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。
101+
102+
消息接口非常灵活,并提供了许多方式来定制消息的内容。
103+
104+
| Common | Point-to-Point | Publish-Subscribe |
105+
| ----------------- | --------------------------- | ---------------------- |
106+
| ConnectionFactory | QueueConnectionFactory | TopicConnectionFactory |
107+
| Connection | QueueConnection | TopicConnection |
108+
| Destination | Queue | Topic |
109+
| Session | QueueSession | TopicSession |
110+
| MessageProducer | QueueSender | TopicPublisher |
111+
| MessageSender | QueueReceiver, QueueBrowser | TopicSubscriber |
112+
113+
## 2. 安装
114+
115+
**安装条件**
116+
117+
JDK1.7 及以上版本
118+
119+
本地配置了 **JAVA_HOME** 环境变量。
120+
121+
**下载**
122+
123+
支持 Windows/Unix/Linux/Cygwin
124+
125+
[ActiveMQ 官方下载地址](http://activemq.apache.org/download.html)
126+
127+
**Windows 下运行**
128+
129+
1. 解压压缩包到本地;
130+
2. 打开控制台,进入安装目录的 `bin` 目录下;
131+
132+
```
133+
cd [activemq_install_dir]
134+
```
135+
136+
3. 执行 `activemq start` 来启动 ActiveMQ
137+
138+
```
139+
bin\activemq start
140+
```
141+
142+
**测试安装是否成功**
143+
144+
1. ActiveMQ 默认监听端口为 61616
145+
146+
```
147+
netstat -an|find “61616”
148+
```
149+
150+
2. 访问 http://127.0.0.1:8161/admin/
151+
152+
3. 输入用户名、密码
153+
Login: admin
154+
Passwort: admin
155+
156+
4. 点击导航栏的 Queues 菜单
157+
158+
5. 添加一个队列(queue)
159+
160+
## 3. 项目中的应用
161+
162+
**引入依赖**
163+
164+
```xml
165+
<dependency>
166+
<groupId>org.apache.activemq</groupId>
167+
<artifactId>activemq-all</artifactId>
168+
<version>5.14.1</version>
169+
</dependency>
170+
```
171+
172+
**Sender.java**
173+
174+
```java
175+
public class Sender {
176+
private static final int SEND_NUMBER = 4;
177+
178+
public static void main(String[] args) {
179+
// ConnectionFactory :连接工厂,JMS 用它创建连接
180+
ConnectionFactory connectionFactory;
181+
// Connection :JMS 客户端到JMS Provider 的连接
182+
Connection connection = null;
183+
// Session: 一个发送或接收消息的线程
184+
Session session;
185+
// Destination :消息的目的地;消息发送给谁.
186+
Destination destination;
187+
// MessageProducer:消息发送者
188+
MessageProducer producer;
189+
// TextMessage message;
190+
// 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
191+
connectionFactory = new ActiveMQConnectionFactory(
192+
ActiveMQConnection.DEFAULT_USER,
193+
ActiveMQConnection.DEFAULT_PASSWORD,
194+
"tcp://localhost:61616");
195+
try {
196+
// 构造从工厂得到连接对象
197+
connection = connectionFactory.createConnection();
198+
// 启动
199+
connection.start();
200+
// 获取操作连接
201+
session = connection.createSession(Boolean.TRUE,
202+
Session.AUTO_ACKNOWLEDGE);
203+
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
204+
destination = session.createQueue("FirstQueue");
205+
// 得到消息生成者【发送者】
206+
producer = session.createProducer(destination);
207+
// 设置不持久化,此处学习,实际根据项目决定
208+
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
209+
// 构造消息,此处写死,项目就是参数,或者方法获取
210+
sendMessage(session, producer);
211+
session.commit();
212+
} catch (Exception e) {
213+
e.printStackTrace();
214+
} finally {
215+
try {
216+
if (null != connection)
217+
connection.close();
218+
} catch (Throwable ignore) {
219+
}
220+
}
221+
}
222+
223+
public static void sendMessage(Session session, MessageProducer producer)
224+
throws Exception {
225+
for (int i = 1; i <= SEND_NUMBER; i++) {
226+
TextMessage message = session
227+
.createTextMessage("ActiveMq 发送的消息" + i);
228+
// 发送消息到目的地方
229+
System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);
230+
producer.send(message);
231+
}
232+
}
233+
}
234+
```
235+
236+
**Receiver.java**
237+
238+
```java
239+
public class Receiver {
240+
public static void main(String[] args) {
241+
// ConnectionFactory :连接工厂,JMS 用它创建连接
242+
ConnectionFactory connectionFactory;
243+
// Connection :JMS 客户端到JMS Provider 的连接
244+
Connection connection = null;
245+
// Session: 一个发送或接收消息的线程
246+
Session session;
247+
// Destination :消息的目的地;消息发送给谁.
248+
Destination destination;
249+
// 消费者,消息接收者
250+
MessageConsumer consumer;
251+
connectionFactory = new ActiveMQConnectionFactory(
252+
ActiveMQConnection.DEFAULT_USER,
253+
ActiveMQConnection.DEFAULT_PASSWORD,
254+
"tcp://localhost:61616");
255+
try {
256+
// 构造从工厂得到连接对象
257+
connection = connectionFactory.createConnection();
258+
// 启动
259+
connection.start();
260+
// 获取操作连接
261+
session = connection.createSession(Boolean.FALSE,
262+
Session.AUTO_ACKNOWLEDGE);
263+
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
264+
destination = session.createQueue("FirstQueue");
265+
consumer = session.createConsumer(destination);
266+
while (true) {
267+
//设置接收者接收消息的时间,为了便于测试,这里谁定为100s
268+
TextMessage message = (TextMessage) consumer.receive(100000);
269+
if (null != message) {
270+
System.out.println("收到消息" + message.getText());
271+
} else {
272+
break;
273+
}
274+
}
275+
} catch (Exception e) {
276+
e.printStackTrace();
277+
} finally {
278+
try {
279+
if (null != connection)
280+
connection.close();
281+
} catch (Throwable ignore) {
282+
}
283+
}
284+
}
285+
}
286+
```
287+
288+
**运行**
289+
290+
先运行 Receiver.java 进行消息监听,再运行 Send.java 发送消息。
291+
292+
**输出**
293+
294+
Send 的输出内容
295+
296+
```
297+
发送消息:Activemq 发送消息0
298+
发送消息:Activemq 发送消息1
299+
发送消息:Activemq 发送消息2
300+
发送消息:Activemq 发送消息3
301+
```
302+
303+
Receiver 的输出内容
304+
305+
```
306+
收到消息ActiveMQ 发送消息0
307+
收到消息ActiveMQ 发送消息1
308+
收到消息ActiveMQ 发送消息2
309+
收到消息ActiveMQ 发送消息3
310+
```
311+
312+
## 4. 资源
313+
314+
- [ActiveMQ 官网](http://activemq.apache.org/)
315+
- [oracle 官方的 jms 介绍](https://docs.oracle.com/cd/E19575-01/819-3669/6n5sg7cgq/index.html)

0 commit comments

Comments
 (0)