Skip to content

Commit 3eecb88

Browse files
committed
enhancement: Implement Static Hashing Client & Pool, supporting Bulk Requests
1 parent bf84c86 commit 3eecb88

File tree

15 files changed

+659
-17
lines changed

15 files changed

+659
-17
lines changed

docs/source/example.rst

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,20 @@ commands.
4141
4242
client = Client(host="localhost")
4343
client.bulk_start()
44-
client.set('key1', 'value1)
45-
client.set('key1', 'value1)
46-
client.set('key1', 'value1)
44+
client.set('key1', 'value1')
45+
client.set('key2', 'value2')
46+
client.set('key3', 'value3')
47+
client.bulk_stop()
48+
[b'OK', b'OK', b'OK']
49+
50+
51+
from pyredis import HashClient
52+
53+
client = HashClient(buckets=[('host1', 6379), ('host2', 6379), ('host3', 6379)])
54+
client.bulk_start()
55+
client.set('key1', 'value1')
56+
client.set('key2', 'value2')
57+
client.set('key3', 'value3')
4758
client.bulk_stop()
4859
[b'OK', b'OK', b'OK']
4960
@@ -74,6 +85,19 @@ Using a Cluster Connection Pool
7485
pool.release(client)
7586
7687
88+
Using a Hash Connection Pool
89+
----------------------------
90+
.. code:: python
91+
92+
from pyredis import HashPool
93+
94+
pool = HashPool(buckets=[('host1', 6379), ('host2', 6379), ('host3', 6379)])
95+
client = pool.aquire()
96+
client.ping(shard_key='test')
97+
b'PONG'
98+
pool.release(client)
99+
100+
77101
Using a Sentinel backed Connection Pool
78102
---------------------------------------
79103
.. code:: python

docs/source/intro.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@ Currently implemented Features:
1111
- Sentinel Backed Connection Pool
1212
- Client & Pool for Redis Cluster
1313
- Bulk Mode ( Not supported with Redis Cluster )
14+
- Client & Pool with Static Hash Cluster (Supports Bulk Mode)
1415

1516
Planned Features:
17+
- Sentinel Backed Client & Pool with Static Hash Cluster (will support Bulk Mode)
1618
- Python 2 support ( If there is enough interest )
1719

1820

docs/source/pyredis.rst

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@ ClusterClient
2222
.. autoclass:: pyredis.ClusterClient
2323
:members:
2424

25+
HashClient
26+
----------
27+
28+
.. autoclass:: pyredis.HashClient
29+
:members:
30+
2531
PubSubClient
2632
------------
2733

@@ -49,6 +55,12 @@ ClusterPool
4955
.. autoclass:: pyredis.ClusterPool
5056
:members:
5157

58+
HashPool
59+
--------
60+
61+
.. autoclass:: pyredis.HashPool
62+
:members:
63+
5264
Pool
5365
----
5466

pyredis/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
"""
1010

1111
from pyredis.exceptions import *
12-
from pyredis.client import Client, ClusterClient, PubSubClient, SentinelClient
13-
from pyredis.pool import ClusterPool, Pool, SentinelPool
12+
from pyredis.client import Client, ClusterClient, HashClient, PubSubClient, SentinelClient
13+
from pyredis.pool import ClusterPool, HashPool, Pool, SentinelPool
1414

1515
__all__ = [
1616
'get_by_url',

pyredis/client.py

Lines changed: 208 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from pyredis import commands
44
from pyredis.connection import Connection
55
from pyredis.exceptions import PyRedisError, PyRedisConnError, PyRedisConnReadTimeout, ReplyError
6-
from pyredis.helper import dict_from_list, ClusterMap
6+
from pyredis.helper import dict_from_list, ClusterMap, slot_from_key
77

88

99
class Client(
@@ -217,7 +217,7 @@ def __init__(
217217
if not bool(seeds) != bool(cluster_map):
218218
raise PyRedisError('Ether seeds or cluster_map has to be provided')
219219
self._cluster = True
220-
self._conns = {}
220+
self._conns = dict()
221221
self._conn_timeout = conn_timeout
222222
self._read_timeout = read_timeout
223223
self._encoding = encoding
@@ -321,6 +321,212 @@ def execute(self, *args, shard_key=None, sock=None, asking=False, retries=3):
321321
raise err
322322

323323

324+
class HashClient(
325+
commands.Connection,
326+
commands.Hash,
327+
commands.HyperLogLog,
328+
commands.Key,
329+
commands.List,
330+
commands.Publish,
331+
commands.Scripting,
332+
commands.Set,
333+
commands.SSet,
334+
commands.String,
335+
commands.Transaction,
336+
):
337+
""" Client for Talking to Static Hashed Redis Cluster.
338+
339+
The Client will calculate a crc16 hash using the shard_key,
340+
which is by default the first Key in case the command supports multiple keys.
341+
If the Key is using the TAG annotation "bla{tag}blarg",
342+
then only the tag portion is used, in this case "tag".
343+
The key space is split into 16384 buckets, so in theory you could provide
344+
a list with 16384 ('host', port) pairs to the "buckets" parameter.
345+
If you have fewer than 16384 ('host', port) pairs, the client will try to
346+
distribute the key spaces evenly between available pairs.
347+
348+
--- Warning ---
349+
Since this is static hashing, the order of pairs has to match on each client you use!
350+
Also changing the number of pairs will change the mapping between buckets and pairs,
351+
rendering your data inaccessible!
352+
353+
Inherits the following Command classes:
354+
- commands.Connection,
355+
- commands.Hash,
356+
- commands.HyperLogLog,
357+
- commands.Key,
358+
- commands.List,
359+
- commands.Publish,
360+
- commands.Scripting,
361+
- commands.Set,
362+
- commands.SSet,
363+
- commands.String,
364+
- commands.Transaction
365+
"""
366+
def __init__(self, buckets, database=0, password=None, encoding=None, conn_timeout=2, read_timeout=2):
367+
368+
super().__init__()
369+
self._conns = dict()
370+
self._conn_names = list()
371+
self._bulk = False
372+
self._bulk_keep = False
373+
self._bulk_results = None
374+
self._bulk_size = None
375+
self._bulk_size_current = None
376+
self._bulk_bucket_order = list()
377+
self._closed = False
378+
self._cluster = True
379+
self._map = dict()
380+
self._init_conns(buckets, database, password, encoding, conn_timeout, read_timeout)
381+
self._init_map()
382+
383+
def _bulk_fetch(self):
384+
for conn in self._bulk_bucket_order:
385+
result = conn.read(raise_on_result_err=False)
386+
if self._bulk_keep:
387+
self._bulk_results.append(result)
388+
self._bulk_bucket_order = list()
389+
self._bulk_size_current = 0
390+
391+
@staticmethod
392+
def _execute_basic(*args, conn):
393+
conn.write(*args)
394+
return conn.read()
395+
396+
def _execute_bulk(self, *args, conn):
397+
conn.write(*args)
398+
self._bulk_size_current += 1
399+
self._bulk_bucket_order.append(conn)
400+
if self._bulk_size_current == self._bulk_size:
401+
self._bulk_fetch()
402+
403+
def _init_conns(self, buckets, database, password, encoding, conn_timeout, read_timeout):
404+
for bucket in buckets:
405+
host, port = bucket
406+
bucketname = '{0}_{1}'.format(host, port)
407+
self._conn_names.append(bucketname)
408+
self._conns[bucketname] = Connection(
409+
host=host, port=port, database=database, password=password,
410+
encoding=encoding, conn_timeout=conn_timeout, read_timeout=read_timeout
411+
)
412+
413+
def _init_map(self):
414+
num_buckets = len(self._conn_names) - 1
415+
cur_bucket = 0
416+
for slot in range(16384):
417+
self._map[slot] = self._conn_names[cur_bucket]
418+
if cur_bucket == num_buckets:
419+
cur_bucket = 0
420+
else:
421+
cur_bucket += 1
422+
423+
@property
424+
def bulk(self):
425+
""" True if bulk mode is enabled.
426+
427+
:return: bool
428+
"""
429+
return self._bulk
430+
431+
def bulk_start(self, bulk_size=5000, keep_results=True):
432+
""" Enable bulk mode
433+
434+
Put the client into bulk mode. Instead of executing a command & waiting for
435+
the reply, all commands are sent to Redis without fetching the result.
436+
The results get fetched whenever $bulk_size commands have been executed,
437+
which will also reset the counter, or if bulk_stop() is called.
438+
439+
:param bulk_size:
440+
Number of commands to execute, before fetching results.
441+
:type bulk_size: int
442+
443+
:param keep_results:
444+
If True, keep the results. The Results will be returned when calling bulk_stop.
445+
:type keep_results: bool
446+
447+
:return: None
448+
"""
449+
if self.bulk:
450+
raise PyRedisError("Already in bulk mode")
451+
self._bulk = True
452+
self._bulk_size = bulk_size
453+
self._bulk_size_current = 0
454+
if keep_results:
455+
self._bulk_results = []
456+
self._bulk_keep = True
457+
458+
def bulk_stop(self):
459+
""" Stop bulk mode.
460+
461+
All outstanding results from previous commands get fetched.
462+
If bulk_start was called with keep_results=True, return a list with all
463+
results from the executed commands in order. The list of results can also contain
464+
Exceptions, that you should check for.
465+
466+
:return: None, list
467+
"""
468+
if not self.bulk:
469+
raise PyRedisError("Not in bulk mode")
470+
self._bulk_fetch()
471+
results = self._bulk_results
472+
self._bulk = False
473+
self._bulk_keep = False
474+
self._bulk_results = None
475+
self._bulk_size = None
476+
self._bulk_size_current = None
477+
return results
478+
479+
def close(self):
480+
""" Close client.
481+
482+
:return: None
483+
"""
484+
for conn in self._conns.values():
485+
conn.close()
486+
self._closed = True
487+
488+
@property
489+
def closed(self):
490+
""" Check if client is closed.
491+
492+
:return: bool
493+
"""
494+
return self._closed
495+
496+
def execute(self, *args, shard_key=None, sock=None):
497+
""" Execute arbitrary redis command.
498+
499+
:param args:
500+
:type args: list, int, float
501+
502+
:param shard_key: (optional)
503+
Should be set to the key name you try to work with.
504+
Can not be used if sock is set.
505+
:type shard_key: string
506+
507+
:param sock: (optional)
508+
The string representation of a socket, the command should be executed against.
509+
For example: "testhost_6379"
510+
Can not be used if shard_key is set.
511+
:type sock: string
512+
513+
:return: result, exception
514+
"""
515+
if not bool(shard_key) != bool(sock):
516+
raise PyRedisError('Ether shard_key or sock has to be provided')
517+
if not sock:
518+
sock = self._map[slot_from_key(shard_key)]
519+
conn = self._conns[sock]
520+
try:
521+
if not self._bulk:
522+
return self._execute_basic(conn=conn, *args)
523+
else:
524+
self._execute_bulk(conn=conn, *args)
525+
except PyRedisConnError as err:
526+
self.close()
527+
raise err
528+
529+
324530
class PubSubClient(commands.Subscribe):
325531
""" Pub/Sub Client.
326532

pyredis/helper.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def tag_from_key(key):
3434

3535

3636
def slot_from_key(key):
37-
return crc16.crc16xmodem(key) % 16384
37+
return crc16.crc16xmodem(tag_from_key(key)) % 16384
3838

3939

4040
class ClusterMap(object):
@@ -72,9 +72,9 @@ def _update_slot(self, slot, master, slaves):
7272

7373
def get_slot(self, shard_key, slave=None):
7474
if not slave:
75-
return self._map[slot_from_key(tag_from_key(shard_key))]['master']
75+
return self._map[slot_from_key(shard_key)]['master']
7676
else:
77-
return self._map[slot_from_key(tag_from_key(shard_key))]['slave']
77+
return self._map[slot_from_key(shard_key)]['slave']
7878

7979
def hosts(self, slave=None):
8080
result = set()

0 commit comments

Comments
 (0)