forked from thevibepei/pythonMultiProcessAndThreadExamples
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathFILE_zhelpers.py
More file actions
57 lines (48 loc) · 1.54 KB
/
FILE_zhelpers.py
File metadata and controls
57 lines (48 loc) · 1.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# encoding: utf-8
"""
Helper module for example applications. Mimics ZeroMQ Guide's zhelpers.h.
"""
from __future__ import print_function
import binascii
import os
from random import randint
import zmq
def socket_set_hwm(socket, hwm=-1):
    """Set the high-water mark on *socket* in a libzmq 2/3 compatible way.

    The separate ``sndhwm`` and ``rcvhwm`` attributes are assigned first, in
    that order. If either assignment raises ``AttributeError``, the single
    ``hwm`` attribute is assigned instead.
    """
    try:
        for option in ("sndhwm", "rcvhwm"):
            setattr(socket, option, hwm)
    except AttributeError:
        # The split send/receive attributes are not settable on this socket.
        socket.hwm = hwm
#
def dump(msg_or_socket):
    """Print every frame of a multipart message, one frame per line.

    ``msg_or_socket`` is either a ``zmq.Socket``, in which case one multipart
    message is received from it with ``recv_multipart()``, or an iterable of
    frames that is printed as given.

    A separator line is printed first. Each frame is then printed as
    ``[NNN]`` (its length, zero-padded to three digits) followed by the frame
    decoded as ASCII, or by ``0x`` plus its hex encoding when the frame is
    not valid ASCII.
    """
    if isinstance(msg_or_socket, zmq.Socket):
        # It's a socket: operate on the message currently waiting on it.
        msg = msg_or_socket.recv_multipart()
    else:
        msg = msg_or_socket
    print("----------------------------------------")
    for part in msg:
        print("[%03d]" % len(part), end=' ')
        try:
            print(part.decode('ascii'))
        except UnicodeDecodeError:
            # Not ASCII text: show the raw bytes as hex instead.
            print(r"0x%s" % (binascii.hexlify(part).decode('ascii')))
def set_id(zsocket):
    """Set a simple random printable identity on *zsocket*.

    The identity is a text string of the form ``xxxx-xxxx`` (two groups of
    four lowercase hex digits) and is applied with
    ``zsocket.setsockopt_string(zmq.IDENTITY, ...)``.
    """
    # randint() includes both endpoints, so the upper bound is 0xFFFF: a
    # bound of 0x10000 could yield a five-digit group ("10000").
    identity = u"%04x-%04x" % (randint(0, 0xFFFF), randint(0, 0xFFFF))
    zsocket.setsockopt_string(zmq.IDENTITY, identity)
def zpipe(ctx):
    """Build an inproc pipe for talking to threads.

    Mimics the pipe used in czmq's zthread_fork.

    Two ``zmq.PAIR`` sockets are created from *ctx*, both with ``linger`` set
    to 0 and ``hwm`` set to 1. The first is bound to, and the second
    connected to, a randomly named ``inproc://`` endpoint.

    Returns the pair ``(a, b)`` where ``a`` is the bound socket and ``b`` is
    the connected one.
    """
    a = ctx.socket(zmq.PAIR)
    b = ctx.socket(zmq.PAIR)
    a.linger = b.linger = 0
    a.hwm = b.hwm = 1
    # hexlify() returns bytes on Python 3; decode it so the endpoint name is
    # plain hex rather than the bytes repr ("b'...'").
    name = binascii.hexlify(os.urandom(8)).decode('ascii')
    iface = "inproc://%s" % name
    a.bind(iface)
    b.connect(iface)
    return a, b