Skip to content

Commit 80b9971

Browse files
committed
SocketIO行情交易网关
1 parent dcfaa53 commit 80b9971

21 files changed

Lines changed: 1051 additions & 0 deletions

languages/Server/README.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
## XAPI行情交易网关接口
2+
3+
本项目使用了以下项目实现基本功能
4+
1. Flask-SocketIO
5+
2. XAPI Python版接口
6+
7+
## 安装与使用
8+
1. 解压`QuantBox统一接口完整版_v0.9.8.8.zip`
9+
2. 执行其中的`X1.复制bin和System32目录_需右键以管理员身份运行.bat`,自动将CTP等库文件安装到默认目录下
10+
3. 配置`config.py` 中的服务器地址与账号,可以根据需要配置只连接行情或交易
11+
4. 运行`python xapi_sio_server.py`即启动了服务,等待客户端连接
12+
5. 访问http://127.0.0.1:5000可以在浏览器中登录并测试下单撤单
13+
6. 如果有些库没有安装,可以参考requirements.txt使用pip进行安装,一定要安装eventlet
14+
15+
## 工作原理
16+
1. Flask Web服务等待客户端的接入后初始化CTP API,自动连接CTP柜台
17+
2. 行情调用流程:Python->XAPI_CPP_x64.dll->CTP_Quote_x64.dll->thostmduserapi.dll
18+
3. 交易调用流程:Python->XAPI_CPP_x64.dll->CTP_Trade_x64.dll->thosttradeapi.dll
19+
4. 收到订阅请求后,开始调用api,然后收到的行情转发
20+
5. 交易接口与行情接口相同
21+
22+
## 已知问题
23+
1. 单用户版,只支持一个用户
24+
2. 没有提供历史数据功能

languages/Server/__init__.py

Whitespace-only changes.

languages/Server/app/__init__.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from flask import Flask
2+
from flask_socketio import SocketIO
3+
4+
socketio = SocketIO()
5+
6+
7+
def create_app(debug=False):
8+
"""Create an application."""
9+
app = Flask(__name__)
10+
app.debug = debug
11+
app.config['SECRET_KEY'] = 'gjr39dkjn344_!67#'
12+
13+
from .main import main as main_blueprint
14+
app.register_blueprint(main_blueprint)
15+
16+
socketio.init_app(app)
17+
18+
from .api.events import XApiNamespace
19+
socketio.on_namespace(XApiNamespace('/api'))
20+
return app
21+
22+
# from app.api.api import config_md, config_td
23+
#
24+
# md, td = config_md(), config_td()
25+
#
26+
# from app.socketio_queue import EmitQueue
27+
#
28+
# mq = EmitQueue(socketio)
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
from . import spi, events
2+
import config

languages/Server/app/api/events.py

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
from app import socketio
2+
from config import *
3+
from .spi import *
4+
from ..socketio_queue import EmitQueue
5+
6+
from flask_socketio import Namespace
7+
8+
import logging
9+
10+
logger = logging.getLogger("SIO_Server")
11+
12+
13+
class XApiNamespace(Namespace):
14+
md = None
15+
td = None
16+
mq = None
17+
spi = None
18+
orders_map = {}
19+
20+
def __init__(self, namespace=None):
21+
super(XApiNamespace, self).__init__(namespace)
22+
self.mq = EmitQueue(socketio)
23+
self.spi = md_spi(self.mq, self.namespace)
24+
25+
def start(self):
26+
# 有客户端连接上来时才启动
27+
# 1. 网页已经连接过一没有关,重开服务端也会导致触发
28+
# 2. 服务端已经连接成功了,但没有通知
29+
if self.md is None:
30+
self.md = config_md()
31+
if enable_md:
32+
init_md(self.md)
33+
if self.td is None:
34+
self.td = config_td()
35+
init_td(self.td)
36+
37+
def stop(self):
38+
if self.md is not None:
39+
self.md.disconnect()
40+
self.md = None
41+
if self.td is not None:
42+
self.td.disconnect()
43+
self.td = None
44+
45+
def connect(self):
46+
self.spi.set_api(self.md, self.td)
47+
self.md.register_spi(self.spi)
48+
if not self.md.is_connected():
49+
if enable_md:
50+
self.md.connect()
51+
self.td.register_spi(self.spi)
52+
if not self.td.is_connected():
53+
if enable_td:
54+
self.td.connect()
55+
56+
def on_connect(self):
57+
# 刷新网页时这里会触发两次,所以需要做到防止重连
58+
logger.info('on_connect')
59+
self.start()
60+
self.connect()
61+
self.spi.emit_is_connected()
62+
63+
def on_disconnect(self):
64+
# 得所有连接都断开才能取消订阅行情
65+
logger.info('on_disconnect')
66+
67+
def on_sub_quote(self, data):
68+
logger.info('on_sub_quote:%s', data)
69+
args = data['args']
70+
if not self.md.is_connected():
71+
return
72+
self.md.subscribe(args['instruments'], args['exchange'])
73+
74+
def on_unsub_quote(self, data):
75+
logger.info('on_unsub_quote:%s', data)
76+
args = data['args']
77+
if not self.md.is_connected():
78+
return
79+
self.md.unsubscribe(args['instruments'], args['exchange'])
80+
81+
def on_send_order(self, data):
82+
logger.info('on_send_order:%s', data)
83+
args = data['args']
84+
if not self.td.is_connected():
85+
return
86+
87+
# 默认数据,如果输入的参数不够全,使用默认参数
88+
_d0 = {
89+
"InstrumentID": "c1909",
90+
"Type": "Limit",
91+
"Side": "Buy",
92+
"Qty": 1,
93+
"Price": 100.0,
94+
"OpenClose": "Open",
95+
"HedgeFlag": "Speculation",
96+
}
97+
98+
_input = args
99+
# 使用输出的参数更新默认字典,防止下面取枚举时出错
100+
_d0.update(_input)
101+
102+
# 将原订单中的枚举字符串都换成数字
103+
_d1 = {
104+
'Type': OrderType[_d0["Type"]],
105+
'Side': OrderSide[_d0["Side"]],
106+
'OpenClose': OpenCloseType[_d0["OpenClose"]],
107+
'HedgeFlag': HedgeFlagType[_d0["HedgeFlag"]],
108+
}
109+
_d0.update(_d1)
110+
local_id = _d0['LocalID']
111+
112+
order_id = self.td.send_order(_d0)
113+
114+
# 也可以不设置,但这样远程就无法关联了
115+
if len(local_id) > 0:
116+
self.orders_map[order_id] = local_id
117+
118+
def on_cancel_order(self, data):
119+
logger.info('on_cancel_order:%s', data)
120+
args = data['args']
121+
if not self.td.is_connected():
122+
return
123+
self.td.cancel_order(args["ID"])
124+
125+
def on_query_account(self, data):
126+
logger.info('on_query_account')
127+
query = ReqQueryField()
128+
self.td.req_query(QueryType.ReqQryTradingAccount, query)
129+
130+
def on_query_positions(self, data):
131+
logger.info('on_query_positions')
132+
query = ReqQueryField()
133+
self.td.req_query(QueryType.ReqQryInvestorPosition, query)
134+
135+
def on_query_settlement_info(self, data):
136+
logger.info('on_query_settlement_info:%s', data)
137+
args = data['args']
138+
139+
query = ReqQueryField()
140+
query.DateStart = args["TradingDay"]
141+
self.td.req_query(QueryType.ReqQrySettlementInfo, query)
142+
143+
def on_query_history_data(self, data):
144+
logger.info('on_query_history_data:%s', data)
145+
args = data['args']
146+
147+
self.spi.emit_rsp_qry_history_data(args)
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import pymongo
2+
import pandas as pd
3+
from datetime import datetime
4+
5+
import logging
6+
7+
logger = logging.getLogger(__name__)
8+
9+
10+
def connnect_collection():
11+
"""
12+
连接数据库并选择表
13+
:return:
14+
"""
15+
client = pymongo.MongoClient(
16+
host='192.168.15.159',
17+
port=20000,
18+
username='user-read',
19+
password='123456',
20+
authSource='market-data-v1',
21+
authMechanism='SCRAM-SHA-1')
22+
23+
db = client['market-data-v1']
24+
coll = db['KLine']
25+
return coll
26+
27+
28+
def kline_download_df(coll, code='AG1906', period=1):
29+
"""
30+
增量下载数据
31+
:param coll:
32+
:param code:
33+
:param period: 周期,数据库中目前是1min对应1
34+
:param barTime: 格式是:年月日时分秒
35+
:return:
36+
"""
37+
# 有可能放假时间比较长,所以还是反过来取感觉更快
38+
q = coll.find(filter={'code': code.upper(), 'period': period},
39+
projection={'code': 1, 'date': 1, 'time': 1,
40+
'open': 1, 'high': 1, 'low': 1, 'close': 1,
41+
'barTime': 1,
42+
'businessAmount': 1,
43+
'businessBalance': 1,
44+
'amount': 1,
45+
'tradingDay': 1,
46+
'_id': 0}).sort([('barTime', -1)]).limit(2000) # 测试时只查一部分
47+
print(datetime.now(), '查询语句准备好', code)
48+
# 由于使用了迭代器,所以只能直接list,不能先套用别的处理方法,否则丢失数据
49+
df = pd.DataFrame(list(reversed(list(q))))
50+
print(datetime.now(), '数据下载封装完成', code, len(df))
51+
if len(df) == 0:
52+
return None
53+
# 为了实现数据过滤,先不处理
54+
df['barTime'] = pd.to_datetime(df['barTime'].astype(str), format='%Y%m%d%H%M%S')
55+
df[['open', 'high', 'low', 'close']] = df[['open', 'high', 'low', 'close']] / 1000000
56+
return df
57+
58+
def kline_download_list(coll, code='AG1906', period=1):
59+
"""
60+
增量下载数据
61+
:param coll:
62+
:param code:
63+
:param period: 周期,数据库中目前是1min对应1
64+
:param barTime: 格式是:年月日时分秒
65+
:return:
66+
"""
67+
# 有可能放假时间比较长,所以还是反过来取感觉更快
68+
q = coll.find(filter={'code': code.upper(), 'period': period},
69+
projection={'code': 1, 'date': 1, 'time': 1,
70+
'open': 1, 'high': 1, 'low': 1, 'close': 1,
71+
'barTime': 1,
72+
'businessAmount': 1,
73+
'businessBalance': 1,
74+
'amount': 1,
75+
'tradingDay': 1,
76+
'_id': 0}).sort([('barTime', -1)]).limit(2000) # 测试时只查一部分
77+
logger.info('查询准备:%s', code)
78+
l = list(q)
79+
logger.info('数据下载封装完成:%s,%d', code, len(l))
80+
return list(reversed(l))
81+
82+
83+
if __name__ == "__main__":
84+
coll = connnect_collection()
85+
df = kline_download_df(coll)
86+
print(df)

0 commit comments

Comments
 (0)