-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtest_pfx_table.py
More file actions
140 lines (115 loc) · 4.85 KB
/
test_pfx_table.py
File metadata and controls
140 lines (115 loc) · 4.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# -*- coding: utf8 -*-
"""
tests.pfx_table_test
------------------
"""
import os, sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
import unittest
from rtrlib import PfxTable
# flag constants for asserting the validation result
VALID = 1 << 0
INVALID = 1 << 1
UNKNOWN = 1 << 2
LENGTH_INVALID = 1 << 3
AS_INVALID = 1 << 4
class PfxTableTest(unittest.TestCase):
DEFAULT_RECORDS = [
(10010, '110.1.0.0', 20, 24),
(10020, '120.1.0.0', 20, 32),
(10030, '130::', 64, 64)
]
def setUp(self):
"""
Create a new prefix table instance for any test method
"""
self.pfx_table = PfxTable()
def tearDown(self):
"""
Close and remove the current prefix table instance
"""
self.pfx_table.close()
def test_v4(self):
"""
- Test IPV4 record in prefix table
"""
self._fill_table(self.DEFAULT_RECORDS)
self._assert(10010, '110.1.0.0', 20, VALID)
self._assert(10010, '110.1.0.0', 24, VALID)
self._assert(10010, '110.1.8.0', 23, VALID)
self._assert(10010, '110.1.0.0', 18, UNKNOWN)
self._assert(10010, '110.1.0.0', 30, INVALID)
self._assert(10011, '110.1.0.0', 20, INVALID)
self._assert_r(10010, '110.1.0.0', 20, VALID)
self._assert_r(10010, '110.1.0.0', 24, VALID)
self._assert_r(10010, '110.1.8.0', 23, VALID)
self._assert_r(10010, '110.1.0.0', 18, UNKNOWN)
self._assert_r(10010, '110.1.0.0', 30, INVALID | LENGTH_INVALID)
self._assert_r(10011, '110.1.0.0', 20, INVALID | AS_INVALID)
def test_v6(self):
"""
- Test IPV6 record in prefix table
"""
self._fill_table(self.DEFAULT_RECORDS)
self._assert(10030, '130::', 64, VALID)
self._assert(10030, '130::', 66, INVALID)
self._assert(10030, '130::', 62, UNKNOWN)
self._assert_r(10030, '130::', 64, VALID)
self._assert_r(10030, '130::', 66, INVALID | LENGTH_INVALID)
self._assert_r(10030, '130::', 62, UNKNOWN)
def test_duplicate(self):
"""
- Add duplicate record into prefix table (same prefix range, other AS number)
"""
self._fill_table(self.DEFAULT_RECORDS)
self._fill_table([(10020, '110.1.0.0', 22, 24)])
self._assert(10010, '110.1.0.0', 20, VALID)
self._assert(10010, '110.1.0.0', 24, VALID)
self._assert(10020, '110.1.0.0', 22, VALID)
self._assert(10020, '110.1.0.0', 24, VALID)
def test_remove(self):
"""
- Remove record from prefix table
"""
self._fill_table(self.DEFAULT_RECORDS)
self.pfx_table.remove_record(10010, '110.1.0.0', 20, 24)
self._assert(10010, '110.1.0.0', 20, UNKNOWN)
def _fill_table(self, records):
"""
Adds a list of record tuples to the prefix talbe.
Each tuple must contain these four elements:
* as number (int)
* ip address (str)
* min range length (int)
* max range length (int)
:param records: List of records to be added to the prefix table
:type records: list
"""
for record in records:
self.pfx_table.add_record(*record)
def _assert(self, asn, ip, mask, exp_flags):
"""
Let the prefix table validate the given IP prefix
and check the returned validation result
"""
r = self.pfx_table.validate(asn, ip, mask)
record = "AS{0}:{1}/{2}".format(asn, ip, mask)
self._assert_flag(r.is_valid, bool(exp_flags & VALID), record, "is invalid", "is valid")
self._assert_flag(r.is_invalid, bool(exp_flags & INVALID), record, "is valid", "is invalid")
self._assert_flag(r.not_found, bool(exp_flags & UNKNOWN), record, "was found", "not found")
def _assert_r(self, asn, ip, mask, exp_flags):
"""
Let the prefix table validate the given IP prefix
and check the returned validation result
"""
r = self.pfx_table.validate_r(asn, ip, mask)
record = "AS{0}:{1}/{2}".format(asn, ip, mask)
self._assert_flag(r.is_valid, bool(exp_flags & VALID), record, "is invalid", "is valid")
self._assert_flag(r.is_invalid, bool(exp_flags & INVALID), record, "is valid", "is invalid")
self._assert_flag(r.not_found, bool(exp_flags & UNKNOWN), record, "was found", "not found")
self._assert_flag(r.length_invalid, bool(exp_flags & LENGTH_INVALID), record, "has invalid length", "has valid length")
self._assert_flag(r.as_invalid, bool(exp_flags & AS_INVALID), record, "has invalid AS", "has valid AS")
def _assert_flag(self, act_flag, exp_val, record, msg_exp_true, msg_exp_false):
self.assertEqual(act_flag, exp_val, "{0} {1}".format(record, msg_exp_true if exp_val else msg_exp_false))
if __name__ == '__main__':
unittest.main()