|
| 1 | + |
| 2 | + |
| 3 | + |
| 4 | + |
| 5 | +# 前言 |
| 6 | + |
| 7 | +较长一段时间以来我都发现不少开发者对 jdk 中的 `J.U.C`(java.util.concurrent)也就是 Java 并发包的使用甚少,更别谈对它的理解了;但这却也是我们进阶的必备关卡。 |
| 8 | + |
| 9 | +之前或多或少也分享过相关内容,但都不成体系;于是便想整理一套与并发包相关的系列文章。 |
| 10 | + |
| 11 | +其中的内容主要包含以下几个部分: |
| 12 | + |
| 13 | +- 根据定义自己实现一个并发工具。 |
| 14 | +- JDK 的标准实现。 |
| 15 | +- 实践案例。 |
| 16 | + |
| 17 | + |
| 18 | +基于这三点我相信大家对这部分内容不至于一问三不知。 |
| 19 | + |
| 20 | +既然开了一个新坑,就不想做的太差;所以我打算将这个列表下的大部分类都讲到。 |
| 21 | + |
| 22 | + |
| 23 | + |
| 24 | + |
| 25 | +所以本次重点讨论 `ArrayBlockingQueue`。 |
| 26 | + |
| 27 | +# 自己实现 |
| 28 | + |
| 29 | +在自己实现之前先搞清楚阻塞队列的几个特点: |
| 30 | + |
| 31 | +- 基本队列特性:先进先出。 |
| 32 | +- 写入队列空间不可用时会阻塞。 |
| 33 | +- 获取队列数据时当队列为空时将阻塞。 |
| 34 | + |
| 35 | + |
| 36 | +实现队列的方式多种,总的来说就是数组和链表;其实我们只需要搞清楚其中一个即可,不同的特性主要表现为数组和链表的区别。 |
| 37 | + |
| 38 | +这里的 `ArrayBlockingQueue` 看名字很明显是由数组实现。 |
| 39 | + |
| 40 | +我们先根据它这三个特性尝试自己实现试试。 |
| 41 | + |
| 42 | +## 初始化队列 |
| 43 | + |
| 44 | +我这里自定义了一个类:`ArrayQueue`,它的构造函数如下: |
| 45 | + |
| 46 | +```java |
| 47 | + public ArrayQueue(int size) { |
| 48 | + items = new Object[size]; |
| 49 | + } |
| 50 | +``` |
| 51 | + |
| 52 | +很明显这里的 `items` 就是存放数据的数组;在初始化时需要根据大小创建数组。 |
| 53 | + |
| 54 | + |
| 55 | + |
| 56 | +## 写入队列 |
| 57 | + |
| 58 | +写入队列比较简单,只需要依次把数据存放到这个数组中即可,如下图: |
| 59 | + |
| 60 | + |
| 61 | + |
| 62 | +但还是有几个需要注意的点: |
| 63 | + |
| 64 | +- 队列满的时候,写入的线程需要被阻塞。 |
| 65 | +- 写入过队列的数量大于队列大小时需要从第一个下标开始写。 |
| 66 | + |
| 67 | +先看第一个`队列满的时候,写入的线程需要被阻塞`,先来考虑下如何才能使一个线程被**阻塞**,看起来的表象线程卡住啥事也做不了。 |
| 68 | + |
| 69 | +有几种方案可以实现这个效果: |
| 70 | + |
| 71 | +- `Thread.sleep(timeout)`线程休眠。 |
| 72 | +- `object.wait()` 让线程进入 `waiting` 状态。 |
| 73 | + |
| 74 | +> 当然还有一些 `join、LockSupport.part` 等不在本次的讨论范围。 |
| 75 | +
|
| 76 | +阻塞队列还有一个非常重要的特性是:当队列空间可用时(取出队列),写入线程需要被唤醒让数据可以写入进去。 |
| 77 | + |
| 78 | +所以很明显`Thread.sleep(timeout)`不合适,它在到达超时时间之后便会继续运行;达不到**空间可用时**才唤醒继续运行这个特点。 |
| 79 | + |
| 80 | +其实这样的一个特点很容易让我们想到 Java 的等待通知机制来实现线程间通信;更多线程见通信的方案可以参考这里:[深入理解线程通信](https://crossoverjie.top/2018/03/16/java-senior/thread-communication/#%E7%AD%89%E5%BE%85%E9%80%9A%E7%9F%A5%E6%9C%BA%E5%88%B6) |
| 81 | + |
| 82 | +所以我这里的做法是,一旦队列满时就将写入线程调用 `object.wait()` 进入 `waiting` 状态,直到空间可用时再进行唤醒。 |
| 83 | + |
| 84 | +```java |
| 85 | + /** |
| 86 | + * 队列满时的阻塞锁 |
| 87 | + */ |
| 88 | + private Object full = new Object(); |
| 89 | + |
| 90 | + /** |
| 91 | + * 队列空时的阻塞锁 |
| 92 | + */ |
| 93 | + private Object empty = new Object(); |
| 94 | +``` |
| 95 | + |
| 96 | + |
| 97 | + |
| 98 | +所以这里声明了两个对象用于队列满、空情况下的互相通知作用。 |
| 99 | + |
| 100 | + |
| 101 | +在写入数据成功后需要使用 `empty.notify()`,这样的目的是当获取队列为空时,一旦写入数据成功就可以把消费队列的线程唤醒。 |
| 102 | + |
| 103 | + |
| 104 | +> 这里的 wait 和 notify 操作都需要对各自的对象使用 `synchronized` 方法块,这是因为 wait 和 notify 都需要获取到各自的锁。 |
| 105 | +
|
| 106 | +## 消费队列 |
| 107 | + |
| 108 | +上文也提到了:当队列为空时,获取队列的线程需要被阻塞,直到队列中有数据时才被唤醒。 |
| 109 | + |
| 110 | + |
| 111 | + |
| 112 | +代码和写入的非常类似,也很好理解;只是这里的等待、唤醒恰好是相反的,通过下面这张图可以很好理解: |
| 113 | + |
| 114 | + |
| 115 | + |
| 116 | +总的来说就是: |
| 117 | + |
| 118 | +- 写入队列满时会阻塞直到获取线程消费了队列数据后唤醒**写入线程**。 |
| 119 | +- 消费队列空时会阻塞直到写入线程写入了队列数据后唤醒**消费线程**。 |
| 120 | + |
| 121 | + |
| 122 | +## 测试 |
| 123 | + |
| 124 | +先来一个基本的测试:单线程的写入和消费。 |
| 125 | + |
| 126 | + |
| 127 | + |
| 128 | +```log |
| 129 | +3 |
| 130 | +123 |
| 131 | +1234 |
| 132 | +12345 |
| 133 | +``` |
| 134 | + |
| 135 | +通过结果来看没什么问题。 |
| 136 | + |
| 137 | +--- |
| 138 | + |
| 139 | +当写入的数据超过队列的大小时,就只能消费之后才能接着写入。 |
| 140 | + |
| 141 | + |
| 142 | + |
| 143 | +```log |
| 144 | +2019-04-09 16:24:41.040 [Thread-0] INFO c.c.concurrent.ArrayQueueTest - [Thread-0]123 |
| 145 | +2019-04-09 16:24:41.040 [main] INFO c.c.concurrent.ArrayQueueTest - size=3 |
| 146 | +2019-04-09 16:24:41.047 [main] INFO c.c.concurrent.ArrayQueueTest - 1234 |
| 147 | +2019-04-09 16:24:41.048 [main] INFO c.c.concurrent.ArrayQueueTest - 12345 |
| 148 | +2019-04-09 16:24:41.048 [main] INFO c.c.concurrent.ArrayQueueTest - 123456 |
| 149 | +``` |
| 150 | + |
| 151 | +从运行结果也能看出只有当消费数据后才能接着往队列里写入数据。 |
| 152 | + |
| 153 | +--- |
| 154 | + |
| 155 | + |
| 156 | + |
| 157 | + |
| 158 | + |
| 159 | +而当没有消费时,再往队列里写数据则会导致写入线程被阻塞。 |
| 160 | + |
| 161 | + |
| 162 | + |
| 163 | +### 并发测试 |
| 164 | + |
| 165 | + |
| 166 | + |
| 167 | +三个线程并发写入300条数据,其中一个线程消费一条。 |
| 168 | + |
| 169 | +```log |
| 170 | +=====0 |
| 171 | +299 |
| 172 | +``` |
| 173 | + |
| 174 | +最终的队列大小为 299,可见线程也是安全的。 |
| 175 | + |
| 176 | +> 由于不管是写入还是获取方法里的操作都需要获取锁才能操作,所以整个队列是线程安全的。 |
| 177 | +
|
| 178 | + |
| 179 | +# ArrayBlockingQueue |
| 180 | + |
| 181 | +下面来看看 JDK 标准的 `ArrayBlockingQueue` 的实现,有了上面的基础会更好理解。 |
| 182 | + |
| 183 | +## 初始化队列 |
| 184 | + |
| 185 | + |
| 186 | + |
| 187 | +看似要复杂些,但其实逐步拆分后也很好理解: |
| 188 | + |
| 189 | +第一步其实和我们自己写的一样,初始化一个队列大小的数组。 |
| 190 | + |
| 191 | + |
| 192 | +第二步初始化了一个重入锁,这里其实就和我们之前使用的 `synchronized` 作用一致的; |
| 193 | + |
| 194 | +只是这里在初始化重入锁的时候默认是`非公平锁`,当然也可以指定为 `true` 使用公平锁;这样就会按照队列的顺序进行写入和消费。 |
| 195 | + |
| 196 | +> 更多关于 `ReentrantLock` 的使用和原理请参考这里:[ReentrantLock 实现原理](https://crossoverjie.top/2018/01/25/ReentrantLock/) |
| 197 | +
|
| 198 | +三四两步则是创建了 `notEmpty notFull` 这两个条件,他的作用于用法和之前使用的 `object.wait/notify` 类似。 |
| 199 | + |
| 200 | +这就是整个初始化的内容,其实和我们自己实现的非常类似。 |
| 201 | + |
| 202 | + |
| 203 | +## 写入队列 |
| 204 | + |
| 205 | + |
| 206 | + |
| 207 | + |
| 208 | +其实会发现阻塞写入的原理都是差不多的,只是这里使用的是 Lock 来显式获取和释放锁。 |
| 209 | + |
| 210 | +同时其中的 `notFull.await();notEmpty.signal();` 和我们之前使用的 `object.wait/notify` 的用法和作用也是一样的。 |
| 211 | + |
| 212 | + |
| 213 | +当然它还是实现了超时阻塞的 `API`。 |
| 214 | + |
| 215 | + |
| 216 | + |
| 217 | +也是比较简单,使用了一个具有超时时间的等待方法。 |
| 218 | + |
| 219 | +## 消费队列 |
| 220 | + |
| 221 | +再看消费队列: |
| 222 | + |
| 223 | + |
| 224 | + |
| 225 | + |
| 226 | +也是差不多的,一看就懂。 |
| 227 | + |
| 228 | +而其中的超时 API 也是使用了 `notEmpty.awaitNanos(nanos)` 来实现超时返回的,就不具体说了。 |
| 229 | + |
| 230 | + |
| 231 | +# 实际案例 |
| 232 | + |
| 233 | +说了这么多,来看一个队列的实际案例吧。 |
| 234 | + |
| 235 | +背景是这样的: |
| 236 | + |
| 237 | +> 有一个定时任务会按照一定的间隔时间从数据库中读取一批数据,需要对这些数据做校验同时调用一个远程接口。 |
| 238 | +
|
| 239 | + |
| 240 | +简单的做法就是由这个定时任务的线程去完成读取数据、消息校验、调用接口等整个全流程;但这样会有一个问题: |
| 241 | + |
| 242 | +假设调用外部接口出现了异常、网络不稳导致耗时增加就会造成整个任务的效率降低,因为他都是串行会互相影响。 |
| 243 | + |
| 244 | + |
| 245 | +所以我们改进了方案: |
| 246 | + |
| 247 | + |
| 248 | + |
| 249 | +其实就是一个典型的生产者消费者模型: |
| 250 | + |
| 251 | +- 生产线程从数据库中读取消息丢到队列里。 |
| 252 | +- 消费线程从队列里获取数据做业务逻辑。 |
| 253 | + |
| 254 | +这样两个线程就可以通过这个队列来进行解耦,互相不影响,同时这个队列也能起到缓冲的作用。 |
| 255 | + |
| 256 | +但在使用过程中也有一些小细节值得注意。 |
| 257 | + |
| 258 | +因为这个外部接口是支持批量执行的,所以在消费线程取出数据后会在内存中做一个累加,一旦达到阈值或者是累计了一个时间段便将这批累计的数据处理掉。 |
| 259 | + |
| 260 | +但由于开发者的大意,在消费的时候使用的是 `queue.take()` 这个阻塞的 API;正常运行没啥问题。 |
| 261 | + |
| 262 | +可一旦原始的数据源,也就是 DB 中没数据了,导致队列里的数据也被消费完后这个消费线程便会被阻塞。 |
| 263 | + |
| 264 | +这样上一轮积累在内存中的数据便一直没机会使用,直到数据源又有数据了,一旦中间间隔较长时便可能会导致严重的业务异常。 |
| 265 | + |
| 266 | +所以我们最好是使用 `queue.poll(timeout)` 这样带超时时间的 api,除非业务上有明确的要求需要阻塞。 |
| 267 | + |
| 268 | +这个习惯同样适用于其他场景,比如调用 http、rpc 接口等都需要设置合理的超时时间。 |
| 269 | + |
| 270 | +# 总结 |
| 271 | + |
| 272 | +关于 `ArrayBlockingQueue` 的相关分享便到此结束,接着会继续更新其他并发容器及并发工具。 |
| 273 | + |
| 274 | +对本文有任何相关问题都可以留言讨论。 |
| 275 | + |
| 276 | + |
| 277 | + |
| 278 | +本文涉及到的所有源码: |
| 279 | + |
| 280 | +https://github.com/crossoverJie/JCSprout/blob/master/src/main/java/com/crossoverjie/concurrent/ArrayQueue.java |
| 281 | + |
| 282 | + |
| 283 | +**你的点赞与分享是对我最大的支持** |
0 commit comments