|
2 | 2 | // (c)2017 MindView LLC: see Copyright.txt |
3 | 3 | // We make no guarantees that this code is fit for any purpose. |
4 | 4 | // Visit http://OnJava8.com for more book information. |
5 | | -import java.util.concurrent.*; |
6 | 5 | import java.util.*; |
| 6 | +import java.util.stream.*; |
| 7 | +import java.util.concurrent.*; |
| 8 | +import java.util.concurrent.atomic.*; |
7 | 9 | import onjava.Nap; |
8 | 10 |
|
9 | | -class PrioritizedTask implements |
10 | | -Runnable, Comparable<PrioritizedTask> { |
11 | | - private SplittableRandom rand = new SplittableRandom(47); |
12 | | - private static int counter = 0; |
13 | | - private final int id = counter++; |
| 11 | +class Prioritized implements Comparable<Prioritized> { |
| 12 | + private static AtomicInteger counter = |
| 13 | + new AtomicInteger(); |
| 14 | + private final int id = counter.getAndAdd(1); |
14 | 15 | private final int priority; |
15 | | - protected static List<PrioritizedTask> sequence = |
16 | | - new ArrayList<>(); |
17 | | - public PrioritizedTask(int priority) { |
| 16 | + private static List<Prioritized> sequence = |
| 17 | + new CopyOnWriteArrayList<>(); |
| 18 | + public Prioritized(int priority) { |
18 | 19 | this.priority = priority; |
19 | 20 | sequence.add(this); |
20 | 21 | } |
21 | 22 | @Override |
22 | | - public int compareTo(PrioritizedTask arg) { |
| 23 | + public int compareTo(Prioritized arg) { |
23 | 24 | return priority < arg.priority ? 1 : |
24 | 25 | (priority > arg.priority ? -1 : 0); |
25 | 26 | } |
26 | 27 | @Override |
27 | | - public void run() { |
28 | | - new Nap(rand.nextInt(250)); |
29 | | - System.out.println(this); |
30 | | - } |
31 | | - @Override |
32 | 28 | public String toString() { |
33 | | - return String.format("[%1$-3d]", priority) + |
34 | | - " Task " + id; |
35 | | - } |
36 | | - public String summary() { |
37 | | - return "(" + id + ":" + priority + ")"; |
| 29 | + return String.format( |
| 30 | + "[%d] Prioritized %d", priority, id); |
38 | 31 | } |
39 | | - public static |
40 | | - class EndSentinel extends PrioritizedTask { |
41 | | - private ExecutorService exec; |
42 | | - public EndSentinel(ExecutorService e) { |
43 | | - super(-1); // Lowest priority in this program |
44 | | - exec = e; |
45 | | - } |
46 | | - @Override |
47 | | - public void run() { |
48 | | - int count = 0; |
49 | | - for(PrioritizedTask pt : sequence) { |
50 | | - System.out.print(pt.summary()); |
51 | | - if(++count % 5 == 0) |
52 | | - System.out.println(); |
53 | | - } |
54 | | - System.out.println(); |
55 | | - System.out.println(this + " Calling shutdownNow()"); |
56 | | - exec.shutdownNow(); |
| 32 | + public void displaySequence() { |
| 33 | + int count = 0; |
| 34 | + for(Prioritized pt : sequence) { |
| 35 | + System.out.printf("(%d:%d)", pt.id, pt.priority); |
| 36 | + if(++count % 5 == 0) |
| 37 | + System.out.println(); |
57 | 38 | } |
58 | 39 | } |
| 40 | + public static class EndSentinel extends Prioritized { |
| 41 | + public EndSentinel() { super(-1); } |
| 42 | + } |
59 | 43 | } |
60 | 44 |
|
61 | | -class PrioritizedTaskProducer implements Runnable { |
62 | | - private SplittableRandom rand = new SplittableRandom(47); |
63 | | - private Queue<Runnable> queue; |
64 | | - private ExecutorService exec; |
65 | | - public PrioritizedTaskProducer( |
66 | | - Queue<Runnable> q, ExecutorService e) { |
| 45 | +class Producer implements Runnable { |
| 46 | + private static AtomicInteger seed = |
| 47 | + new AtomicInteger(47); |
| 48 | + private SplittableRandom rand = |
| 49 | + new SplittableRandom(seed.getAndAdd(10)); |
| 50 | + private Queue<Prioritized> queue; |
| 51 | + public Producer(Queue<Prioritized> q) { |
67 | 52 | queue = q; |
68 | | - exec = e; // Used for EndSentinel |
69 | 53 | } |
70 | 54 | @Override |
71 | 55 | public void run() { |
72 | | - // Unbounded queue; never blocks. |
73 | | - // Fill it up fast with random priorities: |
74 | | - for(int i = 0; i < 20; i++) { |
75 | | - queue.add(new PrioritizedTask(rand.nextInt(10))); |
76 | | - new Nap(10); |
77 | | - } |
78 | | - // Trickle in highest-priority jobs: |
79 | | - for(int i = 0; i < 10; i++) { |
80 | | - new Nap(250); |
81 | | - queue.add(new PrioritizedTask(10)); |
82 | | - } |
83 | | - // Add jobs, lowest priority first: |
84 | | - for(int i = 0; i < 10; i++) |
85 | | - queue.add(new PrioritizedTask(i)); |
86 | | - // A sentinel to stop all the tasks: |
87 | | - queue.add(new PrioritizedTask.EndSentinel(exec)); |
88 | | - System.out.println( |
89 | | - "Finished PrioritizedTaskProducer"); |
| 56 | + rand.ints(10, 0, 20) |
| 57 | + .mapToObj(Prioritized::new) |
| 58 | + .peek(p -> new Nap(rand.nextInt( |
| 59 | + PriorityBlockingQueueDemo.NAPTIME))) |
| 60 | + .forEach(p -> queue.add(p)); |
| 61 | + queue.add(new Prioritized.EndSentinel()); |
90 | 62 | } |
91 | 63 | } |
92 | 64 |
|
93 | | -class PrioritizedTaskConsumer implements Runnable { |
94 | | - private PriorityBlockingQueue<Runnable> q; |
95 | | - public PrioritizedTaskConsumer( |
96 | | - PriorityBlockingQueue<Runnable> q) { |
| 65 | +class Consumer implements Runnable { |
| 66 | + private PriorityBlockingQueue<Prioritized> q; |
| 67 | + private SplittableRandom rand = |
| 68 | + new SplittableRandom(47); |
| 69 | + public |
| 70 | + Consumer(PriorityBlockingQueue<Prioritized> q) { |
97 | 71 | this.q = q; |
98 | 72 | } |
99 | 73 | @Override |
100 | 74 | public void run() { |
101 | | - try { |
102 | | - while(!Thread.interrupted()) |
103 | | - // Use current thread to run the task: |
104 | | - q.take().run(); |
105 | | - } catch(InterruptedException e) { |
106 | | - // Acceptable way to exit |
| 75 | + while(true) { |
| 76 | + try { |
| 77 | + Prioritized pt = q.take(); |
| 78 | + System.out.println(pt); |
| 79 | + if(pt instanceof Prioritized.EndSentinel) { |
| 80 | + pt.displaySequence(); |
| 81 | + break; |
| 82 | + } |
| 83 | + new Nap(rand.nextInt( |
| 84 | + PriorityBlockingQueueDemo.NAPTIME)); |
| 85 | + } catch(InterruptedException e) { |
| 86 | + throw new RuntimeException(e); |
| 87 | + } |
107 | 88 | } |
108 | | - System.out.println( |
109 | | - "Finished PrioritizedTaskConsumer"); |
110 | 89 | } |
111 | 90 | } |
112 | 91 |
|
113 | 92 | public class PriorityBlockingQueueDemo { |
114 | | - public static void |
115 | | - main(String[] args) throws Exception { |
116 | | - ExecutorService es = Executors.newCachedThreadPool(); |
117 | | - PriorityBlockingQueue<Runnable> queue = |
| 93 | + static final int NAPTIME = 50; |
| 94 | + public static void main(String[] args) { |
| 95 | + PriorityBlockingQueue<Prioritized> queue = |
118 | 96 | new PriorityBlockingQueue<>(); |
119 | | - es.execute(new PrioritizedTaskProducer(queue, es)); |
120 | | - es.execute(new PrioritizedTaskConsumer(queue)); |
| 97 | + CompletableFuture.runAsync(new Producer(queue)); |
| 98 | + CompletableFuture.runAsync(new Producer(queue)); |
| 99 | + CompletableFuture.runAsync(new Producer(queue)); |
| 100 | + CompletableFuture.runAsync(new Consumer(queue)) |
| 101 | + .join(); |
121 | 102 | } |
122 | 103 | } |
123 | | -/* Output: (First and Last 12 Lines) |
124 | | -[9 ] Task 5 |
125 | | -[8 ] Task 7 |
126 | | -[10 ] Task 20 |
127 | | -[8 ] Task 8 |
128 | | -[10 ] Task 21 |
129 | | -[7 ] Task 4 |
130 | | -[10 ] Task 22 |
131 | | -[7 ] Task 19 |
132 | | -[10 ] Task 23 |
133 | | -[7 ] Task 11 |
134 | | -[10 ] Task 24 |
135 | | -[7 ] Task 1 |
136 | | -...________...________...________...________... |
137 | | -[0 ] Task 16 |
138 | | -(0:5)(1:7)(2:1)(3:0)(4:7) |
139 | | -(5:9)(6:6)(7:8)(8:8)(9:3) |
140 | | -(10:0)(11:7)(12:0)(13:5)(14:2) |
141 | | -(15:4)(16:0)(17:2)(18:1)(19:7) |
142 | | -(20:10)(21:10)(22:10)(23:10)(24:10) |
143 | | -(25:10)(26:10)(27:10)(28:10)(29:10) |
144 | | -(30:0)(31:1)(32:2)(33:3)(34:4) |
145 | | -(35:5)(36:6)(37:7)(38:8)(39:9) |
146 | | -(40:-1) |
147 | | -[-1 ] Task 40 Calling shutdownNow() |
148 | | -Finished PrioritizedTaskConsumer |
| 104 | +/* Output: |
| 105 | +[12] Prioritized 1 |
| 106 | +[17] Prioritized 2 |
| 107 | +[15] Prioritized 0 |
| 108 | +[18] Prioritized 17 |
| 109 | +[17] Prioritized 10 |
| 110 | +[16] Prioritized 20 |
| 111 | +[16] Prioritized 16 |
| 112 | +[15] Prioritized 15 |
| 113 | +[14] Prioritized 8 |
| 114 | +[14] Prioritized 13 |
| 115 | +[13] Prioritized 12 |
| 116 | +[12] Prioritized 19 |
| 117 | +[12] Prioritized 14 |
| 118 | +[11] Prioritized 6 |
| 119 | +[11] Prioritized 22 |
| 120 | +[11] Prioritized 4 |
| 121 | +[11] Prioritized 31 |
| 122 | +[10] Prioritized 30 |
| 123 | +[10] Prioritized 26 |
| 124 | +[8] Prioritized 18 |
| 125 | +[8] Prioritized 23 |
| 126 | +[8] Prioritized 9 |
| 127 | +[6] Prioritized 24 |
| 128 | +[3] Prioritized 3 |
| 129 | +[2] Prioritized 29 |
| 130 | +[1] Prioritized 7 |
| 131 | +[0] Prioritized 27 |
| 132 | +[0] Prioritized 5 |
| 133 | +[0] Prioritized 21 |
| 134 | +[0] Prioritized 11 |
| 135 | +[-1] Prioritized 25 |
| 136 | +(0:15)(2:17)(1:12)(3:3)(4:11) |
| 137 | +(5:0)(6:11)(7:1)(8:14)(9:8) |
| 138 | +(10:17)(11:0)(12:13)(13:14)(14:12) |
| 139 | +(15:15)(16:16)(17:18)(18:8)(19:12) |
| 140 | +(20:16)(21:0)(22:11)(23:8)(24:6) |
| 141 | +(25:-1)(26:10)(27:0)(28:-1)(29:2) |
| 142 | +(30:10)(31:11)(32:-1) |
149 | 143 | */ |
0 commit comments