DolphinDB Python API supports Python 3.6 - 3.8.
本教程目录如下:
- 1. Execute DolphinDB script and call DolphinDB functions
- 2. Upload Python objects to DolphinDB server
- 3. Import data into DolphinDB databases
- 4. Load data from DolphinDB databases
- 5. Append data to DolphinDB tables
- 6. Database and table operations
- 7. SQL queries
- 8. Python Streaming API
- 9. 更多实例
DolphinDB Python API in essense encapsulates a subset of DolphinDB's scripting language. It converts Python script to DolphinDB script to be executed on the DolphinDB server. The result can either be saved on DolphinDB server or be serialized to a Python client object.
The examples in this tutorial use a csv file: example.csv.
Python interacts with DolphinDB through a Session object. The most commonly used Session class methods are as follows:
| Method | Explanation |
|---|---|
| connect(host,port,[username,password]) | Connect a session to DolphinDB server |
| login(username,password,[enableEncryption]) | log in DolphinDB server |
| run(script) | Execute script on DolphinDB server |
| run(functionName,args) | Call functions on DolphinDB server |
| upload(DictionaryOfPythonObjects) | Upload Python objects to DolphinDB server |
| undef(objName,objType) | Undefine an object in DolphinDB to release memory |
| undefAll() | Undefine all objects in DolphinDB to release memory |
| close() | Close the session |
In the following script, we first create a session in Python, then connect the session to a DolphinDB server with specified domain name/IP address and port number. We need to start a DolphinDB server before running the following Python script.
import dolphindb as ddb
s = ddb.session()
s.connect("localhost", 8848)
# output
TrueUse the following script to connect to DolphinDB server with username and password:
s.connect("localhost", 8848, YOUR_USER_NAME, YOUR_PASS_WORD)or
s.connect("localhost", 8848)
s.login(YOUR_USER_NAME,YOUR_PASS_WORD)If a session was initialized without username and password, we can use the method login to log in DolphinDB server. The default username is 'admin' and the default password is '123456'. By default, the user name and password are encrypted during connection.
All DolphinDB script can be executed through the run(script) method. If the script returns an object in DolphinDB, it will be converted to an object in Python. If the script fails to run, there will be a corresponding error prompt.
a=s.run("`IBM`GOOG`YHOO")
repr(a)
# output
"array(['IBM', 'GOOG', 'YHOO'], dtype='<U4')"User-defined functions can be generated with the run method:
s.run("def getTypeStr(input){ \nreturn typestr(input)\n}")The maximum length of the text in the run method is 65,535 bytes. For multiple lines of script, we can wrap them inside triple quotes for clarity. For example:
script="""
def getTypeStr(input){
return typestr(input)
}
"""
s.run(script)
s.run("getTypeStr", 1);
# output
'LONG'In addition to executing script, the run method can directly call DolphinDB built-in or user-defined functions on a remote DolphinDB server. For this usage, the first parameter of the run method is the function name and the subsequent parameters are the parameters of the function.
The following example shows a Python program calling DolphinDB built-in function add through method run. The add function has 2 parameters: x and y. Depending on whether the values of the parameters have been assigned at DolphinDB server, there are 3 ways to call the function:
- Both parameters have been assigned value at DolphinDB server
If both x and y have been assigned value at DolphinDB server in the Python program,
s.run("x = [1,3,5];y = [2,4,6]")then just use run(script):
a=s.run("add(x,y)")
repr(a)
# output
'array([3, 7, 11], dtype=int32)'- Only one parameter has been assigned value at DolphinDB server
If only x has been assigned value at DolphinDB server in the Python program
s.run("x = [1,3,5]")and y is to be assigned value when calling add, we need to use Partial Application to fix parameter x to function add.
import numpy as np
y=np.array([1,2,3])
result=s.run("add{x,}", y)
repr(result)
# output
'array([2,5,8])'
result.dtype
# output
dtype('int64')- Both parameters are to be assigned value
import numpy as np
x=np.array([1.5,2.5,7])
y=np.array([8.5,7.5,3])
result=s.run("add", x, y)
repr(result)
# output
'array([10., 10., 10.])'
result.dtype
# output
dtype('float64')When calling DolphinDB's built-in functions through run, the parameters uploaded can be scalar, list, dict, numpy objects, pandas DataFrame and Series, etc.
Note:
- NumPy arrays can only be 1D or 2D.
- If a pandas DataFrame or Series object has an index, the index will be lost after the object is uploaded to DolphinDB. To keep the index, use the pandas DataFrame function
reset_index.- If a parameter of a DolphinDB function is of temporal type, it should be converted to numpy.datetime64 type before uploading.
The following examples introduce the use of various types of Python objects as parameters.
-
list objects
Add 2 Python lists with DolphinDB function
add:s.run("add",[1,2,3,4],[1,2,1,1]) # output array([2, 4, 4, 5])
-
NumPy objects
-
np.int
import numpy as np s.run("add{1,}",np.int(4)) # output 5
-
np.datetime64
Python API将datetime64格式的数据转换成DolphinDB中对应的时间数据类型。对应关系如下表。
DolphinDB Type datetime64 DATE '2019-01-01' MONTH '2019-01' DATETIME '2019-01-01T20:01:01' TIMESTAMP '2019-01-01T20:01:01.122' NANOTIMESTAMP '2019-01-01T20:01:01.122346100' import numpy as np s.run("typestr",np.datetime64('2019-01-01')) # output 'DATE' s.run("typestr",np.datetime64('2019-01')) # output 'MONTH' s.run("typestr",np.datetime64('2019-01-01T20:01:01')) # output 'DATETIME' s.run("typestr",np.datetime64('2019-01-01T20:01:01.122')) # output 'TIMESTAMP' s.run("typestr",np.datetime64('2019-01-01T20:01:01.1223461')) # output 'NANOTIMESTAMP'
As TIME, MINUTE, SECOND and NANOTIME types in DolphinDB don't have information about dates, datetime64 type cannot be converted into these types directly in Python API. To generate these data types in DolphinDB from Python, we can upload the datetime64 type to DolphinDB server and then get rid of the date information.
import numpy as np ts = np.datetime64('2019-01-01T20:01:01.1223461') s.upload({'ts':ts}) s.run('a=nanotime(ts)') s.run('typestr(a)') # output 'NANOTIME' s.run('a') # output numpy.datetime64('1970-01-01T20:01:01.122346100')
Please note that in the last step of the example above, when the NANOTIME type in DolphinDB is downloaded to Python, Python will automatically add 1970-01-01 as the date part.
-
list of np.datetime64 objects
import numpy as np a=[np.datetime64('2019-01-01T20:00:00.000000001'), np.datetime64('2019-01-01T20:00:00.000000001')] s.run("add{1,}",a) # output array(['2019-01-01T20:00:00.000000002', '2019-01-01T20:00:00.000000002'], dtype='datetime64[ns]')
-
-
pandas objects
If a pandas DataFrame or Series object has an index, the index will be lost after the object is uploaded to DolphinDB.
-
Series
import pandas as pd import numpy as np a = pd.Series([1,2,3,1,5],index=np.arange(1,6,1)) s.run("add{1,}",a) # output array([2, 3, 4, 2, 6])
-
DataFrame
import pandas as pd import numpy as np a=pd.DataFrame({'id': np.int32([1, 2, 3, 4, 3]), 'value': np.double([7.8, 4.6, 5.1, 9.6, 0.1]), 'x': np.int32([5, 4, 3, 2, 1])}, index={'one','two','three','four','five'}) s.upload({'a':a}) s.run("typestr",a) # output 'IN-MEMORY TABLE' s.run('a') # output id value x 0 1 7.8 5 1 2 4.6 4 2 3 5.1 3 3 4 9.6 2 4 3 0.1 1
-
If we need to use a local object repeatedly in DolphinDB, we can upload the local object to the DolphinDB server. When uploading, we need to specify the variable name.
The Python API provides method upload to upload Python objects to the DolphinDB server. The input of the method upload is a Python dictionary object. The key of the dictionary is the variable name in DolphinDB, and the value is a Python object, which can be Numbers, Strings, Lists, DataFrame, etc.
- Upload Python list
a = [1,2,3.0]
s.upload({'a':a})
a_new = s.run("a")
print(a_new)
# output
[1. 2. 3.]
a_type = s.run("typestr(a)")
print(a_type)
# output
ANY VECTORNote that a Python list with different data types such as a=[1,2,3.0] will be recognized as an ANY vector after being uploaded to DolphinDB. For such cases, it is recommended to use numpy.array instead of list. With numpy.array, we can specify a single data type through a=numpy.array([1,2,3.0],dtype=numpy.double), so that after uploading, "a" is a vector of DOUBLE type.
- Upload NumPy array
import numpy as np
arr = np.array([1,2,3.0],dtype=np.double)
s.upload({'arr':arr})
arr_new = s.run("arr")
print(arr_new)
# output
[1. 2. 3.]
arr_type = s.run("typestr(arr)")
print(arr_type)
# output
FAST DOUBLE VECTOR- Upload pandas DataFrame
import pandas as pd
import numpy as np
df = pd.DataFrame({'id': np.int32([1, 2, 3, 4, 3]), 'value': np.double([7.8, 4.6, 5.1, 9.6, 0.1]), 'x': np.int32([5, 4, 3, 2, 1])})
s.upload({'t1': df})
print(s.run("t1.value.avg()"))
# output
5.44In Python, we can use the method table to create a DolphinDB table object and upload it to the server. The input of the method table can be a dictionary, DataFrame or table name in DolphinDB.
- Upload dict
The script below defines a function createDemoDict() to create a dictionary.
import numpy as np
def createDemoDict():
return {'id': [1, 2, 2, 3],
'date': np.array(['2019-02-04', '2019-02-05', '2019-02-09', '2019-02-13'], dtype='datetime64[D]'),
'ticker': ['AAPL', 'AMZN', 'AMZN', 'A'],
'price': [22, 3.5, 21, 26]}Upload the dictionary to the DolphinDB server with the method table, and name the table as "testDict", then we can read the table with the method loadTable provided by the API.
import numpy as np
# save the table to DolphinDB server as table "testDict"
dt = s.table(data=createDemoDict(), tableAliasName="testDict")
# load table "testDict" on DolphinDB server
print(s.loadTable("testDict").toDF())
# output
id date ticker price
0 1 2019-02-04 AAPL 22.0
1 2 2019-02-05 AMZN 3.5
2 2 2019-02-09 AMZN 21.0
3 3 2019-02-13 A 26.0- Upload pandas DataFrame
The script below defines function createDemoDataFrame() to create a pandas DataFrame.
import pandas as pd
def createDemoDataFrame():
data = {'cid': np.array([1, 2, 3], dtype=np.int32),
'cbool': np.array([True, False, np.nan], dtype=np.bool),
'cchar': np.array([1, 2, 3], dtype=np.int8),
'cshort': np.array([1, 2, 3], dtype=np.int16),
'cint': np.array([1, 2, 3], dtype=np.int32),
'clong': np.array([0, 1, 2], dtype=np.int64),
'cdate': np.array(['2019-02-04', '2019-02-05', ''], dtype='datetime64[D]'),
'cmonth': np.array(['2019-01', '2019-02', ''], dtype='datetime64[M]'),
'ctime': np.array(['2019-01-01 15:00:00.706', '2019-01-01 15:30:00.706', ''], dtype='datetime64[ms]'),
'cminute': np.array(['2019-01-01 15:25', '2019-01-01 15:30', ''], dtype='datetime64[m]'),
'csecond': np.array(['2019-01-01 15:00:30', '2019-01-01 15:30:33', ''], dtype='datetime64[s]'),
'cdatetime': np.array(['2019-01-01 15:00:30', '2019-01-02 15:30:33', ''], dtype='datetime64[s]'),
'ctimestamp': np.array(['2019-01-01 15:00:00.706', '2019-01-01 15:30:00.706', ''], dtype='datetime64[ms]'),
'cnanotime': np.array(['2019-01-01 15:00:00.80706', '2019-01-01 15:30:00.80706', ''], dtype='datetime64[ns]'),
'cnanotimestamp': np.array(['2019-01-01 15:00:00.80706', '2019-01-01 15:30:00.80706', ''], dtype='datetime64[ns]'),
'cfloat': np.array([2.1, 2.658956, np.NaN], dtype=np.float32),
'cdouble': np.array([0., 47.456213, np.NaN], dtype=np.float64),
'csymbol': np.array(['A', 'B', '']),
'cstring': np.array(['abc', 'def', ''])}
return pd.DataFrame(data)Upload the DataFrame to DolphinDB server with method table, name it as "testDataFrame", then we can read the table with method loadTable provided by the API.
import pandas as pd
# save the table to DolphinDB server as table "testDataFrame"
dt = s.table(data=createDemoDataFrame(), tableAliasName="testDataFrame")
# load table "testDataFrame" on DolphinDB server
print(s.loadTable("testDataFrame").toDF())
# output
>>> print(s.loadTable("testDataFrame").toDF())
cid cbool cchar cshort cint ... cnanotimestamp cfloat cdouble csymbol cstring
0 1 True 1 1 1 ... 2019-01-01 15:00:00.807060 2.100000 0.000000 A abc
1 2 False 2 2 2 ... 2019-01-01 15:30:00.807060 2.658956 47.456213 B def
2 3 True 3 3 3 ... NaT NaN NaNtable和loadTable函数必须返回一个Python本地变量,即Python端的变量和server端有一个一一对应的关系。假设server端表对象为t0, 释放server端对象有三种方法:取消server端定义(undef),将server端对象置空,或取消本地变量对server端对象的引用。
t0=s.table(data=createDemoDict(), tableAliasName="t1")
s.undef("t0", "VAR")
or
s.run("t0=NULL")
or
t0=None
当Python端通过session.table函数将数据上传到server之后, DolphinDB会为Python端的变量建立一个变量对server端table变量的引用。当Python端对server端table变量引用消失后,server端的table会自动释放。
下面代码显示将一个表上传到server,然后通过toDF()加载来拿到数据。
t1=s.table(data=createDemoDict(), tableAliasName="t1")
print(t1.toDF())
#output
id date ticker price
0 1 2019-02-04 AAPL 22.0
1 2 2019-02-05 AMZN 3.5
2 2 2019-02-09 AMZN 21.0
3 3 2019-02-13 A 26.0
如果重复下面这个语句,就会发生找到不到t1的异常。原因是Python端对Server端表t1的原有引用已经取消,在重新给Python端t1分配DolphinDB的表对象前,
DolphinDB要对session中的对应的表t1进行释放(通过函数undef取消它在session中的定义),所以会出现无法找到t1的异常。
t1=s.table(data=createDemoDict(), tableAliasName="t1")
print(t1.toDF())
#output
<Server Exception> in run: Can't find the object with name t1
那么如何避免这种情况呢?将这个table对象赋值给另一个Python本地变量就不会出现找不到t1的情况。但这里的代价是server端保存了两份同样的table对象,因为Python端有两个引用:t1和t2。
t2=s.table(data=createDemoDict(), tableAliasName="t1")
print(t2.toDF())
#output
id date ticker price
0 1 2019-02-04 AAPL 22.0
1 2 2019-02-05 AMZN 3.5
2 2 2019-02-09 AMZN 21.0
3 3 2019-02-13 A 26.0
如果需要反复通过同一个本地变量指向相同的或者不同的上传表,更合理的方法是不指定表名。此时会为用户随机产生一个临时表名。这个表名可以通过t1.tableName()来获取。这里可能会产生一个疑惑,那么server端是不是会产生很多表对象,造成内存溢出。由于python端使用了同一个变量名, 所以在重新上传数据的时候,系统会将上一个表对象释放掉(TMP_TBL_876e0ce5),而用一个新的table对象TMP_TBL_4c5647af来对应Python端的t1,所以Server端始终只有一个对应的表对象。
t1=s.table(data=createDemoDicts())
print(t1.tableName())
#output
TMP_TBL_876e0ce5
print(t1.toDF())
#output
id date ticker price
0 1 2019-02-04 AAPL 22.0
1 2 2019-02-05 AMZN 3.5
2 2 2019-02-09 AMZN 21.0
3 3 2019-02-13 A 26.0
t1=s.table(data=createDemoDict())
print(t1.tableName())
#output
'TMP_TBL_4c5647af'
print(t1.toDF())
#output
id date ticker price
0 1 2019-02-04 AAPL 22.0
1 2 2019-02-05 AMZN 3.5
2 2 2019-02-09 AMZN 21.0
3 3 2019-02-13 A 26.0
同理,通过loadTable来加载一个磁盘分区表到内存的原理也是必须赋值给一个Python本地变量,建立起Python本地变量和server端一一对应的关系。 首先运行一下DolphinDB脚本:
db = database("dfs://testdb",RANGE, [1, 5 ,11])
t1=table(1..10 as id, 1..10 as v)
db.createPartitionedTable(t1,`t1,`id).append!(t1)
然后运行以下Python脚本:
pt1=s.loadTable(tableName='t1',dbPath="dfs://testdb")
上面脚本即是在server端创建了一个磁盘分区表,然后通过session函数loadTable来讲该表导入内存,并将该表对象赴给本地变量pt1。注意到这里t1并不是server端表对象的名字,
而是磁盘分区表的名字,是用于讲数据库testdb中,讲分区表t1加载到内存中的。实际的表对象的名字,需要通过 pt1.tableName()来得到。
print(pt1.tableName())
'TMP_TBL_4c5647af'
如果一个表对象只是一次性使用,尽量不要使用上传机制。直接通过函数调用来完成,表对象作为函数的一个参数。函数调用不会缓存数据,函数调用结束,所有数据都释放,没有副作用,而且只有一次网络传输,降低网络延迟。
df = pd.DataFrame({'id': np.int32([1, 2, 3, 4, 3]), 'value': np.double([7.8, 4.6, 5.1, 9.6, 0.1]), 'x': np.int32([5, 4, 3, 2, 1])})
s.upload({'t1': df})
print(s.run("t1.value.avg()"))
# output
5.44
A DolphinDB table object can be created with the table method of a session. The input of the table method can be a dictionary, a DataFrame, or a table name on the DolphinDB server.
# save the table to DolphinDB server as table "test"
dt = s.table(data={'id': [1, 2, 2, 3],
'ticker': ['AAPL', 'AMZN', 'AMZN', 'A'],
'price': [22, 3.5, 21, 26]}).executeAs("test")
# load table "test" on DolphinDB server
print(s.loadTable("test").toDF())
# output
id price sym
0 1 22.0 AAPL
1 2 3.5 AMZN
2 2 21.0 AMZN
3 3 26.0 A
There are 3 types of DolphinDB databases: in-memory database, local file system database and DFS (Distributed File System) database. DFS automatically manages data storage and replicas. DolphinDB is a distributed database system and achieves optimal performance in DFS mode. Therefore, we highly recommend to use DFS mode. Please refer to the tutorial multi_machine_cluster_deploy for details. For simplicity, we also use databases in the local file system in some examples.
To import text files into DolphinDB as an in-memory table, use session method loadText. It returns a DolphinDB table object in Python, which corresponds to an in-memory table on the DolphinDB server. The DolphinDB table object in Python has a method toDF to convert it to a pandas DataFrame.
Please note that to use method loadText to load a text file as an in-memory table, table size must be smaller than available memory.
WORK_DIR = "C:/DolphinDB/Data"
# return a DolphinDB table object in Python
trade=s.loadText(WORK_DIR+"/example.csv")
# convert the imported DolphinDB table object into a pandas DataFrame
df = trade.toDF()
print(df)
# output
TICKER date VOL PRC BID ASK
0 AMZN 1997.05.16 6029815 23.50000 23.50000 23.6250
1 AMZN 1997.05.17 1232226 20.75000 20.50000 21.0000
2 AMZN 1997.05.20 512070 20.50000 20.50000 20.6250
3 AMZN 1997.05.21 456357 19.62500 19.62500 19.7500
4 AMZN 1997.05.22 1577414 17.12500 17.12500 17.2500
5 AMZN 1997.05.23 983855 16.75000 16.62500 16.7500
...
13134 NFLX 2016.12.29 3444729 125.33000 125.31000 125.3300
13135 NFLX 2016.12.30 4455012 123.80000 123.80000 123.8300
The default delimiter for function loadText is comma ",". We can also use other delimiters. For example, to import a tabular text file:
t1=s.loadText(WORK_DIR+"/t1.tsv", '\t')
To load data files that are larger than available memory into DolphinDB, we can load data into a partitioned database.
After we create a partitioned database, we cannot modify its partitioning scheme. To make sure the examples below do not use a pre-defined database "valuedb", check if it exists. If it exists, delete it.
if s.existsDatabase(WORK_DIR+"/valuedb"):
s.dropDatabase(WORK_DIR+"/valuedb")
Now create a value-based partitioned database "valuedb" with a session method database. As "example.csv" only has data for 3 stocks, we use a VALUE partition with stock ticker as the partitioning column. The parameter "partitions" indicates the partition scheme.
# 'db' indicates the database handle name on the DolphinDB server.
s.database('db', partitionType=ddb.VALUE, partitions=["AMZN","NFLX","NVDA"], dbPath=WORK_DIR+"/valuedb")
# this is equivalent to executing 'db=database(=WORK_DIR+"/valuedb", VALUE, ["AMZN","NFLX", "NVDA"])' on DolphinDB server.
To create a partitioned database in DFS, just make the database path start with "dfs://". Before we execute the following script, we need to configure a DFS cluster. Please refer to the tutorial multi_machine_cluster_deploy.md
s.database('db', partitionType=ddb.VALUE, partitions=["AMZN","NFLX", "NVDA"], dbPath="dfs://valuedb")
In addition to VALUE partition, DolphinDB also supports SEQ, RANGE, LIST, COMBO, and HASH partitions.
After a partitioned database is created successfully, we can import text files to a partitioned table in the partitioned database with function loadTextEx. If the partitioned table does not exist, loadTextEx creates it and appends the imported data to it. Otherwise, the function appends the imported data to the partitioned table.
In function loadTextEx, parameter "dbPath" is the database path; "tableName" is the partitioned table name; "partitionColumns" is the partitioning columns; "filePath" is the absolute path of the text file; "delimiter" is the delimiter of the text file (comma by default).
In the following example, function loadTextEx creates a partitioned table "trade" on the DolphinDB server and then appends the data from "example.csv" to the table.
if s.existsDatabase(WORK_DIR+"/valuedb"):
s.dropDatabase(WORK_DIR+"/valuedb")
s.database('db', partitionType=ddb.VALUE, partitions=["AMZN","NFLX", "NVDA"], dbPath=WORK_DIR+"/valuedb")
trade = s.loadTextEx("db", tableName='trade',partitionColumns=["TICKER"], filePath=WORK_DIR + "/example.csv")
print(trade.toDF())
# output
TICKER date VOL PRC BID ASK
0 AMZN 1997.05.16 6029815 23.50000 23.50000 23.6250
1 AMZN 1997.05.17 1232226 20.75000 20.50000 21.0000
2 AMZN 1997.05.20 512070 20.50000 20.50000 20.6250
3 AMZN 1997.05.21 456357 19.62500 19.62500 19.7500
4 AMZN 1997.05.22 1577414 17.12500 17.12500 17.2500
5 AMZN 1997.05.23 983855 16.75000 16.62500 16.7500
...
13134 NFLX 2016.12.29 3444729 125.33000 125.31000 125.3300
13135 NFLX 2016.12.30 4455012 123.80000 123.80000 123.8300
[13136 rows x 6 columns]
# show the number of rows in the table
print(trade.rows)
13136
# show the number of columns in the table
print(trade.cols)
6
# show the schema of the table
print(trade.schema)
name typeString typeInt
0 TICKER SYMBOL 17
1 date DATE 6
2 VOL INT 4
3 PRC DOUBLE 16
4 BID DOUBLE 16
5 ASK DOUBLE 16
To refer to the table later:
trade = s.loadTable(tableName="trade",dbPath=WORK_DIR+"/valuedb")
We can import data as an in-memory partitioned table. Operations on an in-memory partitioned table are faster than those on a nonpartitioned in-memory table as the former utilizes parallel computing.
We can use function loadTextEx to create an in-memory partitioned database with an empty string for the parameter "dbPath".
s.database('db', partitionType=ddb.VALUE, partitions=["AMZN","NFLX","NVDA"], dbPath="")
# "dbPath='db'" means that the system uses database handle 'db' to import data into in-memory partitioned table trade
trade=s.loadTextEx(dbPath="db", partitionColumns=["TICKER"], tableName='trade', filePath=WORK_DIR + "/example.csv")
Function ploadText loads a text file in parallel to generate an in-memory partitioned table. It runs much faster than loadText.
trade=s.ploadText(WORK_DIR+"/example.csv")
print(trade.rows)
# output
13136
We can upload a Python object to the DolphinDB server with function upload. The input of function upload is a Python dictionary object. For this dictionary object, the key is the variable name in DolphinDB and the value is the Python object.
df = pd.DataFrame({'id': np.int32([1, 2, 3, 4, 3]), 'value': np.double([7.8, 4.6, 5.1, 9.6, 0.1]), 'x': np.int32([5, 4, 3, 2, 1])})
s.upload({'t1': df})
print(s.run("t1.value.avg()"))
# output
5.44
A DolphinDB table object can be created with the table method of a session. The input of the table method can be a dictionary, a DataFrame, or a table name on the DolphinDB server.
# save the table to DolphinDB server as table "test"
dt = s.table(data={'id': [1, 2, 2, 3],
'ticker': ['AAPL', 'AMZN', 'AMZN', 'A'],
'price': [22, 3.5, 21, 26]}).executeAs("test")
# load table "test" on DolphinDB server
print(s.loadTable("test").toDF())
# output
id price sym
0 1 22.0 AAPL
1 2 3.5 AMZN
2 2 21.0 AMZN
3 3 26.0 A
When we upload data to DolphinDB server, certain basic Python types such as bool, int64 and float64 are automatically converted into corresponding DolphinDB types BOOL, INT, DOUBLE. Temporal data types, however, need special treatment. DolphinDB provides 9 temporal data types: DATE, MONTH, TIME, MINUTE, SECOND, DATETIME, TIMESTAMP, NANOTIME and NANOTIMESTAMP. The temporal data type datetime64 in Python is converted into DolphinDB temporal data type NANOTIMESTAMP. To convert datetime64 into other DolphinDB temporal data types, please use from_time, from_date or from_datetime function. For more details, please refer to the following table.
# import DolphinDB data type package
from dolphindb.type_util import *
| DolphinDB Temporal Data Type | Example | Result |
|---|---|---|
| DATE | Date.from_date(date(2012,12,20)) | 2012.12.20 |
| MONTH | Month.from_date(date(2012,12,26)) | 2012.12M |
| TIME | Time.from_time(time(12,30,30,8)) | 12:30:30.008 |
| MINUTE | Minute.from_time(time(12,30)) | 12:30m |
| SECOND | Second.from_time(time(12,30,30)) | 12:30:30 |
| DATETIME | Datetime.from_datetime(datetime(2012,12,30,15,12,30)) | 2012.12.30 15:12:30 |
| TIMESTAMP | Timestamp.from_datetime(datetime(2012,12,30,15,12,30,8)) | 2012.12.30 15:12:30.008 |
| NANOTIME | NanoTime.from_time(time(13,30,10,706)) | 13:30:10.000706000 |
| NANOTIMESTAMP | NanoTimestamp.from_datetime(datetime(2012,12,24,13,30,10,80706)) | 2012.12.24 13:30:10.080706000 |
As np.NaN is of float type in Python, when np.NaN is uploaded to DolphinDB server it will be converted into FLOAT. Python API provides special Null corresponding to DolphinDB data types. The following table shows how to create DolphinDB Null in Python:
| DolphinDB Data Type | Corresponding Null in Python |
|---|---|
| BOOL | boolNan |
| CHAR | byteBan |
| SHORT | shortNan |
| INT | intNan |
| DATE | Date.null() |
| MONTH | Month.null() |
| TIME | Time.null() |
| SECOND | Second.null() |
| DATETIME | Datetime.null() |
| TIMESTAMP | Timestamp.null() |
| NANOTIME | NanoTime.null() |
| NANOTIMESTAMP | NanoTimestamp.null() |
Please note that, Python data type and DolphinDB data type can't in the same column of dictionary or dataframe. For example, 'date':[date(2012,12,30),Date.from_date(date(2012,12,31)),Date.null()]. It will raise exception since 'date' contains Python datetime64 and DolphinDB DATE.
To load a table from a database, use function loadTable. Parameter "tableName" indicates the partitioned table name; "dbPath" is the database location. If "dbPath" is not specified, loadTable will load a DolphinDB table in memory whose name is specified in argument "tableName".
For a partitioned table: if memoryMode=True and parameter "partition" is unspecified, load all data of the table into DolphinDB server memory as a partitioned table; if memoryMode=True and parameter "partition" is specified, load only the specified partitions of the table into DolphinDB server memory as a partitioned table; if memoryMode=false, only load the metadata of the table into DolphinDB server memory.
trade = s.loadTable(tableName="trade",dbPath=WORK_DIR+"/valuedb")
trade.toDF()
# output
TICKER date VOL PRC BID ASK
0 AMZN 1997.05.16 6029815 23.50000 23.50000 23.6250
1 AMZN 1997.05.17 1232226 20.75000 20.50000 21.0000
2 AMZN 1997.05.20 512070 20.50000 20.50000 20.6250
3 AMZN 1997.05.21 456357 19.62500 19.62500 19.7500
4 AMZN 1997.05.22 1577414 17.12500 17.12500 17.2500
5 AMZN 1997.05.23 983855 16.75000 16.62500 16.7500
...
13134 NFLX 2016.12.29 3444729 125.33000 125.31000 125.3300
13135 NFLX 2016.12.30 4455012 123.80000 123.80000 123.8300
[13136 rows x 6 columns]
To load only the "AMZN" partition:
trade = s.loadTable(tableName="trade",dbPath=WORK_DIR+"/valuedb", partitions="AMZN")
print(trade.rows)
# output
4941
trade = s.loadTable(tableName="trade",dbPath=WORK_DIR+"/valuedb", partitions=["NFLX","NVDA"], memoryMode=True)
print(trade.rows)
# output
8195
Method loadTableBySQL imports only the rows of an on-disk partitioned table that satisfy the filtering conditions in a SQL query as an in-memory partitioned table.
import os
if s.existsDatabase(WORK_DIR+"/valuedb" or os.path.exists(WORK_DIR+"/valuedb")):
s.dropDatabase(WORK_DIR+"/valuedb")
s.database(dbName='db', partitionType=ddb.VALUE, partitions=["AMZN","NFLX", "NVDA"], dbPath=WORK_DIR+"/valuedb")
t = s.loadTextEx("db", tableName='trade',partitionColumns=["TICKER"], filePath=WORK_DIR + "/example.csv")
trade = s.loadTableBySQL(tableName="trade", dbPath=WORK_DIR+"/valuedb", sql="select * from trade where date>2010.01.01")
print(trade.rows)
# output
5286
When we load data from DolphinDB, DolphinDB types BOOL and INT are converted into Python types bool and int64; CHAR, SHORT, INT and LONG are converted into int64. To convert integer into charater, use chr(). DOUBLE and FLOAT are converted into float64; SYMBOL and STRING are converted into object. Temporal data types are converted into datetime64. For example, 2012.06M is converted into 2012-06-01, 13:30m is converted into 1970-01-01 13:30:00.
| DolphinDB Data Type | Python Data Type |
|---|---|
| BOOL | bool |
| CHAR, SHORT, INT, LONG | int64 |
| DOUBLE, FLOAT | float64 |
| SYMBOL, STRING | object |
| DATE, MONTH, TIME, MINUTE, SECOND, DATETIME, TIMESTAMP, NANOTIME, NANOTIMESTAMP | datetime64 |
//DolphinDB script:
db=database(WORK_DIR+"/testPython")
t1 = table(10000:0,`cid`cbool`cchar`cshort`cint`clong`cdate`cmonth`ctime`cminute`csecond`cdatetime`ctimestamp`cnanotime`cnanotimestamp`cfloat`cdouble`csymbol`cstring,[INT,BOOL,CHAR,SHORT,INT,LONG,DATE,MONTH,TIME,MINUTE,SECOND,DATETIME,TIMESTAMP,NANOTIME,NANOTIMESTAMP,FLOAT,DOUBLE,SYMBOL,STRING])
insert into t1 values (1,true,'a',122h,21,22l,2012.06.12,2012.06M,13:10:10.008,13:30m,13:30:10,2012.06.13 13:30:10,2012.06.13 13:30:10.008,13:30:10.008007006,2012.06.13 13:30:10.008007006,2.1f,2.1,"ABC","abc")
insert into t1 values (2,,,,,,,,,,,,,,,,,"","")
insert into t1 values (3,bool(),char(),short(),int(),long(),date(),month(),time(),minute(),second(),datetime(),timestamp(),nanotime(),nanotimestamp(),float(),double(),"",string())
saveTable(db,t1,`t1)
t=s.loadTable(WORK_DIR+"/testPython",tableName="t1")
print(t.toDF())
| cid | cbool | cchar | cshort | cint | clong | cdate | cmonth | ctime | cminute | csecond | cdatetime | ctimestamp | cnanotime | cnanotimestamp | cfloat | cdouble | csymbol | cstring | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | True | 97 | 122 | 21 | 22 | 2012-06-12 | 2012-06-01 | 1970-01-01 13:10:10.008 | 1970-01-01 13:30:00 | 1970-01-01 13:30:10 | 2012-06-13 13:30:10 | 2012-06-13 13:30:10.000008 | 1970-01-01 13:30:10.008007006 | 2012-06-13 13:30:10.008007006 | 2.1 | 2.1 | ABC | abc |
| 1 | 2 | nan | nan | nan | nan | nan | NaT | NaT | NaT | NaT | NaT | NaT | NaT | NaT | NaT | nan | nan | ||
| 2 | 3 | nan | nan | nan | nan | nan | NaT | NaT | NaT | NaT | NaT | NaT | NaT | NaT | NaT | nan | nan |
t.toDF().dtypes
cbool bool
cchar int64
cshort int64
cint int64
clong int64
cdate datetime64[ns]
cmonth datetime64[ns]
ctime datetime64[ns]
cminute datetime64[ns]
csecond datetime64[ns]
cdatetime datetime64[ns]
ctimestamp datetime64[ns]
cnanotime datetime64[ns]
cnanotimestamp datetime64[ns]
cfloat float64
cdouble float64
csymbol object
cstring object
dtype: object
To create a partitioned database, use session method database.
s.database('db', partitionType=ddb.VALUE, partitions=["AMZN","NFLX", "NVDA"], dbPath=WORK_DIR+"/valuedb")
To delete a database, use session method dropDatabase. The following statement will drop a database if it exists.
if s.existsDatabase(WORK_DIR+"/valuedb"):
s.dropDatabase(WORK_DIR+"/valuedb")
To drop a DFS database partition, use session method dropPartition.
if s.existsDatabase("dfs://valuedb"):
s.dropDatabase("dfs://valuedb")
s.database('db', partitionType=ddb.VALUE, partitions=["AMZN","NFLX", "NVDA"], dbPath="dfs://valuedb")
trade=s.loadTextEx(dbPath="dfs://valuedb", partitionColumns=["TICKER"], tableName='trade', filePath=WORK_DIR + "/example.csv")
print(trade.rows)
# output
13136
s.dropPartition("dfs://valuedb", partitionPaths=["/AMZN", "/NFLX"])
trade = s.loadTable(tableName="trade", dbPath="dfs://valuedb")
print(trade.rows)
# output
4516
print(trade.select("distinct TICKER").toDF())
distinct_TICKER
0 NVDA
Please see section 3.1.
The following example appends to a partitioned table on disk. To use the appended table, we need to reload the table after appending.
trade = s.loadTable(tableName="trade",dbPath=WORK_DIR+"/valuedb")
print(trade.rows)
# output
13136
# take the top 10 rows of table "trade" on the DolphinDB server
t = trade.top(10).executeAs("top10")
trade.append(t)
# table "trade" needs to be reloaded in order to see the appended records
trade = s.loadTable(tableName="trade",dbPath=WORK_DIR+"/valuedb")
print (trade.rows)
# output
13146
The following example appends to an in-memory table.
trade=s.loadText(WORK_DIR+"/example.csv")
t = trade.top(10).executeAs("top10")
t1=trade.append(t)
print(t1.rows)
# output
13146
Function update can only be used on in-memory tables and must be followed by function execute.
trade = s.loadTable(tableName="trade", dbPath=WORK_DIR+"/valuedb", memoryMode=True)
trade = trade.update(["VOL"],["999999"]).where("TICKER=`AMZN").where(["date=2015.12.16"]).execute()
t1=trade.where("ticker=`AMZN").where("VOL=999999")
print(t1.toDF())
# output
TICKER date VOL PRC BID ASK
0 AMZN 1997.05.15 999999 23.50000 23.50000 23.62500
1 AMZN 1997.05.16 999999 20.75000 20.50000 21.00000
2 AMZN 1997.05.19 999999 20.50000 20.50000 20.62500
3 AMZN 1997.05.20 999999 19.62500 19.62500 19.75000
4 AMZN 1997.05.21 999999 17.12500 17.12500 17.25000
...
4948 AMZN 1997.05.27 999999 19.00000 19.00000 19.12500
4949 AMZN 1997.05.28 999999 18.37500 18.37500 18.62500
4950 AMZN 1997.05.29 999999 18.06250 18.00000 18.12500
[4951 rows x 6 columns]
Function delete must be followed by function execute to delete records from a table.
trade = s.loadTable(tableName="trade", dbPath=WORK_DIR+"/valuedb", memoryMode=True)
trade.delete().where('date<2013.01.01').execute()
print(trade.rows)
# output
3024
trade = s.loadTable(tableName="trade", dbPath=WORK_DIR + "/valuedb", memoryMode=True)
t1=trade.drop(['ask', 'bid'])
print(t1.top(5).toDF())
# output
TICKER date VOL PRC
0 AMZN 1997.05.15 6029815 23.500
1 AMZN 1997.05.16 1232226 20.750
2 AMZN 1997.05.19 512070 20.500
3 AMZN 1997.05.20 456357 19.625
4 AMZN 1997.05.21 1577414 17.125
s.dropTable(WORK_DIR + "/valuedb", "trade")
DolphinDB's table class supports method chaining to generate SQL statements.
trade = s.loadTable(tableName="trade",dbPath=WORK_DIR+"/valuedb", memoryMode=True)
print(trade.select(['ticker','date','bid','ask','prc','vol']).toDF())
# output
TICKER date VOL PRC BID ASK
0 AMZN 1997.05.15 6029815 23.500 23.500 23.625
1 AMZN 1997.05.16 1232226 20.750 20.500 21.000
2 AMZN 1997.05.19 512070 20.500 20.500 20.625
3 AMZN 1997.05.20 456357 19.625 19.625 19.750
4 AMZN 1997.05.21 1577414 17.125 17.125 17.250
...
We can use the showSQL method to display the SQL statement.
print(trade.select(['ticker','date','bid','ask','prc','vol']).where("date=2012.09.06").where("vol<10000000").showSQL())
# output
select ticker,date,bid,ask,prc,vol from T64afd5a6 where date=2012.09.06 and vol<10000000
print(trade.select("ticker,date,bid,ask,prc,vol").where("date=2012.09.06").where("vol<10000000").toDF())
# output
ticker date bid ask prc vol
0 AMZN 2012-09-06 251.42999 251.56 251.38 5657816
1 NFLX 2012-09-06 56.65000 56.66 56.65 5368963
...
Get the top records in a table.
trade = s.loadTable(tableName="trade",dbPath=WORK_DIR+"/valuedb")
trade.top(5).toDF()
# output
TICKER date VOL PRC BID ASK
0 AMZN 1997.05.16 6029815 23.50000 23.50000 23.6250
1 AMZN 1997.05.17 1232226 20.75000 20.50000 21.0000
2 AMZN 1997.05.20 512070 20.50000 20.50000 20.6250
3 AMZN 1997.05.21 456357 19.62500 19.62500 19.7500
4 AMZN 1997.05.22 1577414 17.12500 17.12500 17.2500
We can use where method to filter the selection.
We can use method chaining to apply multiple conditions.
trade = s.loadTable(tableName="trade",dbPath=WORK_DIR+"/valuedb", memoryMode=True)
# use chaining WHERE conditions and save result to DolphinDB server variable "t1" through function "executeAs"
t1=trade.select(['date','bid','ask','prc','vol']).where('TICKER=`AMZN').where('bid!=NULL').where('ask!=NULL').where('vol>10000000').sort('vol desc').executeAs("t1")
print(t1.toDF())
# output
date bid ask prc vol
0 2007.04.25 56.80 56.8100 56.810 104463043
1 1999.09.29 80.75 80.8125 80.750 80380734
2 2006.07.26 26.17 26.1800 26.260 76996899
3 2007.04.26 62.77 62.8300 62.781 62451660
4 2005.02.03 35.74 35.7300 35.750 60580703
...
print(t1.rows)
765
We can use the showSQL method to display the SQL statement.
print(trade.select(['date','bid','ask','prc','vol']).where('TICKER=`AMZN').where('bid!=NULL').where('ask!=NULL').where('vol>10000000').sort('vol desc').showSQL())
# output
select date,bid,ask,prc,vol from Tff260d29 where TICKER=`AMZN and bid!=NULL and ask!=NULL and vol>10000000 order by vol desc
We can pass a list of field names as a string to select method and conditions as string to where method.
trade = s.loadTable(tableName="trade",dbPath=WORK_DIR+"/valuedb")
print(trade.select("ticker, date, vol").where("bid!=NULL, ask!=NULL, vol>50000000").toDF())
# output
ticker date vol
0 AMZN 1999.09.29 80380734
1 AMZN 2000.06.23 52221978
2 AMZN 2001.11.26 51543686
3 AMZN 2002.01.22 57235489
4 AMZN 2005.02.03 60580703
...
38 NVDA 2016.11.11 54384267
39 NVDA 2016.12.28 57384116
40 NVDA 2016.12.29 54384676
Method groupby must be followed by an aggregate function such as count, sum, avg, std, etc.
trade = s.loadTable(tableName="trade",dbPath=WORK_DIR+"/valuedb")
print(trade.select('ticker').groupby(['ticker']).count().sort(bys=['ticker desc']).toDF())
# output
ticker count_ticker
0 NVDA 4516
1 NFLX 3679
2 AMZN 4941
Calculate the sum of column "vol" and the sum of column "prc" in each "ticker" group:
trade = s.loadTable(tableName="trade",dbPath=WORK_DIR+"/valuedb")
print(trade.select(['vol','prc']).groupby(['ticker']).sum().toDF())
# output
ticker sum_vol sum_prc
0 AMZN 33706396492 772503.81377
1 NFLX 14928048887 421568.81674
2 NVDA 46879603806 127139.51092
groupby can be used with having:
trade = s.loadTable(tableName="trade",dbPath=WORK_DIR+"/valuedb")
print(trade.select('count(ask)').groupby(['vol']).having('count(ask)>1').toDF())
# output
vol count_ask
0 579392 2
1 3683504 2
2 5732076 2
3 6299736 2
4 6438038 2
5 6946976 2
6 8160197 2
7 8924303 2
...
contextby is similar to groupby except that for each group, groupby returns a scalar whereas contextby returns a vector of the same size as the group.
df= s.loadTable(tableName="trade",dbPath=WORK_DIR+"/valuedb").contextby('ticker').top(3).toDF()
print(df)
# output
TICKER date VOL PRC BID ASK
0 AMZN 1997-05-15 6029815 23.5000 23.5000 23.6250
1 AMZN 1997-05-16 1232226 20.7500 20.5000 21.0000
2 AMZN 1997-05-19 512070 20.5000 20.5000 20.6250
3 NFLX 2002-05-23 7507079 16.7500 16.7500 16.8500
4 NFLX 2002-05-24 797783 16.9400 16.9400 16.9500
5 NFLX 2002-05-28 474866 16.2000 16.2000 16.3700
6 NVDA 1999-01-22 5702636 19.6875 19.6250 19.6875
7 NVDA 1999-01-25 1074571 21.7500 21.7500 21.8750
8 NVDA 1999-01-26 719199 20.0625 20.0625 20.1250
df= s.loadTable(tableName="trade",dbPath=WORK_DIR+"/valuedb").select("TICKER, month(date) as month, cumsum(VOL)").contextBy("TICKER,month(date)").toDF()
print(df)
# output
TICKER month cumsum_VOL
0 AMZN 1997.05M 6029815
1 AMZN 1997.05M 7262041
2 AMZN 1997.05M 7774111
3 AMZN 1997.05M 8230468
4 AMZN 1997.05M 9807882
...
13133 NVDA 2016.12M 367356016
13134 NVDA 2016.12M 421740692
13135 NVDA 2016.12M 452063951
df= s.loadTable(tableName="trade",dbPath=WORK_DIR+"/valuedb").select("TICKER, month(date) as month, sum(VOL").contextBy("TICKER,month(date)").toDF()
print(df)
# output
TICKER month sum_VOL
0 AMZN 1997.05M 13736587
1 AMZN 1997.05M 13736587
2 AMZN 1997.05M 13736587
3 AMZN 1997.05M 13736587
4 AMZN 1997.05M 13736587
5 AMZN 1997.05M 13736587
...
13133 NVDA 2016.12M 452063951
13134 NVDA 2016.12M 452063951
13135 NVDA 2016.12M 452063951
df= s.loadTable(dbPath=WORK_DIR+"/valuedb", tableName="trade").contextby('ticker').having("sum(VOL)>40000000000").toDF()
print(df)
# output
TICKER date VOL PRC BID ASK
0 NVDA 1999.01.22 5702636 19.6875 19.6250 19.6875
1 NVDA 1999.01.25 1074571 21.7500 21.7500 21.8750
2 NVDA 1999.01.26 719199 20.0625 20.0625 20.1250
3 NVDA 1999.01.27 510637 20.0000 19.8750 20.0000
4 NVDA 1999.01.28 476094 19.9375 19.8750 20.0000
5 NVDA 1999.01.29 509718 19.0000 19.0000 19.3125
...
4512 NVDA 2016.12.27 29857132 117.3200 117.3100 117.3200
4513 NVDA 2016.12.28 57384116 109.2500 109.2500 109.2900
4514 NVDA 2016.12.29 54384676 111.4300 111.2600 111.4200
4515 NVDA 2016.12.30 30323259 106.7400 106.7300 106.7500
DolphinDB table class has method merge for inner, left, and outer join; method merge_asof for asof join; method merge_window for window join.
Specify joining columns with parameter "on" if joining column names are identical in both tables; use parameters "left_on" and "right_on" when joining column names are different. The optional parameter "how" indicates table join type. The default table join mode is inner join.
trade = s.loadTable(dbPath=WORK_DIR+"/valuedb", tableName="trade")
t1 = s.table(data={'TICKER': ['AMZN', 'AMZN', 'AMZN'], 'date': ['2015.12.31', '2015.12.30', '2015.12.29'], 'open': [695, 685, 674]})
print(trade.merge(t1,on=["TICKER","date"]).toDF())
# output
TICKER date VOL PRC BID ASK open
0 AMZN 2015.12.29 5734996 693.96997 693.96997 694.20001 674
1 AMZN 2015.12.30 3519303 689.07001 689.07001 689.09998 685
2 AMZN 2015.12.31 3749860 675.89001 675.85999 675.94000 695
We need to specify arguments "left_on" and "right_on" when joining column names are different.
trade = s.loadTable(dbPath=WORK_DIR+"/valuedb", tableName="trade")
t1 = s.table(data={'TICKER1': ['AMZN', 'AMZN', 'AMZN'], 'date1': ['2015.12.31', '2015.12.30', '2015.12.29'], 'open': [695, 685, 674]})
print(trade.merge(t1,left_on=["TICKER","date"], right_on=["TICKER1","date1"]).toDF())
# output
TICKER date VOL PRC BID ASK open
0 AMZN 2015.12.29 5734996 693.96997 693.96997 694.20001 674
1 AMZN 2015.12.30 3519303 689.07001 689.07001 689.09998 685
2 AMZN 2015.12.31 3749860 675.89001 675.85999 675.94000 695
To conduct left join, set how="left".
trade = s.loadTable(dbPath=WORK_DIR+"/valuedb", tableName="trade")
t1 = s.table(data={'TICKER': ['AMZN', 'AMZN', 'AMZN'], 'date': ['2015.12.31', '2015.12.30', '2015.12.29'], 'open': [695, 685, 674]})
print(trade.merge(t1,how="left", on=["TICKER","date"]).where('TICKER=`AMZN').where('2015.12.23<=date<=2015.12.31').toDF())
# output
TICKER date VOL PRC BID ASK open
0 AMZN 2015-12-23 2722922 663.70001 663.48999 663.71002 NaN
1 AMZN 2015-12-24 1092980 662.78998 662.56000 662.79999 NaN
2 AMZN 2015-12-28 3783555 675.20001 675.00000 675.21002 NaN
3 AMZN 2015-12-29 5734996 693.96997 693.96997 694.20001 674.0
4 AMZN 2015-12-30 3519303 689.07001 689.07001 689.09998 685.0
5 AMZN 2015-12-31 3749860 675.89001 675.85999 675.94000 695.0
To conduct outer join, set how="outer". A partitioned table can only be outer joined with a partitioned table, and an in-memory table can only be outer joined with an in-memory table.
t1 = s.table(data={'TICKER': ['AMZN', 'AMZN', 'NFLX'], 'date': ['2015.12.29', '2015.12.30', '2015.12.31'], 'open': [674, 685, 942]})
t2 = s.table(data={'TICKER': ['AMZN', 'NFLX', 'NFLX'], 'date': ['2015.12.29', '2015.12.30', '2015.12.31'], 'close': [690, 936, 951]})
print(t1.merge(t2, how="outer", on=["TICKER","date"]).toDF())
# output
TICKER date open TMP_TBL_ec03c3d2_TICKER TMP_TBL_ec03c3d2_date \
0 AMZN 2015.12.29 674.0 AMZN 2015.12.29
1 AMZN 2015.12.30 685.0
2 NFLX 2015.12.31 942.0 NFLX 2015.12.31
3 NaN NFLX 2015.12.30
close
0 690.0
1 NaN
2 951.0
3 936.0
The asof join function is a type of non-synchronous join. It is similar to the left join function witht the following differences:
- The data type of the last matching column is usually temporal. For a row in the left table with time t, if there is not a match of left join in the right table, the row in the right table that corresponds to the most recent time before time t is taken, if all the other matching columns are matched; if there are more than one matching record in the right table, the last record is taken.
- If there is only 1 joining column, the asof join function assumes the right table is sorted on the joining column. If there are multiple joining columns, the asof join function assumes the right table is sorted on the last joining column within each group defined by the other joining columns. The right table does not need to be sorted by the other joining columns. If these conditions are not met, we may see unexpected results. The left table does not need to be sorted.
For the examples in this and the next section, we use trades.csv and quotes.csv which have AAPL and FB trades and quotes data on 10/24/2016 taken from NYSE website.
WORK_DIR = "C:/DolphinDB/Data"
if s.existsDatabase(WORK_DIR+"/tickDB"):
s.dropDatabase(WORK_DIR+"/tickDB")
s.database('db', partitionType=ddb.VALUE, partitions=["AAPL","FB"], dbPath=WORK_DIR+"/tickDB")
trades = s.loadTextEx("db", tableName='trades',partitionColumns=["Symbol"], filePath=WORK_DIR + "/trades.csv")
quotes = s.loadTextEx("db", tableName='quotes',partitionColumns=["Symbol"], filePath=WORK_DIR + "/quotes.csv")
print(trades.top(5).toDF())
# output
Time Symbol Trade_Volume Trade_Price
0 09:30:00.087488712 AAPL 370466 117.100
1 09:30:00.087681843 AAPL 370466 117.100
2 09:30:00.103645440 AAPL 100 117.100
3 09:30:00.213850801 AAPL 20 117.100
4 09:30:00.264854448 AAPL 17 117.095
print(quotes.where("second(Time)>=09:29:59").top(5).toDF())
# output
Time Symbol Bid_Price Bid_Size Offer_Price Offer_Size
0 09:29:59.300399073 AAPL 117.07 1 117.09 1
1 09:29:59.300954263 AAPL 117.07 1 117.09 1
2 09:29:59.301594217 AAPL 117.05 1 117.19 10
3 09:30:00.499924044 AAPL 117.09 46 117.10 3
4 09:30:00.500005573 AAPL 116.86 53 117.37 64
print(trades.merge_asof(quotes,on=["Symbol","Time"]).select(["Symbol","Time","Trade_Volume","Trade_Price","Bid_Price", "Bid_Size","Offer_Price", "Offer_Size"]).top(5).toDF())
# output
Symbol Time Trade_Volume Trade_Price Bid_Price Bid_Size \
0 AAPL 09:30:00.087488712 370466 117.100 117.05 1
1 AAPL 09:30:00.087681843 370466 117.100 117.05 1
2 AAPL 09:30:00.103645440 100 117.100 117.05 1
3 AAPL 09:30:00.213850801 20 117.100 117.05 1
4 AAPL 09:30:00.264854448 17 117.095 117.05 1
Offer_Price Offer_Size
0 117.19 10
1 117.19 10
2 117.19 10
3 117.19 10
4 117.19 10
To calculate trading cost:
print(trades.merge_asof(quotes, on=["Symbol","Time"]).select("sum(Trade_Volume*abs(Trade_Price-(Bid_Price+Offer_Price)/2))/sum(Trade_Volume*Trade_Price)*10000 as cost").groupby("Symbol").toDF())
# output
Symbol cost
0 AAPL 0.899823
1 FB 2.722923
merge_window (window join) is a generalization of asof join. With a window defined by parameters "leftBound" (w1) and "rightBound" (w2), for each row in the left table with the value of the last joining column equal to t, find the rows in the right table with the value of the last joining column between (t+w1) and (t+w2) conditional on all other joining columns are matched, then apply "aggFunctions" to the selected rows in the right table.
The only difference between window join and prevailing window join is that if the right table doesn't contain a matching value for t+w1 (the left boundary of the window), prevailing window join will fill it with the last value before t+w1 (conditional on all other joining columns are matched), and apply "aggFunctions". To use prevailing window join, set prevailing=True.
print(trades.merge_window(quotes, -5000000000, 0, aggFunctions=["avg(Bid_Price)","avg(Offer_Price)"], on=["Symbol","Time"]).where("Time>=15:59:59").top(10).toDF())
# output
Time Symbol Trade_Volume Trade_Price avg_Bid_Price \
0 15:59:59.003095025 AAPL 250 117.620 117.603714
1 15:59:59.003748103 AAPL 100 117.620 117.603714
2 15:59:59.011092788 AAPL 95 117.620 117.603714
3 15:59:59.011336471 AAPL 200 117.620 117.603714
4 15:59:59.022841207 AAPL 144 117.610 117.603689
5 15:59:59.028169703 AAPL 130 117.615 117.603544
6 15:59:59.035357411 AAPL 1101 117.610 117.603544
7 15:59:59.035360176 AAPL 799 117.610 117.603544
8 15:59:59.035602676 AAPL 130 117.610 117.603544
9 15:59:59.036929307 AAPL 2201 117.610 117.603544
avg_Offer_Price
0 117.626816
1 117.626816
2 117.626816
3 117.626816
4 117.626803
5 117.626962
6 117.626962
7 117.626962
8 117.626962
9 117.626962
...
To calculate trading cost:
trades.merge_window(quotes,-1000000000, 0, aggFunctions="[wavg(Offer_Price, Offer_Size) as Offer_Price, wavg(Bid_Price, Bid_Size) as Bid_Price]", on=["Symbol","Time"], prevailing=True).select("sum(Trade_Volume*abs(Trade_Price-(Bid_Price+Offer_Price)/2))/sum(Trade_Volume*Trade_Price)*10000 as cost").groupby("Symbol").executeAs("tradingCost")
print(s.loadTable(tableName="tradingCost").toDF())
# output
Symbol cost
0 AAPL 0.953315
1 FB 1.077876
Function executeAs saves query result as a table on DolphinDB server.
trade = s.loadTable(dbPath=WORK_DIR+"/valuedb", tableName="trade")
trade.select(['date','bid','ask','prc','vol']).where('TICKER=`AMZN').where('bid!=NULL').where('ask!=NULL').where('vol>10000000').sort('vol desc').executeAs("AMZN")
To use the table "AMZN" on DolphinDB server:
t1=s.loadTable(tableName="AMZN")
Method ols conducts ordinary least squares regression. It returns a dictionary with regression results.
trade = s.loadTable(tableName="trade",dbPath=WORK_DIR + "/valuedb", memoryMode=True)
z=trade.select(['bid','ask','prc']).ols('PRC', ['BID', 'ASK'])
print(z["ANOVA"])
# output
Breakdown DF SS MS F Significance
0 Regression 2 2.689281e+08 1.344640e+08 1.214740e+10 0.0
1 Residual 13133 1.453740e+02 1.106937e-02 NaN NaN
2 Total 13135 2.689282e+08 NaN NaN NaN
print(z["RegressionStat"])
# output
item statistics
0 R2 0.999999
1 AdjustedR2 0.999999
2 StdError 0.105211
3 Observations 13136.000000
print(z["Coefficient"])
# output
factor beta stdError tstat pvalue
0 intercept 0.003710 0.001155 3.213150 0.001316
1 BID 0.605307 0.010517 57.552527 0.000000
2 ASK 0.394712 0.010515 37.537919 0.000000
print(z["Coefficient"].beta[1])
# output
0.6053065019659698
The following example conducts regression on a partitioned database. Note that the ratio operator between 2 integer columns in DolphinDB is "", which happens to be the escape character in Python, so we need to use "VOL\SHROUT" in function select.
result = s.loadTable(tableName="US",dbPath="dfs://US").select("select VOL\\SHROUT as turnover, abs(RET) as absRet, (ASK-BID)/(BID+ASK)*2 as spread, log(SHROUT*(BID+ASK)/2) as logMV").where("VOL>0").ols("turnover", ["absRet","logMV", "spread"], True)
print(result["ANOVA"])
# output
Breakdown DF SS MS F Significance
0 Regression 3 2.814908e+09 9.383025e+08 30884.26453 0.0
1 Residual 46701483 1.418849e+12 3.038125e+04 NaN NaN
2 Total 46701486 1.421674e+12 NaN NaN NaN
A session object has a method run that can execute any DolphinDB script. If the script returns an object in DolphinDB, method run converts the object to a corresponding object in Python.
# Load table
trade = s.loadTable(dbPath=WORK_DIR+"/valuedb", tableName="trade")
# query the table and return a pandas DataFrame
t = s.run("select bid, ask, prc from trade where bid!=NULL, ask!=NULL, vol>1000")
print(t)
In this section we give an example of a backtest on a stock momentum strategy. The momentum strategy is one of the best-known quantitative long short equity strategies. It has been studied in numerous academic and sell-side publications since Jegadeesh and Titman (1993). Investors in the momentum strategy believe among individual stocks, past winners will outperform past losers. The most commonly used momentum factor is stocks' past 12 months returns skipping the most recent month. In academic research, the momentum strategy is usually rebalanced once a month and the holding period is also one month. In this example, we rebalance 1/5 of our portfolio positions every day and hold the new tranche for 5 days. For simplicity, transaction costs are not considered.
Create server session
import dolphindb as ddb
s=ddb.session()
s.connect("localhost",8921, "admin", "123456")
Step 1: Load data, clean the data, and construct the momentum signal (past 12 months return skipping the most recent month) for each firm. Undefine the table "USstocks" to release the large amount of memory it occupies. Note that executeAs must be used to save the intermediate results on DolphinDB server. Dataset "US" contains US stock price data from 1990 to 2016.
US = s.loadTable(dbPath="dfs://US", tableName="US")
def loadPriceData(inData):
s.loadTable(inData).select("PERMNO, date, abs(PRC) as PRC, VOL, RET, SHROUT*abs(PRC) as MV").where("weekday(date) between 1:5, isValid(PRC), isValid(VOL)").sort(bys=["PERMNO","date"]).executeAs("USstocks")
s.loadTable("USstocks").select("PERMNO, date, PRC, VOL, RET, MV, cumprod(1+RET) as cumretIndex").contextby("PERMNO").executeAs("USstocks")
return s.loadTable("USstocks").select("PERMNO, date, PRC, VOL, RET, MV, move(cumretIndex,21)/move(cumretIndex,252)-1 as signal").contextby("PERMNO").executeAs("priceData")
priceData = loadPriceData(US.tableName())
# US.tableName() returns the name of the table on the DolphinDB server that corresponds to the table object "US" in Python.
Step 2: Generate the portfolios for the momentum strategy.
def genTradeTables(inData):
return s.loadTable(inData).select(["date", "PERMNO", "MV", "signal"]).where("PRC>5, MV>100000, VOL>0, isValid(signal)").sort(bys=["date"]).executeAs("tradables")
def formPortfolio(startDate, endDate, tradables, holdingDays, groups, WtScheme):
holdingDays = str(holdingDays)
groups=str(groups)
ports = tradables.select("date, PERMNO, MV, rank(signal,,"+groups+") as rank, count(PERMNO) as symCount, 0.0 as wt").where("date between "+startDate+":"+endDate).contextby("date").having("count(PERMNO)>=100").executeAs("ports")
if WtScheme == 1:
ports.where("rank=0").contextby("date").update(cols=["wt"], vals=["-1.0/count(PERMNO)/"+holdingDays]).execute()
ports.where("rank="+groups+"-1").contextby("date").update(cols=["wt"], vals=["1.0/count(PERMNO)/"+holdingDays]).execute()
elif WtScheme == 2:
ports.where("rank=0").contextby("date").update(cols=["wt"], vals=["-MV/sum(MV)/"+holdingDays]).execute()
ports.where("rank="+groups+"-1").contextby("date").update(cols=["wt"], vals=["MV/sum(MV)/"+holdingDays]).execute()
else:
raise Exception("Invalid WtScheme. valid values:1 or 2")
return ports.select("PERMNO, date as tranche, wt").where("wt!=0").sort(bys=["PERMNO","date"]).executeAs("ports")
tradables=genTradeTables(priceData.tableName())
startDate="1996.01.01"
endDate="2017.01.01"
holdingDays=5
groups=10
ports=formPortfolio(startDate=startDate,endDate=endDate,tradables=tradables,holdingDays=holdingDays,groups=groups,WtScheme=2)
dailyRtn=priceData.select("date, PERMNO, RET as dailyRet").where("date between "+startDate+":"+endDate).executeAs("dailyRtn")
Step 3: Calculate the profit/loss for each stock in the portfolio in each of the days in the holding period. Close the positions at the end of the holding period.
def calcStockPnL(ports, dailyRtn, holdingDays, endDate):
s.table(data={'age': list(range(1,holdingDays+1))}).executeAs("ages")
ports.select("tranche").sort("tranche").executeAs("dates")
s.run("dates = sort distinct dates.tranche")
s.run("dictDateIndex=dict(dates,1..dates.size())")
s.run("dictIndexDate=dict(1..dates.size(), dates)")
ports.merge_cross(s.table(data="ages")).select("dictIndexDate[dictDateIndex[tranche]+age] as date, PERMNO, tranche, age, take(0.0,age.size()) as ret, wt as expr, take(0.0,age.size()) as pnl").where("isValid(dictIndexDate[dictDateIndex[tranche]+age]), dictIndexDate[dictDateIndex[tranche]+age]<=min(lastDays[PERMNO], "+endDate+")").executeAs("pos")
t1= s.loadTable("pos")
t1.merge(dailyRtn, on=["date","PERMNO"], merge_for_update=True).update(["ret"],["dailyRet"]).execute()
t1.contextby(["PERMNO","tranche"]).update(["expr"], ["expr*cumprod(1+ret)"]).execute()
t1.update(["pnl"],["expr*ret/(1+ret)"]).execute()
return t1
lastDaysTable = priceData.select("max(date) as date").groupby("PERMNO").executeAs("lastDaysTable")
s.run("lastDays=dict(lastDaysTable.PERMNO,lastDaysTable.date)")
# undefine priceData to release memory
s.undef(priceData.tableName(), 'VAR')
stockPnL = calcStockPnL(ports=ports, dailyRtn=dailyRtn, holdingDays=holdingDays, endDate=endDate)
Step 4: Calculate portfolio profit/loss
portPnl = stockPnL.select("pnl").groupby("date").sum().sort(bys=["date"]).executeAs("portPnl")
print(portPnl.toDF())
The example below shows how to calculate factor No. 98 in "101 Formulaic Alphas" by Kakushadze (2015) with daily data of US stocks.
def alpha98(t):
t1 = s.table(data=t)
# add two calcualted columns through function update
t1.contextby(["date"]).update(cols=["rank_open","rank_adv15"], vals=["rank(open)","rank(adv15)"]).execute()
# add two more calculated columns
t1.contextby(["PERMNO"]).update(["decay7", "decay8"], ["mavg(mcorr(vwap, msum(adv5, 26), 5), 1..7)","mavg(mrank(9 - mimin(mcorr(rank_open, rank_adv15, 21), 9), true, 7), 1..8)"]).execute()
# return the final results with three columns: PERMNO, date, and A98
return t1.select("PERMNO, date, rank(decay7)-rank(decay8) as A98").contextby(["date"]).executeAs("alpha98")
US = s.loadTable(tableName="US", dbPath="dfs://US").select("PERMNO, date, PRC as vwap, PRC+rand(1.0, PRC.size()) as open, mavg(VOL, 5) as adv5, mavg(VOL,15) as adv15").where("2007.01.01<=date<=2016.12.31").contextby("PERMNO").executeAs("US")
result=alpha98(US.tableName()).where('date>2007.03.12').executeAs("result")
print(result.top(10).toDF())