-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcli-validator.py
More file actions
68 lines (50 loc) · 1.63 KB
/
cli-validator.py
File metadata and controls
68 lines (50 loc) · 1.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#!/usr/bin/env python
# -*- coding: utf8 -*-
from __future__ import unicode_literals, print_function
import fileinput
import re
import sys
from time import sleep
from rtrlib import RTRManager, ManagerGroupStatus
INPUT_REGEX = re.compile(
r'^(?P<ip>[0-9a-fA-F.:]+) (?P<prefix>\d{1,3}) (?P<ASN>\d+)$'
)
OUTPUT_FORMAT = "{}/{} {}: {}"
class GroupStatus(object):
def __init__(self):
self.error = False
def connection_status_collback(rtr_mgr_group, group_status, rtr_socket, data):
if group_status == ManagerGroupStatus.ERROR:
data.error = True
def main():
if len(sys.argv) < 3:
print("Usage: {} [host] [port]".format(sys.argv[0]))
exit()
status = GroupStatus()
mgr = RTRManager(
sys.argv[1],
sys.argv[2],
status_callback=connection_status_collback,
status_callback_data=status
)
mgr.start()
while not mgr.is_synced():
sleep(0.2)
if status.error:
print("Connection error")
exit()
for line in fileinput.input('-'):
line = line.strip()
match = INPUT_REGEX.match(line)
if not match:
print("Invalid line '%s'" % line)
print("Arguments required: IP Mask ASN")
else:
asn = int(match.group('ASN'))
ip = match.group('ip')
prefix_length = int(match.group('prefix'))
result = mgr.validate(asn, ip, prefix_length)
print(OUTPUT_FORMAT.format(ip, prefix_length, asn, result))
mgr.stop()
if __name__ == '__main__':
main()