Skip to content

Commit fac3233

Browse files
committed
Completely refactored
1 parent 3e6025f commit fac3233

File tree

2 files changed

+104
-113
lines changed

2 files changed

+104
-113
lines changed

lowlevel/DelayQueueDemo.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,11 @@ public class DelayQueueDemo {
6363
new Random(47).ints(20, 0, 4000)
6464
.mapToObj(DelayedTask::new),
6565
// Add the summarizing task:
66-
Stream.of(
67-
new DelayedTask.EndTask(4000)))
66+
Stream.of(new DelayedTask.EndTask(4000)))
6867
.collect(Collectors
6968
.toCollection(DelayQueue::new));
70-
DelayQueue<DelayedTask> delayQueue =
71-
new DelayQueue<>(tasks);
72-
while(delayQueue.size() > 0)
73-
delayQueue.take().run();
69+
while(tasks.size() > 0)
70+
tasks.take().run();
7471
}
7572
}
7673
/* Output:

lowlevel/PriorityBlockingQueueDemo.java

Lines changed: 101 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -2,148 +2,142 @@
22
// (c)2017 MindView LLC: see Copyright.txt
33
// We make no guarantees that this code is fit for any purpose.
44
// Visit http://OnJava8.com for more book information.
5-
import java.util.concurrent.*;
65
import java.util.*;
6+
import java.util.stream.*;
7+
import java.util.concurrent.*;
8+
import java.util.concurrent.atomic.*;
79
import onjava.Nap;
810

9-
class PrioritizedTask implements
10-
Runnable, Comparable<PrioritizedTask> {
11-
private SplittableRandom rand = new SplittableRandom(47);
12-
private static int counter = 0;
13-
private final int id = counter++;
11+
class Prioritized implements Comparable<Prioritized> {
12+
private static AtomicInteger counter =
13+
new AtomicInteger();
14+
private final int id = counter.getAndAdd(1);
1415
private final int priority;
15-
protected static List<PrioritizedTask> sequence =
16-
new ArrayList<>();
17-
public PrioritizedTask(int priority) {
16+
private static List<Prioritized> sequence =
17+
new CopyOnWriteArrayList<>();
18+
public Prioritized(int priority) {
1819
this.priority = priority;
1920
sequence.add(this);
2021
}
2122
@Override
22-
public int compareTo(PrioritizedTask arg) {
23+
public int compareTo(Prioritized arg) {
2324
return priority < arg.priority ? 1 :
2425
(priority > arg.priority ? -1 : 0);
2526
}
2627
@Override
27-
public void run() {
28-
new Nap(rand.nextInt(250));
29-
System.out.println(this);
30-
}
31-
@Override
3228
public String toString() {
33-
return String.format("[%1$-3d]", priority) +
34-
" Task " + id;
35-
}
36-
public String summary() {
37-
return "(" + id + ":" + priority + ")";
29+
return String.format(
30+
"[%d] Prioritized %d", priority, id);
3831
}
39-
public static
40-
class EndSentinel extends PrioritizedTask {
41-
private ExecutorService exec;
42-
public EndSentinel(ExecutorService e) {
43-
super(-1); // Lowest priority in this program
44-
exec = e;
45-
}
46-
@Override
47-
public void run() {
48-
int count = 0;
49-
for(PrioritizedTask pt : sequence) {
50-
System.out.print(pt.summary());
51-
if(++count % 5 == 0)
52-
System.out.println();
53-
}
54-
System.out.println();
55-
System.out.println(this + " Calling shutdownNow()");
56-
exec.shutdownNow();
32+
public void displaySequence() {
33+
int count = 0;
34+
for(Prioritized pt : sequence) {
35+
System.out.printf("(%d:%d)", pt.id, pt.priority);
36+
if(++count % 5 == 0)
37+
System.out.println();
5738
}
5839
}
40+
public static class EndSentinel extends Prioritized {
41+
public EndSentinel() { super(-1); }
42+
}
5943
}
6044

61-
class PrioritizedTaskProducer implements Runnable {
62-
private SplittableRandom rand = new SplittableRandom(47);
63-
private Queue<Runnable> queue;
64-
private ExecutorService exec;
65-
public PrioritizedTaskProducer(
66-
Queue<Runnable> q, ExecutorService e) {
45+
class Producer implements Runnable {
46+
private static AtomicInteger seed =
47+
new AtomicInteger(47);
48+
private SplittableRandom rand =
49+
new SplittableRandom(seed.getAndAdd(10));
50+
private Queue<Prioritized> queue;
51+
public Producer(Queue<Prioritized> q) {
6752
queue = q;
68-
exec = e; // Used for EndSentinel
6953
}
7054
@Override
7155
public void run() {
72-
// Unbounded queue; never blocks.
73-
// Fill it up fast with random priorities:
74-
for(int i = 0; i < 20; i++) {
75-
queue.add(new PrioritizedTask(rand.nextInt(10)));
76-
new Nap(10);
77-
}
78-
// Trickle in highest-priority jobs:
79-
for(int i = 0; i < 10; i++) {
80-
new Nap(250);
81-
queue.add(new PrioritizedTask(10));
82-
}
83-
// Add jobs, lowest priority first:
84-
for(int i = 0; i < 10; i++)
85-
queue.add(new PrioritizedTask(i));
86-
// A sentinel to stop all the tasks:
87-
queue.add(new PrioritizedTask.EndSentinel(exec));
88-
System.out.println(
89-
"Finished PrioritizedTaskProducer");
56+
rand.ints(10, 0, 20)
57+
.mapToObj(Prioritized::new)
58+
.peek(p -> new Nap(rand.nextInt(
59+
PriorityBlockingQueueDemo.NAPTIME)))
60+
.forEach(p -> queue.add(p));
61+
queue.add(new Prioritized.EndSentinel());
9062
}
9163
}
9264

93-
class PrioritizedTaskConsumer implements Runnable {
94-
private PriorityBlockingQueue<Runnable> q;
95-
public PrioritizedTaskConsumer(
96-
PriorityBlockingQueue<Runnable> q) {
65+
class Consumer implements Runnable {
66+
private PriorityBlockingQueue<Prioritized> q;
67+
private SplittableRandom rand =
68+
new SplittableRandom(47);
69+
public
70+
Consumer(PriorityBlockingQueue<Prioritized> q) {
9771
this.q = q;
9872
}
9973
@Override
10074
public void run() {
101-
try {
102-
while(!Thread.interrupted())
103-
// Use current thread to run the task:
104-
q.take().run();
105-
} catch(InterruptedException e) {
106-
// Acceptable way to exit
75+
while(true) {
76+
try {
77+
Prioritized pt = q.take();
78+
System.out.println(pt);
79+
if(pt instanceof Prioritized.EndSentinel) {
80+
pt.displaySequence();
81+
break;
82+
}
83+
new Nap(rand.nextInt(
84+
PriorityBlockingQueueDemo.NAPTIME));
85+
} catch(InterruptedException e) {
86+
throw new RuntimeException(e);
87+
}
10788
}
108-
System.out.println(
109-
"Finished PrioritizedTaskConsumer");
11089
}
11190
}
11291

11392
public class PriorityBlockingQueueDemo {
114-
public static void
115-
main(String[] args) throws Exception {
116-
ExecutorService es = Executors.newCachedThreadPool();
117-
PriorityBlockingQueue<Runnable> queue =
93+
static final int NAPTIME = 50;
94+
public static void main(String[] args) {
95+
PriorityBlockingQueue<Prioritized> queue =
11896
new PriorityBlockingQueue<>();
119-
es.execute(new PrioritizedTaskProducer(queue, es));
120-
es.execute(new PrioritizedTaskConsumer(queue));
97+
CompletableFuture.runAsync(new Producer(queue));
98+
CompletableFuture.runAsync(new Producer(queue));
99+
CompletableFuture.runAsync(new Producer(queue));
100+
CompletableFuture.runAsync(new Consumer(queue))
101+
.join();
121102
}
122103
}
123-
/* Output: (First and Last 12 Lines)
124-
[9 ] Task 5
125-
[8 ] Task 7
126-
[10 ] Task 20
127-
[8 ] Task 8
128-
[10 ] Task 21
129-
[7 ] Task 4
130-
[10 ] Task 22
131-
[7 ] Task 19
132-
[10 ] Task 23
133-
[7 ] Task 11
134-
[10 ] Task 24
135-
[7 ] Task 1
136-
...________...________...________...________...
137-
[0 ] Task 16
138-
(0:5)(1:7)(2:1)(3:0)(4:7)
139-
(5:9)(6:6)(7:8)(8:8)(9:3)
140-
(10:0)(11:7)(12:0)(13:5)(14:2)
141-
(15:4)(16:0)(17:2)(18:1)(19:7)
142-
(20:10)(21:10)(22:10)(23:10)(24:10)
143-
(25:10)(26:10)(27:10)(28:10)(29:10)
144-
(30:0)(31:1)(32:2)(33:3)(34:4)
145-
(35:5)(36:6)(37:7)(38:8)(39:9)
146-
(40:-1)
147-
[-1 ] Task 40 Calling shutdownNow()
148-
Finished PrioritizedTaskConsumer
104+
/* Output:
105+
[12] Prioritized 1
106+
[17] Prioritized 2
107+
[15] Prioritized 0
108+
[18] Prioritized 17
109+
[17] Prioritized 10
110+
[16] Prioritized 20
111+
[16] Prioritized 16
112+
[15] Prioritized 15
113+
[14] Prioritized 8
114+
[14] Prioritized 13
115+
[13] Prioritized 12
116+
[12] Prioritized 19
117+
[12] Prioritized 14
118+
[11] Prioritized 6
119+
[11] Prioritized 22
120+
[11] Prioritized 4
121+
[11] Prioritized 31
122+
[10] Prioritized 30
123+
[10] Prioritized 26
124+
[8] Prioritized 18
125+
[8] Prioritized 23
126+
[8] Prioritized 9
127+
[6] Prioritized 24
128+
[3] Prioritized 3
129+
[2] Prioritized 29
130+
[1] Prioritized 7
131+
[0] Prioritized 27
132+
[0] Prioritized 5
133+
[0] Prioritized 21
134+
[0] Prioritized 11
135+
[-1] Prioritized 25
136+
(0:15)(2:17)(1:12)(3:3)(4:11)
137+
(5:0)(6:11)(7:1)(8:14)(9:8)
138+
(10:17)(11:0)(12:13)(13:14)(14:12)
139+
(15:15)(16:16)(17:18)(18:8)(19:12)
140+
(20:16)(21:0)(22:11)(23:8)(24:6)
141+
(25:-1)(26:10)(27:0)(28:-1)(29:2)
142+
(30:10)(31:11)(32:-1)
149143
*/

0 commit comments

Comments
 (0)