Skip to content

Commit 6466abc

Browse files
committed
🔖 并发工具类示例
1 parent ca16013 commit 6466abc

5 files changed

Lines changed: 107 additions & 58 deletions

File tree

codes/concurrent/src/main/java/io/github/dunwu/javase/concurrent/tool/CountDownLatchDemo.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@
44

55
/**
66
* CountDownLatch 示例
7-
* CountDownLatch 可以实现类似计数器的功能。
8-
*
7+
* <p>
8+
* 作用:允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。
9+
* <p>
10+
* 原理:CountDownLatch 维护一个计数器 count。每次调用 countDown 方法会让 count 的值减 1,减到 0 的时候,那些因为调用 await
11+
* 方法而在等待的线程就会被唤醒。
912
* @author Zhang Peng
1013
* @date 2018/5/10
1114
* @see java.util.concurrent.CountDownLatch
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package io.github.dunwu.javase.concurrent.tool;
2+
3+
import java.util.concurrent.CountDownLatch;
4+
5+
/**
6+
* @author Zhang Peng
7+
* @date 2018/5/15
8+
*/
9+
public class CountDownLatchDemo02 {
10+
public static void main(String[] args) throws InterruptedException {
11+
Runnable task = new ThreadName();
12+
long time = timeTasks(3, task);
13+
System.out.println(time);
14+
}
15+
16+
private static long timeTasks(int num, final Runnable task)
17+
throws InterruptedException {
18+
final CountDownLatch startGate = new CountDownLatch(1);
19+
final CountDownLatch endGate = new CountDownLatch(num);
20+
21+
for (int i = 0; i < num; i++) {
22+
Thread t = new Thread(() -> {
23+
try {
24+
startGate.await();
25+
try {
26+
task.run();
27+
} finally {
28+
endGate.countDown();
29+
}
30+
} catch (InterruptedException ignored) {
31+
}
32+
});
33+
t.start();
34+
}
35+
36+
long start = System.currentTimeMillis();
37+
startGate.countDown();
38+
endGate.await();
39+
long end = System.currentTimeMillis();
40+
return end - start;
41+
}
42+
43+
static class ThreadName implements Runnable {
44+
45+
@Override
46+
public void run() {
47+
for (int i = 1; i <= 5; i++) {
48+
System.out.println(Thread.currentThread().getName() + "运行,i = " + i);
49+
}
50+
}
51+
}
52+
53+
;
54+
}

codes/concurrent/src/main/java/io/github/dunwu/javase/concurrent/tool/CyclicBarrierDemo.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
* CyclicBarrier 示例
88
* 字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。
99
* 叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。
10-
*
1110
* @author Zhang Peng
1211
* @date 2018/5/10
1312
* @see java.util.concurrent.CyclicBarrier

codes/concurrent/src/main/java/io/github/dunwu/javase/concurrent/tool/CyclicBarrierDemo02.java

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -9,52 +9,52 @@
99
* CyclicBarrier 示例
1010
* 字面意思回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。
1111
* 叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。
12-
*
1312
* @author Zhang Peng
1413
* @date 2018/5/10
1514
* @see CyclicBarrier
1615
*/
1716
public class CyclicBarrierDemo02 {
1817

19-
public static void main(String[] args) {
20-
int N = 4;
21-
CyclicBarrier barrier = new CyclicBarrier(N);
22-
23-
for (int i = 0; i < N; i++) {
24-
if (i < N - 1) { new Writer(barrier).start(); } else {
25-
try {
26-
Thread.sleep(5000);
27-
} catch (InterruptedException e) {
28-
e.printStackTrace();
29-
}
30-
new Writer(barrier).start();
31-
}
32-
}
33-
}
18+
static class CyclicBarrierRunnable implements Runnable {
3419

35-
static class Writer extends Thread {
20+
CyclicBarrier barrier1 = null;
21+
CyclicBarrier barrier2 = null;
3622

37-
private CyclicBarrier cyclicBarrier;
38-
39-
Writer(CyclicBarrier cyclicBarrier) {
40-
this.cyclicBarrier = cyclicBarrier;
23+
CyclicBarrierRunnable(CyclicBarrier barrier1, CyclicBarrier barrier2) {
24+
this.barrier1 = barrier1;
25+
this.barrier2 = barrier2;
4126
}
4227

43-
@Override
4428
public void run() {
45-
System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据...");
4629
try {
47-
Thread.sleep(5000); //以睡眠来模拟写入数据操作
48-
System.out.println("线程" + Thread.currentThread().getName() + "写入数据完毕,等待其他线程写入完毕");
49-
try {
50-
cyclicBarrier.await(2000, TimeUnit.MILLISECONDS);
51-
} catch (TimeoutException e) {
52-
e.printStackTrace();
53-
}
30+
Thread.sleep(1000);
31+
System.out.println(Thread.currentThread().getName() + " waiting at barrier 1");
32+
this.barrier1.await();
33+
34+
Thread.sleep(1000);
35+
System.out.println(Thread.currentThread().getName() + " waiting at barrier 2");
36+
this.barrier2.await();
37+
38+
System.out.println(Thread.currentThread().getName() + " done!");
39+
5440
} catch (InterruptedException | BrokenBarrierException e) {
5541
e.printStackTrace();
5642
}
57-
System.out.println(Thread.currentThread().getName() + "所有线程写入完毕,继续处理其他任务...");
5843
}
5944
}
45+
46+
public static void main(String[] args) {
47+
Runnable barrier1Action = () -> System.out.println("BarrierAction 1 executed ");
48+
Runnable barrier2Action = () -> System.out.println("BarrierAction 2 executed ");
49+
50+
CyclicBarrier barrier1 = new CyclicBarrier(2, barrier1Action);
51+
CyclicBarrier barrier2 = new CyclicBarrier(2, barrier2Action);
52+
53+
CyclicBarrierRunnable barrierRunnable1 = new CyclicBarrierRunnable(barrier1, barrier2);
54+
55+
CyclicBarrierRunnable barrierRunnable2 = new CyclicBarrierRunnable(barrier1, barrier2);
56+
57+
new Thread(barrierRunnable1).start();
58+
new Thread(barrierRunnable2).start();
59+
}
6060
}
Lines changed: 17 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,38 @@
11
package io.github.dunwu.javase.concurrent.tool;
22

3+
import java.util.concurrent.ExecutorService;
4+
import java.util.concurrent.Executors;
35
import java.util.concurrent.Semaphore;
46

57
/**
68
* Semaphore 示例
79
* 字面意思为信号量,Semaphore 可以控同时访问的线程个数,通过 acquire() 获取一个许可,
810
* 如果没有就等待,而 release() 释放一个许可。
9-
*
1011
* @author Zhang Peng
1112
* @date 2018/5/10
1213
* @see java.util.concurrent.Semaphore
1314
*/
1415
public class SemaphoreDemo {
1516

16-
public static void main(String[] args) {
17-
int N = 8; //工人数
18-
Semaphore semaphore = new Semaphore(5); //机器数目
19-
for (int i = 0; i < N; i++) { new Worker(i, semaphore).start(); }
20-
}
17+
private static final int THREAD_COUNT = 30;
2118

22-
static class Worker extends Thread {
19+
private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
2320

24-
private int num;
25-
private Semaphore semaphore;
21+
private static Semaphore s = new Semaphore(10);
2622

27-
Worker(int num, Semaphore semaphore) {
28-
this.num = num;
29-
this.semaphore = semaphore;
23+
public static void main(String[] args) {
24+
for (int i = 0; i < THREAD_COUNT; i++) {
25+
threadPool.execute(() -> {
26+
try {
27+
s.acquire();
28+
System.out.println("save data");
29+
s.release();
30+
} catch (InterruptedException e) {
31+
e.printStackTrace();
32+
}
33+
});
3034
}
3135

32-
@Override
33-
public void run() {
34-
try {
35-
semaphore.acquire(); // 获取许可
36-
System.out.println("工人" + this.num + "占用一个机器在生产...");
37-
Thread.sleep(2000);
38-
System.out.println("工人" + this.num + "释放出机器");
39-
semaphore.release(); // 释放许可
40-
} catch (InterruptedException e) {
41-
e.printStackTrace();
42-
}
43-
}
36+
threadPool.shutdown();
4437
}
4538
}

0 commit comments

Comments
 (0)