forked from dolphindb/api-java
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathStreamingData.java
More file actions
162 lines (150 loc) · 6.22 KB
/
StreamingData.java
File metadata and controls
162 lines (150 loc) · 6.22 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import com.xxdb.data.*;
import com.xxdb.DBConnection;
import com.xxdb.data.Vector;
import com.xxdb.streaming.client.*;
import java.io.IOException;
import java.net.SocketException;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Sample program that creates a shared stream table named {@code Trades} on a
 * DolphinDB server and then subscribes to it, printing every received row and
 * periodic throughput figures.
 *
 * <p>Command line (all four arguments or none):
 * {@code StreamingData <host> <port> <subscribePort> <P|T>} where the last
 * argument selects the polling ({@code P}) or threaded ({@code T}) subscriber.
 * With no arguments the defaults held in the static fields below are used.
 */
public class StreamingData {
    /** Connection used by {@link #createStreamTable()}; assigned only in {@link #main(String[])}. */
    private static DBConnection conn;
    /** Server host name; overwritten by the first command-line argument. */
    public static String HOST = "localhost";
    /** Server port; overwritten by the second command-line argument. */
    public static Integer PORT = 8848;
    /** Never assigned in this class; the subscriber methods use their own local clients. */
    public static ThreadedClient client;
    /** Subscription mode: 'P'/'p' for polling, 'T'/'t' for threaded. */
    public static char METHOD = 'P';
    /** Port passed to the subscriber client constructors. */
    public static Integer subscribePORT = 8892;

    /**
     * Logs in and runs the server-side script that shares the {@code Trades}
     * stream table (columns id, time, sym, qty, price) and defines the
     * {@code saveData} function.
     *
     * <p>Requires the static connection to have been created and connected
     * beforehand, as {@link #main(String[])} does.
     *
     * @throws IOException if the login or either script call fails
     */
    public void createStreamTable() throws IOException {
        // NOTE(review): the third login argument is presumably an encryption flag - confirm against the API.
        conn.login("admin", "123456", false);
        conn.run("share streamTable(30000:0,`id`time`sym`qty`price,[INT,TIME,SYMBOL,INT,DOUBLE]) as Trades\n");
        conn.run("def saveData(data){ Trades.tableInsert(data)}");
    }

    /**
     * Subscribes to {@code Trades} with a polling client and prints every row
     * until the last row of a batch carries id {@code -1}, then unsubscribes.
     * Throughput is reported roughly once per second and once more at the end.
     *
     * <p>This method always terminates the JVM: with status 0 after a normal
     * run, or status 1 when subscribing/unsubscribing raised an
     * {@link IOException}.
     *
     * @throws SocketException if the polling client cannot be created
     */
    public void PollingClient() throws SocketException {
        PollingClient client = new PollingClient(subscribePORT);
        int exitStatus = 0;
        try {
            TopicPoller poller = client.subscribe(HOST, PORT, "Trades");
            int count = 0;
            boolean started = false;
            long start = System.currentTimeMillis();
            long last = start;
            while (true) {
                // NOTE(review): 1000 is presumably a timeout in milliseconds - confirm against TopicPoller.
                ArrayList<IMessage> msgs = poller.poll(1000);
                if (msgs == null) {
                    // A null result restarts the measurement window.
                    count = 0;
                    start = System.currentTimeMillis();
                    continue;
                }
                if (!started && !msgs.isEmpty()) {
                    // Measure from the first message actually received.
                    started = true;
                    start = System.currentTimeMillis();
                }
                count += msgs.size();
                for (IMessage msg : msgs) {
                    printTrade(msg);
                }
                // An id of -1 in the last row of a batch ends the run.
                if (!msgs.isEmpty()
                        && ((BasicInt) msgs.get(msgs.size() - 1).getEntity(0)).getInt() == -1) {
                    break;
                }
                long now = System.currentTimeMillis();
                if (now - last >= 1000) {
                    printThroughput(count, now - start);
                    last = now;
                }
            }
            printThroughput(count, System.currentTimeMillis() - start);
            client.unsubscribe(HOST, PORT, "Trades");
        } catch (IOException e) {
            e.printStackTrace();
            // Report the failure to the caller of the process instead of exiting with 0.
            exitStatus = 1;
        }
        System.exit(exitStatus);
    }

    /**
     * Message handler that prints each received row, the running message
     * count, a throughput line every 10000 messages and "Done" at 20000.
     */
    public static class SampleMessageHandler implements MessageHandler {
        private final AtomicLong count = new AtomicLong();
        private long start = 0;
        private boolean started = false;

        /**
         * Prints one row and updates the counters.
         *
         * @param msg the received row; columns 1-4 are read as time, sym, qty and price
         */
        @Override
        public void doEvent(IMessage msg) {
            if (!started) {
                started = true;
                start = System.currentTimeMillis();
            }
            printTrade(msg);
            // Use the value returned by the increment so every check below sees the same count.
            long seen = count.incrementAndGet();
            System.out.println(seen);
            if (seen % 10000 == 0) {
                printThroughput(seen, System.currentTimeMillis() - start);
            }
            if (seen == 20000) {
                System.out.println("Done");
            }
        }
    }

    /**
     * Subscribes to {@code Trades} with a threaded client that delivers rows
     * to a new {@link SampleMessageHandler}. An {@link IOException} raised by
     * the subscription is printed and otherwise ignored.
     *
     * @throws SocketException if the threaded client cannot be created
     */
    public void ThreadedClient() throws SocketException {
        ThreadedClient client = new ThreadedClient(subscribePORT);
        try {
            // NOTE(review): the empty string is presumably the action name - confirm against the API.
            client.subscribe(HOST, PORT, "Trades", "", new SampleMessageHandler());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Entry point: parses the optional arguments, creates the stream table
     * and starts the selected subscriber.
     *
     * @param args either empty, or {@code host port subscribePort method}
     */
    public static void main(String[] args) {
        if (args.length == 4) {
            int port;
            int subPort;
            try {
                port = Integer.parseInt(args[1]);
                subPort = Integer.parseInt(args[2]);
            } catch (NumberFormatException e) {
                System.out.println("Wrong arguments");
                return;
            }
            if (args[3].isEmpty() || "PpTt".indexOf(args[3].charAt(0)) < 0) {
                System.out.println("Wrong arguments: the 4th parameter 'subscribeMethod' must be 'P' or 'T'");
                return;
            }
            // Assign only after everything validated so the defaults are never half-overwritten.
            HOST = args[0];
            PORT = port;
            subscribePORT = subPort;
            METHOD = args[3].charAt(0);
        } else if (args.length != 0) {
            System.out.println("wrong arguments");
            return;
        }

        conn = new DBConnection();
        try {
            try {
                conn.connect(HOST, PORT);
            } catch (IOException e) {
                System.out.println("Connection error");
                e.printStackTrace();
                // Nothing below can work without a connection.
                return;
            }
            try {
                new StreamingData().createStreamTable();
            } catch (IOException e) {
                // Best effort: still try to subscribe even when the setup script failed.
                System.out.println("Writing error");
                e.printStackTrace();
            }
        } finally {
            conn.close();
        }

        try {
            switch (METHOD) {
                case 'p':
                case 'P':
                    new StreamingData().PollingClient();
                    break;
                case 't':
                case 'T':
                    new StreamingData().ThreadedClient();
                    break;
                default:
                    System.out.println("Wrong arguments: the 4th parameter 'subscribeMethod' must be 'P' or 'T'");
                    break;
            }
        } catch (IOException e) {
            System.out.println("Subscription error");
            e.printStackTrace();
        }
    }

    /** Prints the time, sym, qty and price columns (indices 1-4) of one row on a single line. */
    private static void printTrade(IMessage msg) {
        BasicTime time = (BasicTime) msg.getEntity(1);
        String symbol = msg.getEntity(2).getString();
        int qty = ((BasicInt) msg.getEntity(3)).getInt();
        double price = ((BasicDouble) msg.getEntity(4)).getDouble();
        // One print call per row keeps the line intact in the output.
        System.out.print("time:" + time + " sym:" + symbol + " qty:" + qty + " price:" + price + " \n");
    }

    /** Prints the message count, the elapsed milliseconds and the resulting messages-per-second rate. */
    private static void printThroughput(long count, long elapsedMs) {
        System.out.println(count + " messages took " + elapsedMs + "ms, throughput: "
                + count / (elapsedMs / 1000.0) + " messages/s");
    }
}