Skip to content

Commit 5bd9b3b

Browse files
committed
✨ 『并发包入坑指北』之阻塞队列
1 parent b13a7c3 commit 5bd9b3b

3 files changed

Lines changed: 285 additions & 0 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
- [深入理解线程通信](https://github.com/crossoverJie/JCSprout/blob/master/MD/concurrent/thread-communication.md)
4545
- [一个线程罢工的诡异事件](docs/thread/thread-gone.md)
4646
- [线程池中你不容错过的一些细节](docs/thread/thread-gone2.md)
47+
- [『并发包入坑指北』之阻塞队列](docs/thread/ArrayBlockingQueue.md)
4748

4849
### JVM
4950
- [Java 运行时内存划分](https://github.com/crossoverJie/JCSprout/blob/master/MD/MemoryAllocation.md)

docs/_sidebar.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
- [深入理解线程通信](thread/thread-communication.md)
1919
- [一个线程罢工的诡异事件](thread/thread-gone.md)
2020
- [线程池中你不容错过的一些细节](thread/thread-gone2.md)
21+
- [『并发包入坑指北』之阻塞队列](thread/ArrayBlockingQueue.md)
2122

2223
- JVM
2324

docs/thread/ArrayBlockingQueue.md

Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
![](https://ws4.sinaimg.cn/large/006tNc79ly1g1vn9xpgp4j31ak0u013m.jpg)
2+
3+
4+
5+
# 前言
6+
7+
较长一段时间以来我都发现不少开发者对 jdk 中的 `J.U.C`(java.util.concurrent)也就是 Java 并发包的使用甚少,更别谈对它的理解了;但这却也是我们进阶的必备关卡。
8+
9+
之前或多或少也分享过相关内容,但都不成体系;于是便想整理一套与并发包相关的系列文章。
10+
11+
其中的内容主要包含以下几个部分:
12+
13+
- 根据定义自己实现一个并发工具。
14+
- JDK 的标准实现。
15+
- 实践案例。
16+
17+
18+
基于这三点我相信大家对这部分内容不至于一问三不知。
19+
20+
既然开了一个新坑,就不想做的太差;所以我打算将这个列表下的大部分类都讲到。
21+
22+
![](https://ws2.sinaimg.cn/large/006tNc79ly1g1vpwdqbkrj30ab09nmy9.jpg)
23+
24+
25+
所以本次重点讨论 `ArrayBlockingQueue`
26+
27+
# 自己实现
28+
29+
在自己实现之前先搞清楚阻塞队列的几个特点:
30+
31+
- 基本队列特性:先进先出。
32+
- 写入队列空间不可用时会阻塞。
33+
- 获取队列数据时当队列为空时将阻塞。
34+
35+
36+
实现队列的方式多种,总的来说就是数组和链表;其实我们只需要搞清楚其中一个即可,不同的特性主要表现为数组和链表的区别。
37+
38+
这里的 `ArrayBlockingQueue` 看名字很明显是由数组实现。
39+
40+
我们先根据它这三个特性尝试自己实现试试。
41+
42+
## 初始化队列
43+
44+
我这里自定义了一个类:`ArrayQueue`,它的构造函数如下:
45+
46+
```java
47+
public ArrayQueue(int size) {
48+
items = new Object[size];
49+
}
50+
```
51+
52+
很明显这里的 `items` 就是存放数据的数组;在初始化时需要根据大小创建数组。
53+
54+
![](https://ws1.sinaimg.cn/large/006tNc79ly1g1wd71n229j30wb0u043w.jpg)
55+
56+
## 写入队列
57+
58+
写入队列比较简单,只需要依次把数据存放到这个数组中即可,如下图:
59+
60+
![](https://ws3.sinaimg.cn/large/006tNc79ly1g1we7yeykej30b0060mxc.jpg)
61+
62+
但还是有几个需要注意的点:
63+
64+
- 队列满的时候,写入的线程需要被阻塞。
65+
- 写入过队列的数量大于队列大小时需要从第一个下标开始写。
66+
67+
先看第一个`队列满的时候,写入的线程需要被阻塞`,先来考虑下如何才能使一个线程被**阻塞**,看起来的表象线程卡住啥事也做不了。
68+
69+
有几种方案可以实现这个效果:
70+
71+
- `Thread.sleep(timeout)`线程休眠。
72+
- `object.wait()` 让线程进入 `waiting` 状态。
73+
74+
> 当然还有一些 `join、LockSupport.part` 等不在本次的讨论范围。
75+
76+
阻塞队列还有一个非常重要的特性是:当队列空间可用时(取出队列),写入线程需要被唤醒让数据可以写入进去。
77+
78+
所以很明显`Thread.sleep(timeout)`不合适,它在到达超时时间之后便会继续运行;达不到**空间可用时**才唤醒继续运行这个特点。
79+
80+
其实这样的一个特点很容易让我们想到 Java 的等待通知机制来实现线程间通信;更多线程见通信的方案可以参考这里:[深入理解线程通信](https://crossoverjie.top/2018/03/16/java-senior/thread-communication/#%E7%AD%89%E5%BE%85%E9%80%9A%E7%9F%A5%E6%9C%BA%E5%88%B6)
81+
82+
所以我这里的做法是,一旦队列满时就将写入线程调用 `object.wait()` 进入 `waiting` 状态,直到空间可用时再进行唤醒。
83+
84+
```java
85+
/**
86+
* 队列满时的阻塞锁
87+
*/
88+
private Object full = new Object();
89+
90+
/**
91+
* 队列空时的阻塞锁
92+
*/
93+
private Object empty = new Object();
94+
```
95+
96+
![](https://ws2.sinaimg.cn/large/006tNc79ly1g1wf8de8jzj30te0tin1i.jpg)
97+
98+
所以这里声明了两个对象用于队列满、空情况下的互相通知作用。
99+
100+
101+
在写入数据成功后需要使用 `empty.notify()`,这样的目的是当获取队列为空时,一旦写入数据成功就可以把消费队列的线程唤醒。
102+
103+
104+
> 这里的 wait 和 notify 操作都需要对各自的对象使用 `synchronized` 方法块,这是因为 wait 和 notify 都需要获取到各自的锁。
105+
106+
## 消费队列
107+
108+
上文也提到了:当队列为空时,获取队列的线程需要被阻塞,直到队列中有数据时才被唤醒。
109+
110+
![](https://ws1.sinaimg.cn/large/006tNc79ly1g1wfhr3r6qj30tg0tiwit.jpg)
111+
112+
代码和写入的非常类似,也很好理解;只是这里的等待、唤醒恰好是相反的,通过下面这张图可以很好理解:
113+
114+
![](https://ws3.sinaimg.cn/large/006tNc79ly1g1wfwr016gj30o20ksq59.jpg)
115+
116+
总的来说就是:
117+
118+
- 写入队列满时会阻塞直到获取线程消费了队列数据后唤醒**写入线程**
119+
- 消费队列空时会阻塞直到写入线程写入了队列数据后唤醒**消费线程**
120+
121+
122+
## 测试
123+
124+
先来一个基本的测试:单线程的写入和消费。
125+
126+
![](https://ws2.sinaimg.cn/large/006tNc79ly1g1wg97uqgpj30uu0dqwgu.jpg)
127+
128+
```log
129+
3
130+
123
131+
1234
132+
12345
133+
```
134+
135+
通过结果来看没什么问题。
136+
137+
---
138+
139+
当写入的数据超过队列的大小时,就只能消费之后才能接着写入。
140+
141+
![](https://ws3.sinaimg.cn/large/006tNc79ly1g1wgmshqfyj316o0n2ae5.jpg)
142+
143+
```log
144+
2019-04-09 16:24:41.040 [Thread-0] INFO c.c.concurrent.ArrayQueueTest - [Thread-0]123
145+
2019-04-09 16:24:41.040 [main] INFO c.c.concurrent.ArrayQueueTest - size=3
146+
2019-04-09 16:24:41.047 [main] INFO c.c.concurrent.ArrayQueueTest - 1234
147+
2019-04-09 16:24:41.048 [main] INFO c.c.concurrent.ArrayQueueTest - 12345
148+
2019-04-09 16:24:41.048 [main] INFO c.c.concurrent.ArrayQueueTest - 123456
149+
```
150+
151+
从运行结果也能看出只有当消费数据后才能接着往队列里写入数据。
152+
153+
---
154+
155+
![](https://ws3.sinaimg.cn/large/006tNc79ly1g1wiskvki8j30yy0eo0ve.jpg)
156+
157+
![](https://ws2.sinaimg.cn/large/006tNc79ly1g1witm4twpj31q60ai0vz.jpg)
158+
159+
而当没有消费时,再往队列里写数据则会导致写入线程被阻塞。
160+
161+
162+
163+
### 并发测试
164+
165+
![](https://ws3.sinaimg.cn/large/006tNc79ly1g1wiwyz4j5j30vz0u044f.jpg)
166+
167+
三个线程并发写入300条数据,其中一个线程消费一条。
168+
169+
```log
170+
=====0
171+
299
172+
```
173+
174+
最终的队列大小为 299,可见线程也是安全的。
175+
176+
> 由于不管是写入还是获取方法里的操作都需要获取锁才能操作,所以整个队列是线程安全的。
177+
178+
179+
# ArrayBlockingQueue
180+
181+
下面来看看 JDK 标准的 `ArrayBlockingQueue` 的实现,有了上面的基础会更好理解。
182+
183+
## 初始化队列
184+
185+
![](https://ws1.sinaimg.cn/large/006tNc79ly1g1wkaau8w7j30ze0lcagb.jpg)
186+
187+
看似要复杂些,但其实逐步拆分后也很好理解:
188+
189+
第一步其实和我们自己写的一样,初始化一个队列大小的数组。
190+
191+
192+
第二步初始化了一个重入锁,这里其实就和我们之前使用的 `synchronized` 作用一致的;
193+
194+
只是这里在初始化重入锁的时候默认是`非公平锁`,当然也可以指定为 `true` 使用公平锁;这样就会按照队列的顺序进行写入和消费。
195+
196+
> 更多关于 `ReentrantLock` 的使用和原理请参考这里:[ReentrantLock 实现原理](https://crossoverjie.top/2018/01/25/ReentrantLock/)
197+
198+
三四两步则是创建了 `notEmpty notFull` 这两个条件,他的作用于用法和之前使用的 `object.wait/notify` 类似。
199+
200+
这就是整个初始化的内容,其实和我们自己实现的非常类似。
201+
202+
203+
## 写入队列
204+
205+
![](https://ws2.sinaimg.cn/large/006tNc79ly1g1wktuhxzuj30tk0bqq55.jpg)
206+
![](https://ws2.sinaimg.cn/large/006tNc79ly1g1wktfkwu2j30ug09ugnn.jpg)
207+
208+
其实会发现阻塞写入的原理都是差不多的,只是这里使用的是 Lock 来显式获取和释放锁。
209+
210+
同时其中的 `notFull.await();notEmpty.signal();` 和我们之前使用的 `object.wait/notify` 的用法和作用也是一样的。
211+
212+
213+
当然它还是实现了超时阻塞的 `API`
214+
215+
![](https://ws4.sinaimg.cn/large/006tNc79ly1g1wl1n7ir5j30vm0iqdjb.jpg)
216+
217+
也是比较简单,使用了一个具有超时时间的等待方法。
218+
219+
## 消费队列
220+
221+
再看消费队列:
222+
223+
![](https://ws1.sinaimg.cn/large/006tNc79ly1g1wl3vcsioj30tc0ayq4y.jpg)
224+
![](https://ws4.sinaimg.cn/large/006tNc79ly1g1wl4cfrnlj30u00eq0vm.jpg)
225+
226+
也是差不多的,一看就懂。
227+
228+
而其中的超时 API 也是使用了 `notEmpty.awaitNanos(nanos)` 来实现超时返回的,就不具体说了。
229+
230+
231+
# 实际案例
232+
233+
说了这么多,来看一个队列的实际案例吧。
234+
235+
背景是这样的:
236+
237+
> 有一个定时任务会按照一定的间隔时间从数据库中读取一批数据,需要对这些数据做校验同时调用一个远程接口。
238+
239+
240+
简单的做法就是由这个定时任务的线程去完成读取数据、消息校验、调用接口等整个全流程;但这样会有一个问题:
241+
242+
假设调用外部接口出现了异常、网络不稳导致耗时增加就会造成整个任务的效率降低,因为他都是串行会互相影响。
243+
244+
245+
所以我们改进了方案:
246+
247+
![](https://ws1.sinaimg.cn/large/006tNc79ly1g1wm1v7mfxj30qs0aiq4g.jpg)
248+
249+
其实就是一个典型的生产者消费者模型:
250+
251+
- 生产线程从数据库中读取消息丢到队列里。
252+
- 消费线程从队列里获取数据做业务逻辑。
253+
254+
这样两个线程就可以通过这个队列来进行解耦,互相不影响,同时这个队列也能起到缓冲的作用。
255+
256+
但在使用过程中也有一些小细节值得注意。
257+
258+
因为这个外部接口是支持批量执行的,所以在消费线程取出数据后会在内存中做一个累加,一旦达到阈值或者是累计了一个时间段便将这批累计的数据处理掉。
259+
260+
但由于开发者的大意,在消费的时候使用的是 `queue.take()` 这个阻塞的 API;正常运行没啥问题。
261+
262+
可一旦原始的数据源,也就是 DB 中没数据了,导致队列里的数据也被消费完后这个消费线程便会被阻塞。
263+
264+
这样上一轮积累在内存中的数据便一直没机会使用,直到数据源又有数据了,一旦中间间隔较长时便可能会导致严重的业务异常。
265+
266+
所以我们最好是使用 `queue.poll(timeout)` 这样带超时时间的 api,除非业务上有明确的要求需要阻塞。
267+
268+
这个习惯同样适用于其他场景,比如调用 http、rpc 接口等都需要设置合理的超时时间。
269+
270+
# 总结
271+
272+
关于 `ArrayBlockingQueue` 的相关分享便到此结束,接着会继续更新其他并发容器及并发工具。
273+
274+
对本文有任何相关问题都可以留言讨论。
275+
276+
277+
278+
本文涉及到的所有源码:
279+
280+
https://github.com/crossoverJie/JCSprout/blob/master/src/main/java/com/crossoverjie/concurrent/ArrayQueue.java
281+
282+
283+
**你的点赞与分享是对我最大的支持**

0 commit comments

Comments
 (0)