Skip to content

Commit 745f7dc

Browse files
committed
WSO2 CEP Python Thrift Client
1 parent 2de5e68 commit 745f7dc

39 files changed

Lines changed: 4189 additions & 0 deletions
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
#!/usr/bin/env python
2+
3+
import sys
4+
import time
5+
sys.path.append('gen-py')
6+
7+
from ThriftSecureEventTransmissionService import ThriftSecureEventTransmissionService
8+
from ThriftEventTransmissionService import ThriftEventTransmissionService
9+
from ThriftEventTransmissionService.ttypes import *
10+
11+
from thrift import Thrift
12+
from thrift.transport import TSSLSocket
13+
from thrift.transport import TSocket
14+
from thrift.transport import TTransport
15+
from thrift.protocol import TBinaryProtocol
16+
17+
18+
class Publisher:
19+
20+
"""
21+
Create SSL and TCP sockets along with buffer and binary protocol.
22+
"""
23+
def __init__(self, ip, ssl_port, tcp_port):
24+
# Make SSL socket
25+
self.ssl_socket = TSSLSocket.TSSLSocket(ip, ssl_port, False)
26+
self.ssl_transport = TTransport.TBufferedTransport(self.ssl_socket)
27+
self.ssl_protocol = TBinaryProtocol.TBinaryProtocol(self.ssl_transport)
28+
29+
# Make TCP socket
30+
self.tcp_socket = TSocket.TSocket(ip, tcp_port)
31+
self.tcp_transport = TTransport.TBufferedTransport(self.tcp_socket)
32+
self.tcp_protocol = TBinaryProtocol.TBinaryProtocol(self.tcp_transport)
33+
34+
def connect(self, username, password):
35+
# Create a client to use the protocol encoder
36+
self.ssl_client = ThriftSecureEventTransmissionService.Client(self.ssl_protocol)
37+
self.tcp_client = ThriftEventTransmissionService.Client(self.tcp_protocol)
38+
39+
# Make connection
40+
self.ssl_socket.open()
41+
# self.transport.open()
42+
self.sessionId = self.ssl_client.connect(username, password)
43+
44+
self.tcp_socket.open()
45+
46+
def defineStream(self, streamDef):
47+
# Create Stream Definition
48+
return self.tcp_client.defineStream(self.sessionId, streamDef)
49+
50+
def publish(self, streamId, *attributes):
51+
# Build thrift event bundle
52+
event = EventBundle()
53+
event.setSessionId(self.sessionId)
54+
event.setEventNum(1)
55+
event.addLongAttribute(time.time() * 1000)
56+
event.addStringAttribute(streamId)
57+
for attr in attributes:
58+
if isinstance(attr, int):
59+
event.addIntAttribute(attr)
60+
elif isinstance(attr, basestring):
61+
event.addStringAttribute(attr)
62+
elif isinstance(attr, long):
63+
event.addLongAttribute(attr)
64+
elif isinstance(attr, float):
65+
event.addDoubleAttribute(attr)
66+
elif isinstance(attr, bool):
67+
event.addBoolAttribute(attr)
68+
else:
69+
event.setArbitraryDataMapMap(attr)
70+
71+
# Publish
72+
self.tcp_client.publish(event.getEventBundle())
73+
74+
def disconnect(self):
75+
# Disconnect
76+
self.ssl_client.disconnect(self.sessionId)
77+
self.ssl_transport.close()
78+
self.ssl_socket.close()
79+
80+
self.tcp_transport.close()
81+
self.tcp_socket.close()
82+
83+
84+
class EventBundle:
85+
__sessionId = ""
86+
__eventNum = 0
87+
__intAttributeList = []
88+
__longAttributeList = []
89+
__doubleAttributeList = []
90+
__boolAttributeList = []
91+
__stringAttributeList = []
92+
__arbitraryDataMapMap = None
93+
94+
def setSessionId(self, sessionId):
95+
self.__sessionId = sessionId
96+
97+
def setEventNum(self, num):
98+
self.__eventNum = num
99+
100+
def addIntAttribute(self, attr):
101+
self.__intAttributeList.append(attr)
102+
103+
def addLongAttribute(self, attr):
104+
self.__longAttributeList.append(attr)
105+
106+
def addDoubleAttribute(self, attr):
107+
self.__doubleAttributeList.append(attr)
108+
109+
def addBoolAttribute(self, attr):
110+
self.__boolAttributeList.append(attr)
111+
112+
def addStringAttribute(self, attr):
113+
self.__stringAttributeList.append(attr)
114+
115+
def setArbitraryDataMapMap(self, attr):
116+
self.__arbitraryDataMapMap = attr
117+
118+
def getEventBundle(self):
119+
return Data.ttypes.ThriftEventBundle(self.__sessionId, self.__eventNum, self.__intAttributeList, self.__longAttributeList, self.__doubleAttributeList, self.__boolAttributeList, self.__stringAttributeList, self.__arbitraryDataMapMap)
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#!/usr/bin/env python
2+
3+
from Publisher import Publisher
4+
5+
CEP_SERVER_ADDRESS = '192.168.122.1' # IP address of the server. You can find it at the end of the CEP console
6+
SSL_PORT = 7711 # Thrift SSL port of the server
7+
TCP_PORT = 7611 # Thrift TCP port of the server
8+
USERNAME = 'admin' # Username
9+
PASSWORD = 'admin' # Passowrd
10+
11+
publisher = Publisher(CEP_SERVER_ADDRESS, SSL_PORT, TCP_PORT)
12+
13+
# Connect to server with username and password
14+
publisher.connect(USERNAME, PASSWORD)
15+
16+
# Publish an event to the Temperature stream
17+
publisher.publish('com.javahelps.stream.Temperature:1.0.0', 'Kitchen', 56)
18+
19+
# Disconnect
20+
publisher.disconnect()
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#!/usr/bin/env python
2+
3+
import sys
4+
import os
5+
sys.path.append('gen-py')
6+
from thrift.transport import TTransport
7+
from thrift.protocol import TBinaryProtocol
8+
from thrift.server import TServer
9+
from thrift.transport import TSSLSocket
10+
from ServerHandler import ServerHandler
11+
from ThriftSecureEventTransmissionService import ThriftSecureEventTransmissionService
12+
13+
14+
SERVER_ADDRESS = 'localhost'
15+
SERVER_PORT = 8988
16+
CERTIFICATE_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'server.pem')
17+
18+
handler = ServerHandler()
19+
processor = ThriftSecureEventTransmissionService.Processor(handler)
20+
transport = TSSLSocket.TSSLServerSocket(SERVER_ADDRESS, SERVER_PORT, certfile=CERTIFICATE_FILE)
21+
tfactory = TTransport.TBufferedTransportFactory()
22+
pfactory = TBinaryProtocol.TBinaryProtocolFactory()
23+
24+
server = TServer.TSimpleServer(processor, transport, tfactory, pfactory)
25+
26+
print 'Starting SSL Server at ' + SERVER_ADDRESS + ':' + str(SERVER_PORT)
27+
28+
server.serve()
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
#!/usr/bin/env python
2+
3+
4+
class ServerHandler(object):
5+
"""
6+
ServerHandler contains the functions to serve the requests from WSO2 CEP.
7+
"""
8+
def __init__(self):
9+
pass
10+
11+
"""
12+
Receive the username and password, verify it and return a unique session id.
13+
"""
14+
def connect(self, uname, password):
15+
print 'Connect ' + uname + ':' + password
16+
return '123456' # A random session id
17+
18+
"""
19+
Destroy the session.
20+
"""
21+
def disconnect(self, sessionId):
22+
print 'Disconnect the session ' + str(sessionId)
23+
24+
def defineStream(self, sessionId, streamDefinition):
25+
return ''
26+
27+
def findStreamId(self, sessionId, streamName, streamVersion):
28+
return ''
29+
30+
"""
31+
Receive the event and process it.
32+
"""
33+
def publish(self, eventBundle):
34+
print 'Received a new event: ' + str(eventBundle) + '\n'
35+
36+
def deleteStreamById(self, sessionId, streamId):
37+
pass
38+
39+
def deleteStreamByNameVersion(self, sessionId, streamName, streamVersion):
40+
pass
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#!/usr/bin/env python
2+
3+
import sys
4+
sys.path.append('gen-py')
5+
from thrift.transport import TSocket
6+
from thrift.transport import TTransport
7+
from thrift.protocol import TBinaryProtocol
8+
from thrift.server import TServer
9+
from ServerHandler import ServerHandler
10+
from ThriftEventTransmissionService import ThriftEventTransmissionService
11+
12+
SERVER_ADDRESS = 'localhost'
13+
SERVER_PORT = 8888
14+
15+
handler = ServerHandler()
16+
processor = ThriftEventTransmissionService.Processor(handler)
17+
transport = TSocket.TServerSocket(SERVER_ADDRESS, SERVER_PORT)
18+
tfactory = TTransport.TBufferedTransportFactory()
19+
pfactory = TBinaryProtocol.TBinaryProtocolFactory()
20+
21+
server = TServer.TSimpleServer(processor, transport, tfactory, pfactory)
22+
23+
print 'Starting TCP Server at ' + SERVER_ADDRESS + ':' + str(SERVER_PORT)
24+
25+
server.serve()
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
__all__ = ['ttypes', 'constants']
159 Bytes
Binary file not shown.

wso2/cep/client/thrift/python-client/gen-py/Data/constants.py

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)