forked from zhanghe06/python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathredis_pub_sub.py
More file actions
144 lines (116 loc) · 2.93 KB
/
redis_pub_sub.py
File metadata and controls
144 lines (116 loc) · 2.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/env python
# encoding: utf-8
"""
@author: zhanghe
@software: PyCharm
@file: redis_pub_sub.py
@time: 2017/2/14 下午11:03
"""
import sys
import redis
import json
class RedisPubSub(object):
"""
Pub/Sub
"""
def __init__(self, name, namespace='queue', **redis_kwargs):
"""The default connection parameters are: host='localhost', port=6379, db=0"""
self.__db = redis.Redis(**redis_kwargs)
self.key = '%s:%s' % (namespace, name)
def pub(self, k, v):
"""
Pub
:param k:
:param v:
:return:
"""
ch = '%s:%s' % (self.key, k)
self.__db.publish(ch, v)
def sub(self, k):
"""
Sub
:param k:
:return:
"""
ps = self.__db.pubsub()
ch = '%s:%s' % (self.key, k)
ps.subscribe(ch)
for item in ps.listen():
# {'pattern': None, 'type': 'subscribe', 'channel': 'queue:test:hh', 'data': 1L}
# yield item
yield item.get('data')
def p_sub(self, k):
"""
PSub
订阅一个或多个符合给定模式的频道
每个模式以 * 作为匹配符
注意 psubscribe 与 subscribe 区别
:param k:
:return:
"""
ps = self.__db.pubsub()
ch = '%s:%s' % (self.key, k)
ps.psubscribe(ch)
for item in ps.listen():
# {'pattern': None, 'type': 'subscribe', 'channel': 'queue:test:hh', 'data': 1L}
# yield item
yield item.get('data')
def sub_not_loop(self, k):
"""
Sub 非无限循环,取到结果即退出
:param k:
:return:
"""
ps = self.__db.pubsub()
ch = '%s:%s' % (self.key, k)
ps.subscribe(ch)
for item in ps.listen():
if item['type'] == 'message':
return item.get('data')
def test_pub():
q = RedisPubSub('test:aa')
q.pub('hh', '123')
def test_sub():
q = RedisPubSub('test:aa')
r = q.sub('hh')
for i in r:
print i
def test_p_sub():
q = RedisPubSub('test:*')
r = q.p_sub('hh')
for i in r:
print i
def test_sub_not_loop():
q = RedisPubSub('test')
r = q.sub_not_loop('hh')
print r
def run():
# print sys.argv
try:
if len(sys.argv) > 1:
fun_name = eval(sys.argv[1])
fun_name()
else:
print '缺失参数'
except NameError, e:
print e
print '未定义的方法[%s]' % sys.argv[1]
if __name__ == '__main__':
run()
"""
测试一般模式
终端一:
✗ python redis_pub_sub.py test_sub
终端二:
✗ python redis_pub_sub.py test_pub
测试模式订阅
终端一:
✗ python redis_pub_sub.py test_p_sub
终端二:
✗ python redis_pub_sub.py test_pub
测试非无限循环模式
终端一:
✗ python redis_pub_sub.py test_sub_not_loop
终端二:
✗ python redis_pub_sub.py test_pub
"""