Skip to content

Commit 4fd5194

Browse files
committed
manager: implement pfx table itaration for primary pfx table
This patch allows iteration in two ways, callback driven and iterator driven. The later approach uses threads to run the callback in the background.
1 parent aa46897 commit 4fd5194

File tree

5 files changed

+179
-3
lines changed

5 files changed

+179
-3
lines changed

ffi_build.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
extern "Python" void rtr_mgr_status_callback(const struct rtr_mgr_group *, enum rtr_mgr_status, const struct rtr_socket *, void *);
1616
extern "Python" void pfx_update_callback(struct pfx_table *pfx_table, const struct pfx_record record, const bool added);
1717
extern "Python" void spki_update_callback(struct spki_table *spki_table, const struct spki_record record, const bool added);
18+
extern "Python" void pfx_table_callback(const struct pfx_record *pfx_record, void *data);
1819
""")
1920

2021
ffibuilder.set_source("_rtrlib",

rtrlib/callbacks.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,17 @@ def status_callback(rtr_mgr_group, group_status, rtr_socket, object_handle):
7474
)
7575

7676

77+
@ffi.def_extern(name="pfx_table_callback")
78+
def pfx_table_callback(pfx_record, object_handle):
79+
"""
80+
Wraps the pfx_table callback, used for iteration of the pfx table,
81+
to hide cffi specifics
82+
"""
83+
callback, data = ffi.from_handle(object_handle)
84+
85+
callback(PFXRecord(pfx_record), data)
86+
87+
7788
def pfx_update_callback_wrapper(func):
7889
"""
7990
Wraps the given python function and wraps it to hide cffi specifics.

rtrlib/records.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,26 @@ def socket(self):
5656
"""
5757
return RTRSocket(self._record.socket)
5858

59+
def __str__(self):
60+
return "{prefix}/{min}-{max} {asn}".format(prefix=self.prefix,
61+
min=self.min_len,
62+
max=self.max_len,
63+
asn=self.asn
64+
)
65+
66+
67+
def copy_pfx_record(record):
68+
"""Copy a pfx record."""
69+
cdata = record._record
70+
new_record = ffi.new('struct pfx_record *')
71+
new_record.asn = cdata.asn
72+
new_record.prefix = cdata.prefix
73+
new_record.min_len = cdata.min_len
74+
new_record.max_len = cdata.max_len
75+
new_record.socket = cdata.socket
76+
77+
return PFXRecord(new_record)
78+
5979

6080
class SPKIRecord(object):
6181
"""

rtrlib/rtr_manager.py

Lines changed: 79 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,20 @@
1111
import time
1212
import logging
1313
import signal
14+
import threading
1415

1516
from enum import Enum
1617
from _rtrlib import ffi, lib
1718

18-
import six
1919
import rtrlib.callbacks as callbacks
20-
21-
from .util import to_bytestr, is_integer, is_string, ip_str_to_addr
20+
import rtrlib.records as records
21+
22+
from .util import (to_bytestr,
23+
is_integer,
24+
is_string,
25+
ip_str_to_addr,
26+
CallbackGenerator
27+
)
2228
from .exceptions import RTRInitError, PFXException, SyncTimeout
2329

2430

@@ -208,6 +214,76 @@ def validate(self, asn, prefix, mask_len):
208214

209215
return PfxvState(result[0])
210216

217+
def for_each_ipv4_record(self, callback, data):
218+
r"""
219+
Iterate over all ipv4 records of the pfx table.
220+
221+
callback must take two arguments, the pfx_record and the data object.
222+
223+
For a more pythonic alternative see :py:meth:`ipv4_records`
224+
225+
:param callable callback: called for every record in the pfx table
226+
:param object data: arbitrary data object \
227+
that is passed to the callback function
228+
"""
229+
data_handle = ffi.new_handle((callback, data))
230+
231+
lib.rtr_mgr_for_each_ipv4_record(
232+
self.rtr_manager_config,
233+
lib.pfx_table_callback,
234+
data_handle
235+
)
236+
237+
def ipv4_records(self):
238+
r"""
239+
Return iterator over all ipv4 records in the pfx table.
240+
241+
This iterator utilises threads to execute retrieve the records. \
242+
If that is a problem for you take a look at \
243+
:py:meth:`for_each_ipv4_record`.
244+
"""
245+
def callback(record, data):
246+
LOG.debug('Putting "%s" in queue', record)
247+
data.put_nowait(records.copy_pfx_record(record))
248+
249+
generator = CallbackGenerator(self.for_each_ipv4_record, callback)
250+
return generator
251+
252+
def for_each_ipv6_record(self, callback, data):
253+
r"""
254+
Iterate over all ipv6 records of the pfx table.
255+
256+
callback must take two arguments, the pfx_record and the data object.
257+
258+
For a more pythonic alternative see :py:meth:`ipv6_records`
259+
260+
:param callable callback: called for every record in the pfx table
261+
:param object data: arbitrary data object \
262+
that is passed to the callback function
263+
"""
264+
data_handle = ffi.new_handle((callback, data))
265+
266+
lib.rtr_mgr_for_each_ipv6_record(
267+
self.rtr_manager_config,
268+
lib.pfx_table_callback,
269+
data_handle
270+
)
271+
272+
def ipv6_records(self):
273+
r"""
274+
Return iterator over all ipv6 records in the pfx table.
275+
276+
This iterator utilises threads to execute retrieve the records. \
277+
If that is a problem for you take a look at \
278+
:py:meth:`for_each_ipv6_record`.
279+
"""
280+
def callback(record, data):
281+
LOG.debug('Putting "%s" in queue', record)
282+
data.put_nowait(records.copy_pfx_record(record))
283+
284+
generator = CallbackGenerator(self.for_each_ipv6_record, callback)
285+
return generator
286+
211287

212288
class PfxvState(Enum):
213289
"""

rtrlib/util.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111

1212
import logging
1313
import six
14+
import threading
15+
16+
from six.moves import queue
1417

1518
from _rtrlib import ffi, lib
1619
from .exceptions import IpConversionException
@@ -76,3 +79,68 @@ def is_string(var):
7679
Checks if var is a string
7780
"""
7881
return isinstance(var, six.string_types)
82+
83+
84+
class StoppableThread(threading.Thread):
85+
"""
86+
Thread class with a stop() method.
87+
88+
The thread itself has to check regularly for the stopped() condition.
89+
"""
90+
91+
def __init__(self, *args, **kwargs):
92+
super(StoppableThread, self).__init__(*args, **kwargs)
93+
self._stop_event = threading.Event()
94+
95+
def stop(self):
96+
self._stop_event.set()
97+
98+
def stopped(self):
99+
return self._stop_event.isSet()
100+
101+
102+
class CallbackGenerator(object):
103+
104+
def __init__(self, function, callback, args=()):
105+
def inner_callback(pfx_record, data):
106+
while not data.thread.stopped():
107+
try:
108+
data.callback(pfx_record, data.queue)
109+
break
110+
except queue.Full:
111+
pass
112+
return
113+
114+
self.callback = callback
115+
self.queue = queue.Queue()
116+
new_args = list(args)
117+
new_args.extend((inner_callback, self))
118+
self.thread = StoppableThread(
119+
target=function,
120+
args=new_args,
121+
daemon=True,
122+
)
123+
124+
self.thread.start()
125+
126+
def __del__(self,):
127+
self.thread.stop()
128+
129+
def __iter__(self,):
130+
return self
131+
132+
def __next__(self,):
133+
while(True):
134+
try:
135+
LOG.debug('About to take item out of queue')
136+
item = self.queue.get_nowait()
137+
LOG.debug('Took "%s" out of the queue', item)
138+
self.queue.task_done()
139+
return item
140+
except queue.Empty:
141+
if self.thread.is_alive() or self.queue.qsize() > 0:
142+
LOG.debug('Queue not yet filled, looping')
143+
continue
144+
else:
145+
LOG.debug('Queue empty stopping iteration')
146+
raise StopIteration()

0 commit comments

Comments
 (0)