forked from jasonweiyi/xapi_python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlts_market_tests.py
More file actions
104 lines (84 loc) · 4.06 KB
/
lts_market_tests.py
File metadata and controls
104 lines (84 loc) · 4.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# -*- coding: utf-8 -*-
import unittest
import time
from base.callbacks import CallBacks
from base.comm import get_depth_market_data, get_bid_count, get_bid, get_all_asks, Disconnected
from logger.file_logger import FileLogger
from xapi import XApi, ServerInfoField, UserInfoField
class LtsMarketCallbacks(CallBacks):
    """Market callbacks that record and print every depth quote received."""

    # Maps str(InstrumentID) -> list of depth-market-data records in arrival
    # order.
    # NOTE(review): this dict lives on the class, so every instance that does
    # not assign its own ``market_data`` attribute shares and mutates it.
    market_data = {}

    def on_market_connected(self, p_api, rsp_user_login, status):
        """Print the status code and, when disconnected, the login error text."""
        print("Status: %s" % ord(status))
        if status == Disconnected.value:
            # The error message is decoded as GBK before being printed.
            print(rsp_user_login.ErrorMsg.decode('gbk'))

    def on_market_rtn_depth_market_data_n(self, p_api, p_depth_market_data_n):
        """Store the incoming depth quote and print its bid and ask levels."""
        quote = get_depth_market_data(p_depth_market_data_n)
        instrument_key = str(quote.InstrumentID)

        # Append the quote to the per-instrument history, creating the list on
        # first sight of the instrument.
        self.market_data.setdefault(instrument_key, []).append(quote)
        print(instrument_key + " " + str(quote.UpdateTime))

        # Fetch the bid levels one at a time; levels are numbered from 1.
        bid_count = get_bid_count(p_depth_market_data_n)
        print(u"买档:%s" % bid_count)
        for level in range(1, bid_count + 1):
            bid = get_bid(p_depth_market_data_n, level)
            print(u"买%s:%s %s" % (level, bid.Price, bid.Size))

        # Fetch all ask levels in a single call; printed numbering starts at 1.
        asks = get_all_asks(p_depth_market_data_n)
        print(u"卖档:%s" % len(asks))
        for level, ask in enumerate(asks, 1):
            print(u"卖%s:%s %s" % (level, ask.Price, ask.Size))

    def on_market_rsp_error(self, p_api, rsp_info, b_is_last):
        """Print the error message carried by an error response."""
        # NOTE(review): unlike on_market_connected, the message is printed
        # without a GBK decode -- confirm whether that is intended.
        print("Error: %s" % rsp_info.ErrorMsg)
class LtsMarketTests(unittest.TestCase):
    """Integration tests for the LTS quote API driven through ``XApi``.

    Every test loads ``QuantBox_LTS_Quote.dll`` and connects with the server
    address and user credentials hard-coded in ``setUp``.
    """

    lts = None
    instrument_ids = [
        b"600109", b'600601', b'600268', b'600818', b'603997', b'600234',
    ]
    exchange_id = b"SSE"

    def setUp(self):
        """Create the API object, start connecting and register callbacks."""
        market_callbacks = LtsMarketCallbacks()
        # LtsMarketCallbacks.market_data is a class-level dict shared by all
        # instances. Give this test's instance its own dict so quotes recorded
        # by an earlier test cannot satisfy this test's assertions.
        market_callbacks.market_data = {}
        self.callbacks = [market_callbacks, FileLogger(b"C:/tmp/log")]

        self.lts = XApi("QuantBox_LTS_Quote.dll")
        self.lts.debug = True
        if not self.lts.init():
            print(self.lts.get_last_error())
        else:
            server = ServerInfoField()
            server.BrokerID = b"2011"
            server.Address = b"tcp://211.144.195.163:44513"
            server.UserProductInfo = b""
            server.AuthCode = b""
            user = UserInfoField()
            user.UserID = b"020000000352"
            user.Password = b"123"
            self.lts.connect(b"c:\\tmp\\lts", server, user)
            self.lts.set_callbacks(self.callbacks)

    def tearDown(self):
        """Disconnect from the server and flush the file logger."""
        try:
            self.lts.disconnect()
        finally:
            # Flush the log even when disconnect() raises, so the output of a
            # failing run is not lost.
            self.callbacks[1].flush()

    def process(self, max_wait=60, stop_condition=lambda: False):
        """Wait until ``stop_condition()`` is truthy or the wait budget is spent.

        The condition is checked before each one-second sleep.

        :param max_wait: maximum number of one-second sleeps.
        :param stop_condition: zero-argument callable; a truthy result ends
            the wait early.
        """
        itr = 0
        while itr < max_wait and not stop_condition():
            itr += 1
            time.sleep(1)

    def test_get_api_name(self):
        """The loaded API reports its name as 'LTS'."""
        self.assertEqual(self.lts.get_api_name(), 'LTS')

    def test_connect(self):
        """The API reports being connected within five seconds."""
        self.process(5, lambda: self.lts.is_connected)
        self.assertTrue(self.lts.is_connected, msg='Login Failed!')

    def test_depth_market_data_single(self):
        """Subscribing to one instrument yields at least one depth quote."""
        self.test_connect()
        instrument = self.instrument_ids[0]
        market_data = self.callbacks[0].market_data
        self.lts.subscribe(instrument_ids=instrument,
                           exchange_id=self.exchange_id)
        # Wait up to 10 seconds, stopping early once more than 10 quotes have
        # been recorded for the instrument.
        self.process(
            10,
            stop_condition=lambda: len(market_data.get(instrument, [])) > 10)
        self.lts.unsubscribe(instrument)
        self.assertNotEqual(len(market_data.get(instrument, [])), 0,
                            msg='Get Depth Market Data Failed!')

    def test_depth_market_data_multi(self):
        """Subscribing to five instruments yields quotes for each of them."""
        self.test_connect()
        subscribed = self.instrument_ids[:5]
        market_data = self.callbacks[0].market_data
        self.lts.subscribe(instrument_ids=subscribed,
                           exchange_id=self.exchange_id)
        # market_data is keyed by instrument, so with five subscriptions its
        # length can never exceed 10; waiting on that always burned the full
        # timeout. Stop as soon as every subscribed instrument has a quote.
        self.process(
            stop_condition=lambda: all(market_data.get(ins)
                                       for ins in subscribed))
        self.lts.unsubscribe(subscribed)
        for ins in subscribed:
            self.assertNotEqual(
                len(market_data.get(ins, [])), 0,
                msg='Get Depth Market Data for %s Failed!' % ins)
# Run the test suite when this module is executed as a script.
if __name__ == "__main__":
    unittest.main()