-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpriority_queue.py
More file actions
77 lines (63 loc) · 1.96 KB
/
priority_queue.py
File metadata and controls
77 lines (63 loc) · 1.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import unittest
class PriorityQueue(object):
    """Fixed-capacity min-priority queue stored as an array-based binary heap.

    Values are compared with ``<``; :meth:`pop` always returns the smallest
    value currently held. The backing list ``heap`` is pre-filled with
    ``max_size`` placeholder zeros, and only its first ``heap_size`` slots
    contain live values.
    """

    def __init__(self, max_size=100):
        """Create an empty queue that can hold at most *max_size* values."""
        self.max_size = max_size
        self.heap_size = 0
        # Pre-allocate every slot so push/pop can assign by index.
        self.heap = [0] * max_size

    def is_empty(self):
        """Return True when the queue holds no values."""
        return self.heap_size == 0

    def push(self, value):
        """Insert *value* into the queue.

        When the queue already holds ``max_size`` values the call is a
        silent no-op: the value is dropped and nothing is raised.
        """
        if self.heap_size >= self.max_size:
            return

        pos = self.heap_size
        self.heap[pos] = value

        # Sift up: move the new value towards the root while it is smaller
        # than its parent.
        while pos > 0:
            parent = (pos - 1) // 2
            if not self.heap[pos] < self.heap[parent]:
                break
            self.heap[parent], self.heap[pos] = self.heap[pos], self.heap[parent]
            pos = parent

        self.heap_size += 1

    def pop(self):
        """Remove and return the smallest value, or None if the queue is empty."""
        if self.heap_size <= 0:
            return None

        value = self.heap[0]
        self.heap_size -= 1
        last = self.heap_size

        # Move the last live value to the root, then reset the vacated slot
        # to the placeholder so the list does not keep a stale reference to
        # a value that is no longer part of the queue.
        self.heap[0] = self.heap[last]
        self.heap[last] = 0

        # Sift down: swap with the smaller child until the heap property
        # holds again or a leaf is reached.
        pos = 0
        while True:
            child = 2 * pos + 1
            if child >= self.heap_size:
                break
            right = child + 1
            if right < self.heap_size and not self.heap[child] < self.heap[right]:
                child = right
            if self.heap[pos] < self.heap[child]:
                break
            self.heap[pos], self.heap[child] = self.heap[child], self.heap[pos]
            pos = child

        return value
class TestPriorityQueue(unittest.TestCase):
    """Checks that PriorityQueue hands values back in ascending order."""

    def test(self):
        """Push ten unsorted integers, drain the queue, expect them sorted."""
        queue = PriorityQueue()
        for item in (10, 49, 38, 17, 56, 92, 8, 1, 13, 55):
            queue.push(item)

        drained = []
        while True:
            if queue.is_empty():
                break
            drained.append(queue.pop())

        expected = [1, 8, 10, 13, 17, 38, 49, 55, 56, 92]
        self.assertEqual(drained, expected)
if __name__ == '__main__':
    # unittest.main() collects and runs the TestCase subclasses defined in
    # this module; instantiating unittest.TestCase() alone runs no tests.
    unittest.main()