DolphinDB Python API supports Python 3.6 - 3.8.
Please install DolphinDB Python API with the following command:
$ pip install dolphindb
This tutorial covers the following topics:
- 1. Execute DolphinDB script and call DolphinDB functions
- 2. Upload Python objects to DolphinDB server
- 3. Create DolphinDB databases and tables
- 4. Import data to DolphinDB Database
- 5. Load data from DolphinDB database
- 6. Append to DolphinDB tables
- 7. Database and Table Operations
- 8. SQL queries
- 9. More Examples
DolphinDB Python API in essence encapsulates a subset of DolphinDB's scripting language. It converts Python script to DolphinDB script to be executed on the DolphinDB server. The result can either be saved on the DolphinDB server or serialized to a Python client object.
The examples in this tutorial use a csv file: example.csv.
Python interacts with DolphinDB through a Session object. The most commonly used Session class methods are as follows:
| Method | Explanation |
|---|---|
| connect(host,port,[username,password]) | Connect a session to DolphinDB server |
| login(username,password,[enableEncryption]) | Log in to DolphinDB server |
| run(script) | Execute script on DolphinDB server |
| run(functionName,args) | Call functions on DolphinDB server |
| upload(DictionaryOfPythonObjects) | Upload Python objects to DolphinDB server |
| undef(objName,objType) | Undefine an object in DolphinDB to release memory |
| undefAll() | Undefine all objects in DolphinDB to release memory |
| close() | Close the session |
In the following script, we first create a session in Python, then connect the session to a DolphinDB server with the specified domain name/IP address and port number. Please note that we need to start a DolphinDB server before running the following Python script.
import dolphindb as ddb
s = ddb.session()
s.connect("localhost", 8848)
# output
True
Use the following script to connect to DolphinDB server with username and password:
s.connect("localhost", 8848, YOUR_USER_NAME, YOUR_PASS_WORD)or
s.connect("localhost", 8848)
s.login(YOUR_USER_NAME,YOUR_PASS_WORD)
If a session was initialized without username and password, we can use the method login to log in to DolphinDB server. The default username is 'admin' and the default password is '123456'. By default, the username and password are encrypted during connection.
Since server versions 1.10.17 and 1.20.6, we can add the parameter 'enableSSL' when creating a session. The default value is False.
Use the following script to enable SSL. Please also add the configuration parameter enableHTTPS=true at the server.
s=ddb.session(enableSSL=True)
Since server versions 1.10.17 and 1.20.6, we can add the parameter 'enableASYN' when creating a session. The default value is False.
Use the following script to enable asynchronous communication. With asynchronous communication, communication with the server can only use the session.run method and no values are returned. This mode is ideal for writing data asynchronously.
s=ddb.session(enableASYN=True)
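For instance, here is a minimal sketch of an asynchronous write. It assumes the shared in-memory table tglobal (as created in section 6 below) already exists on the server:
import dolphindb as ddb
s = ddb.session(enableASYN=True)
s.connect("localhost", 8848, "admin", "123456")
# the call returns immediately; no result is sent back in this mode
s.run("tableInsert(tglobal, 1, 2019.03.03, `AAPL, 302.5)")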
All DolphinDB scripts can be executed through the run(script) method. If the script returns an object in DolphinDB, it will be converted to an object in Python. If the script fails to run, a corresponding error message will be raised.
s = ddb.session()
s.connect("localhost", 8848)
a=s.run("`IBM`GOOG`YHOO")
repr(a)
# output
"array(['IBM', 'GOOG', 'YHOO'], dtype='<U4')"User-defined functions can be generated with the run method:
s.run("def getTypeStr(input){ \nreturn typestr(input)\n}")For multiple lines of script, we can wrap them inside triple quotes for clarity. For example:
script="""
def getTypeStr(input){
return typestr(input)
}
"""
s.run(script)
s.run("getTypeStr", 1);
# output
'LONG'
**Note:** The maximum length of the text in the run method is 65,535 bytes.
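If a script might exceed this limit, one workaround is to check its encoded size before calling run and send it in independent pieces. A rough sketch (run_script and MAX_RUN_BYTES are hypothetical names, and splitting on ";" is only safe when every piece is a complete, independent DolphinDB statement):
MAX_RUN_BYTES = 65535   # documented upper bound for the run method
def run_script(s, script):
    # send the script directly if it fits within the limit
    if len(script.encode('utf-8')) <= MAX_RUN_BYTES:
        return s.run(script)
    result = None
    for stmt in script.split(';'):
        if stmt.strip():
            result = s.run(stmt)   # execute each complete statement separately
    return result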
In addition to executing script, the run method can directly call DolphinDB built-in or user-defined functions on a remote DolphinDB server. For this usage, the first parameter of the run method is the function name and the subsequent parameters are the parameters of the function.
The following example shows a Python program calling DolphinDB built-in function add through method run. The add function has 2 parameters: x and y. Depending on whether the values of the parameters have been assigned at DolphinDB server, there are 3 ways to call the function:
- Both parameters have been assigned value at DolphinDB server
If both x and y have been assigned value at DolphinDB server in the Python program,
s.run("x = [1,3,5];y = [2,4,6]")then just use run(script):
a=s.run("add(x,y)")
repr(a)
# output
'array([3, 7, 11], dtype=int32)'
- Only one parameter has been assigned value at DolphinDB server
If only x has been assigned value at DolphinDB server in the Python program
s.run("x = [1,3,5]")and y is to be assigned value when calling add, we need to use Partial Application to fix parameter x to function add.
import numpy as np
y=np.array([1,2,3])
result=s.run("add{x,}", y)
repr(result)
# output
'array([2,5,8])'
result.dtype
# output
dtype('int64')
- Both parameters are to be assigned value
import numpy as np
x=np.array([1.5,2.5,7])
y=np.array([8.5,7.5,3])
result=s.run("add", x, y)
repr(result)
# output
'array([10., 10., 10.])'
result.dtype
# output
dtype('float64')
When calling DolphinDB's built-in functions through run, the parameters uploaded can be scalar, list, dict, NumPy objects, pandas DataFrame and Series, etc.
Note:
- NumPy arrays can only be 1D or 2D.
- If a pandas DataFrame or Series object has an index, the index will be lost after the object is uploaded to DolphinDB. To keep the index, use the pandas DataFrame function reset_index.
- If a parameter of a DolphinDB function is of temporal type, it should be converted to numpy.datetime64 type before uploading.
The following examples explain the use of various types of Python objects as parameters.
- list objects
Add 2 Python lists with DolphinDB function add:
s.run("add",[1,2,3,4],[1,2,1,1])
# output
array([2, 4, 4, 5])
- NumPy objects
  - np.int
import numpy as np
s.run("add{1,}",np.int(4))
# output
5
  - np.datetime64
np.datetime64 is converted into the corresponding DolphinDB temporal type:
| datetime64 | DolphinDB Type |
|---|---|
| '2019-01-01' | DATE |
| '2019-01' | MONTH |
| '2019-01-01T20:01:01' | DATETIME |
| '2019-01-01T20:01:01.122' | TIMESTAMP |
| '2019-01-01T20:01:01.122346100' | NANOTIMESTAMP |
import numpy as np
s.run("typestr",np.datetime64('2019-01-01'))
# output
'DATE'
s.run("typestr",np.datetime64('2019-01'))
# output
'MONTH'
s.run("typestr",np.datetime64('2019-01-01T20:01:01'))
# output
'DATETIME'
s.run("typestr",np.datetime64('2019-01-01T20:01:01.122'))
# output
'TIMESTAMP'
s.run("typestr",np.datetime64('2019-01-01T20:01:01.1223461'))
# output
'NANOTIMESTAMP'
As TIME, MINUTE, SECOND and NANOTIME types in DolphinDB don't have information about dates, datetime64 type cannot be converted into these types directly in Python API. To generate these data types in DolphinDB from Python, we can upload the datetime64 type to DolphinDB server and then get rid of the date information.
import numpy as np
ts = np.datetime64('2019-01-01T20:01:01.1223461')
s.upload({'ts':ts})
s.run('a=nanotime(ts)')
s.run('typestr(a)')
# output
'NANOTIME'
s.run('a')
# output
numpy.datetime64('1970-01-01T20:01:01.122346100')
Please note that in the last step of the example above, when the NANOTIME type in DolphinDB is downloaded to Python, Python automatically adds 1970-01-01 as the date part.
  - list of np.datetime64 objects
import numpy as np
a=[np.datetime64('2019-01-01T20:00:00.000000001'), np.datetime64('2019-01-01T20:00:00.000000001')]
s.run("add{1,}",a)
# output
array(['2019-01-01T20:00:00.000000002', '2019-01-01T20:00:00.000000002'], dtype='datetime64[ns]')
- pandas objects
If a pandas DataFrame or Series object has an index, the index will be lost after the object is uploaded to DolphinDB.
  - Series
import pandas as pd
import numpy as np
a = pd.Series([1,2,3,1,5],index=np.arange(1,6,1))
s.run("add{1,}",a)
# output
array([2, 3, 4, 2, 6])
  - DataFrame
import pandas as pd
import numpy as np
a = pd.DataFrame({'id': np.int32([1, 4, 3, 2, 3]),
                  'date': np.array(['2019-02-03','2019-02-04','2019-02-05','2019-02-06','2019-02-07'], dtype='datetime64[D]'),
                  'value': np.double([7.8, 4.6, 5.1, 9.6, 0.1])},
                 index=['one', 'two', 'three', 'four', 'five'])
s.upload({'a':a})
s.run("typestr",a)
# output
'IN-MEMORY TABLE'
s.run('a')
# output
   id       date  value
0   1 2019-02-03    7.8
1   4 2019-02-04    4.6
2   3 2019-02-05    5.1
3   2 2019-02-06    9.6
4   3 2019-02-07    0.1
The session method undef releases specified objects in a session; method undefAll releases all objects in a session. undef can be used on the following objects: "VAR" (variable), "SHARED" (shared variable) and "DEF" (function definition). The default type is "VAR". "SHARED" refers to variables shared across sessions, such as a shared stream table.
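For example, a minimal sketch (the variable name x is for illustration only):
s.run("x = 1..10")      # create a variable in the current session
s.undef("x", "VAR")     # release only x
s.undefAll()            # or release every object defined in the session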
Sometimes we would like to automatically release the variables created in a run statement after the execution is finished to reduce memory footprint. To do this, we can set the parameter clearMemory=True in Session or DBConnectionPool's run method. Please note that the default value of 'clearMemory' of Session's run method is False, whereas the default value of 'clearMemory' of DBConnectionPool's run method is True.
s = ddb.session()
s.connect("localhost", 8848, "admin", "123456")
s.run("t = 1", clearMemory = True)
s.run("t") As the variable t is released after the execution of s.run("t = 1", clearMemory = True), the last statement will throw an exception:
<Exception> in run: Syntax Error: [line #1] Cannot recognize the token t
To use a Python object repeatedly in DolphinDB, we can upload the Python object to the DolphinDB server and specify the variable name in DolphinDB.
If a Python object is used only once at DolphinDB server, it is recommended to include it as a parameter in a function call instead of uploading it. Please refer to section 2.3 for details.
The Python API provides method upload to upload Python objects to the DolphinDB server. The input of the method upload is a Python dictionary object. The key of the dictionary is the variable name in DolphinDB and the value is a Python object, which can be Numbers, Strings, Lists, DataFrame, etc.
- Upload Python list
a = [1,2,3.0]
s.upload({'a':a})
a_new = s.run("a")
print(a_new)
# output
[1. 2. 3.]
a_type = s.run("typestr(a)")
print(a_type)
# output
ANY VECTOR
Please note that a Python list with multiple data types such as a=[1,2,3.0] will be recognized as an ANY vector after being uploaded to DolphinDB. For such cases, it is recommended to use np.array instead of list. With np.array, we can specify a single data type through a=np.array([1,2,3.0],dtype=np.double) so that after uploading, "a" is a vector of DOUBLE type.
- Upload NumPy array
import numpy as np
arr = np.array([1,2,3.0],dtype=np.double)
s.upload({'arr':arr})
arr_new = s.run("arr")
print(arr_new)
# output
[1. 2. 3.]
arr_type = s.run("typestr(arr)")
print(arr_type)
# output
FAST DOUBLE VECTOR
- Upload pandas DataFrame
import pandas as pd
import numpy as np
df = pd.DataFrame({'id': np.int32([1, 2, 3, 6, 8]), 'x': np.int32([5, 4, 3, 2, 1])})
s.upload({'t1': df})
print(s.run("t1.x.avg()"))
# output
3.0
In Python, we can use the method table to create a DolphinDB table object and upload it to the server. The input of the method table can be a dictionary, DataFrame or table name in DolphinDB.
- Upload dict
The script below defines a function createDemoDict() to create a dictionary.
import numpy as np
def createDemoDict():
return {'id': [1, 2, 2, 3],
'date': np.array(['2021.05.06', '2021.05.07', '2021.05.06', '2021.05.07'], dtype='datetime64[D]'),
'ticker': ['AAPL', 'AAPL', 'AMZN', 'AMZN'],
'price': [129.74, 130.21, 3306.37, 3291.61]}
Upload the dictionary to the DolphinDB server with the method table, and name the table as "testDict", then we can read the table with the method loadTable provided by the API.
import numpy as np
# save the table to DolphinDB server as table "testDict"
dt = s.table(data=createDemoDict(), tableAliasName="testDict")
# load table "testDict" on DolphinDB server
print(s.loadTable("testDict").toDF())
# output
date ticker price
0 2021-05-06 AAPL 129.74
1 2021-05-07 AAPL 130.21
2 2021-05-06 AMZN 3306.37
3 2021-05-07 AMZN 3291.61
- Upload pandas DataFrame
The script below defines function createDemoDataFrame() to create a pandas DataFrame.
import pandas as pd
import numpy as np
def createDemoDataFrame():
data = {'cid': np.array([1, 2, 3], dtype=np.int32),
'cbool': np.array([True, False, np.nan], dtype=np.bool),
'cchar': np.array([1, 2, 3], dtype=np.int8),
'cshort': np.array([1, 2, 3], dtype=np.int16),
'cint': np.array([1, 2, 3], dtype=np.int32),
'clong': np.array([0, 1, 2], dtype=np.int64),
'cdate': np.array(['2019-02-04', '2019-02-05', ''], dtype='datetime64[D]'),
'cmonth': np.array(['2019-01', '2019-02', ''], dtype='datetime64[M]'),
'ctime': np.array(['2019-01-01 15:00:00.706', '2019-01-01 15:30:00.706', ''], dtype='datetime64[ms]'),
'cminute': np.array(['2019-01-01 15:25', '2019-01-01 15:30', ''], dtype='datetime64[m]'),
'csecond': np.array(['2019-01-01 15:00:30', '2019-01-01 15:30:33', ''], dtype='datetime64[s]'),
'cdatetime': np.array(['2019-01-01 15:00:30', '2019-01-02 15:30:33', ''], dtype='datetime64[s]'),
'ctimestamp': np.array(['2019-01-01 15:00:00.706', '2019-01-01 15:30:00.706', ''], dtype='datetime64[ms]'),
'cnanotime': np.array(['2019-01-01 15:00:00.80706', '2019-01-01 15:30:00.80706', ''], dtype='datetime64[ns]'),
'cnanotimestamp': np.array(['2019-01-01 15:00:00.80706', '2019-01-01 15:30:00.80706', ''], dtype='datetime64[ns]'),
'cfloat': np.array([2.1, 2.658956, np.NaN], dtype=np.float32),
'cdouble': np.array([0., 47.456213, np.NaN], dtype=np.float64),
'csymbol': np.array(['A', 'B', '']),
'cstring': np.array(['abc', 'def', ''])}
return pd.DataFrame(data)
Upload the DataFrame to DolphinDB server with method table, name it as "testDataFrame", then we can read the table with method loadTable provided by the API.
import pandas as pd
# save the table to DolphinDB server as table "testDataFrame"
dt = s.table(data=createDemoDataFrame(), tableAliasName="testDataFrame")
# load table "testDataFrame" on DolphinDB server
print(s.loadTable("testDataFrame").toDF())
# output
cid cbool cchar cshort cint ... cnanotimestamp cfloat cdouble csymbol cstring
0 1 True 1 1 1 ... 2019-01-01 15:00:00.807060 2.100000 0.000000 A abc
1 2 False 2 2 2 ... 2019-01-01 15:30:00.807060 2.658956 47.456213 B def
2 3 True 3 3 3 ... NaT NaN NaN
Functions table and loadTable return a local Python object. In the following example, table t1 at DolphinDB server corresponds to a local Python object t0:
t0=s.table(data=createDemoDict(), tableAliasName="t1")
Use one of the following 3 ways to release the variable t1 at DolphinDB server:
- undef
s.undef("t1", "VAR")
- assign NULL value to the variable at DolphinDB server
s.run("t1=NULL")
- assign None to the local Python variable
t0=None
After a variable is uploaded to DolphinDB server from Python with session.table function, the system creates a reference to the DolphinDB table for the Python variable. If the reference no longer exists, the DolphinDB table is automatically released.
The following script uploads a table to DolphinDB server and then downloads data with toDF().
t1=s.table(data=createDemoDict(), tableAliasName="t1")
print(t1.toDF())
#output
date ticker price
0 2021-05-06 AAPL 129.74
1 2021-05-07 AAPL 130.21
2 2021-05-06 AMZN 3306.37
3 2021-05-07 AMZN 3291.61
In the same spirit, when loading a DFS table into memory with Python API, there is a correspondence between the local Python variable and the DolphinDB in-memory table.
Execute the following DolphinDB script:
db = database("dfs://testdb",RANGE, [1, 5 ,11])
t1=table(1..10 as id, 1..10 as v)
db.createPartitionedTable(t1,`t1,`id).append!(t1)
Then execute the following Python script:
pt1=s.loadTable(tableName='t1',dbPath="dfs://testdb")
The script above creates a DFS table on DolphinDB server, then loads its metadata into memory with function loadTable and assigns it to the local Python object pt1. Please note t1 is the DFS table name instead of the DolphinDB table name corresponding to the local Python variable pt1. The corresponding DolphinDB table name can be obtained with pt1.tableName().
print(pt1.tableName())
'TMP_TBL_4c5647af'
If a Python variable is used only once at DolphinDB server, it is recommended to include it as a parameter in a function call instead of uploading it. A function call does not cache data. After the function call is executed, all variables are released. Moreover, a function call is faster to execute as the network transmission only occurs once.
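A small sketch contrasting the two approaches (using the built-in aggregate function sum; the names are illustrative):
import numpy as np
arr = np.array([1, 2, 3])
# pass the object as a function-call parameter: nothing is cached on
# the server and only one network round trip is needed
s.run("sum", arr)
# upload first: creates a named server variable that persists until it
# is released, which is only worthwhile if arr is reused
s.upload({'arr': arr})
s.run("sum(arr)")
s.undef("arr", "VAR")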
DolphinDB databases and tables can be created either with DolphinDB Python API methods or with the run method.
import numpy as np
import pandas as pd
import dolphindb.settings as keys
Each date is a partition:
dbPath="dfs://db_value_date"
if s.existsDatabase(dbPath):
s.dropDatabase(dbPath)
dates=np.array(pd.date_range(start='20120101', end='20120110'), dtype="datetime64[D]")
db = s.database(dbName='mydb', partitionType=keys.VALUE, partitions=dates,dbPath=dbPath)
df = pd.DataFrame({'datetime':np.array(['2012-01-01T00:00:00', '2012-01-02T00:00:00'], dtype='datetime64'), 'sym':['AA', 'BB'], 'val':[1,2]})
t = s.table(data=df)
db.createPartitionedTable(table=t, tableName='pt', partitionColumns='datetime').append(t)
re=s.loadTable(tableName='pt', dbPath=dbPath).toDF()
Each month is a partition:
dbPath="dfs://db_value_month"
if s.existsDatabase(dbPath):
s.dropDatabase(dbPath)
months=np.array(pd.date_range(start='2012-01', end='2012-10', freq="M"), dtype="datetime64[M]")
db = s.database(dbName='mydb', partitionType=keys.VALUE, partitions=months,dbPath=dbPath)
df = pd.DataFrame({'date': np.array(['2012-01-01', '2012-02-01', '2012-05-01', '2012-06-01'], dtype="datetime64"), 'val':[1,2,3,4]})
t = s.table(data=df)
db.createPartitionedTable(table=t, tableName='pt', partitionColumns='date').append(t)
re=s.loadTable(tableName='pt', dbPath=dbPath).toDF()
Partitions are based on id ranges:
dbPath="dfs://db_range_int"
if s.existsDatabase(dbPath):
s.dropDatabase(dbPath)
db = s.database(dbName='mydb', partitionType=keys.RANGE, partitions=[1, 11, 21], dbPath=dbPath)
df = pd.DataFrame({'id': np.arange(1, 21), 'val': np.repeat(1, 20)})
t = s.table(data=df, tableAliasName='t')
db.createPartitionedTable(table=t, tableName='pt', partitionColumns='id').append(t)
re = s.loadTable(tableName='pt', dbPath=dbPath).toDF()
Partitions are based on lists of stock tickers:
dbPath="dfs://db_list_sym"
if s.existsDatabase(dbPath):
s.dropDatabase(dbPath)
db = s.database(dbName='mydb', partitionType=keys.LIST, partitions=[['IBM', 'ORCL', 'MSFT'], ['GOOG', 'FB']],dbPath=dbPath)
df = pd.DataFrame({'sym':['IBM', 'ORCL', 'MSFT', 'GOOG', 'FB'], 'val':[1,2,3,4,5]})
t = s.table(data=df)
db.createPartitionedTable(table=t, tableName='pt', partitionColumns='sym').append(t)
re = s.loadTable(tableName='pt', dbPath=dbPath).toDF()
Partitions are based on hash values of id:
dbPath="dfs://db_hash_int"
if s.existsDatabase(dbPath):
s.dropDatabase(dbPath)
db = s.database(dbName='mydb', partitionType=keys.HASH, partitions=[keys.DT_INT, 2], dbPath=dbPath)
df = pd.DataFrame({'id':[1,2,3,4,5], 'val':[10, 20, 30, 40, 50]})
t = s.table(data=df)
pt = db.createPartitionedTable(table=t, tableName='pt', partitionColumns='id')
pt.append(t)
re = s.loadTable(tableName='pt', dbPath=dbPath).toDF()
The first level of partitions uses a VALUE domain and the second level of partitions uses a RANGE domain.
Please note that when creating a DFS database with COMPO domain, the parameter 'dbPath' for each partition level must be either an empty string or unspecified.
db1 = s.database('db1', partitionType=keys.VALUE,partitions=np.array(["2012-01-01", "2012-01-06"], dtype="datetime64[D]"), dbPath='')
db2 = s.database('db2', partitionType=keys.RANGE,partitions=[1, 6, 11], dbPath='')
dbPath="dfs://db_compo_test"
if s.existsDatabase(dbPath):
s.dropDatabase(dbPath)
db = s.database(dbName='mydb', partitionType=keys.COMPO, partitions=[db1, db2], dbPath=dbPath)
df = pd.DataFrame({'date':np.array(['2012-01-01', '2012-01-01', '2012-01-06', '2012-01-06'], dtype='datetime64'), 'val': [1, 6, 1, 6]})
t = s.table(data=df)
db.createPartitionedTable(table=t, tableName='pt', partitionColumns=['date', 'val']).append(t)
re = s.loadTable(tableName='pt', dbPath=dbPath).toDF()
We can also create databases and tables by running DolphinDB script on the server with the run method:
dbPath="dfs://valuedb"
dstr = """
dbPath="dfs://valuedb"
if (existsDatabase(dbPath)){
dropDatabase(dbPath)
}
mydb=database(dbPath, VALUE, ['AMZN','NFLX', 'NVDA'])
t=table(take(['AMZN','NFLX', 'NVDA'], 10) as sym, 1..10 as id)
mydb.createPartitionedTable(t,`pt,`sym).append!(t)
"""
t1=s.run(dstr)
t1=s.loadTable(tableName="pt",dbPath=dbPath)
t1.toDF()
# output
sym id
0 AMZN 1
1 AMZN 4
2 AMZN 7
3 AMZN 10
4 NFLX 2
5 NFLX 5
6 NFLX 8
7 NVDA 3
8 NVDA 6
9 NVDA 9
There are 2 types of DolphinDB databases: in-memory database and DFS (Distributed File System) database.
The examples below use a csv file data_example.csv. Please download it and save it under the directory as specified in "WORK_DIR" in the example below.
To import text files into DolphinDB as an in-memory table, use session method loadText. It returns a DolphinDB table object in Python that corresponds to an in-memory table on the DolphinDB server. This DolphinDB table object in Python has a method toDF to convert it to a pandas DataFrame.
Please note that to use method loadText to load a text file as an in-memory table, table size must be smaller than available memory.
WORK_DIR = "C:/DolphinDB/Data"
# return a DolphinDB table object in Python
trade=s.loadText(WORK_DIR+"/example.csv")
# convert the imported DolphinDB table object into a pandas DataFrame
df = trade.toDF()
print(df)
# output
TICKER date VOL PRC BID ASK
0 AMZN 1997.05.16 6029815 23.50000 23.50000 23.6250
1 AMZN 1997.05.17 1232226 20.75000 20.50000 21.0000
2 AMZN 1997.05.20 512070 20.50000 20.50000 20.6250
3 AMZN 1997.05.21 456357 19.62500 19.62500 19.7500
4 AMZN 1997.05.22 1577414 17.12500 17.12500 17.2500
5 AMZN 1997.05.23 983855 16.75000 16.62500 16.7500
...
13134 NFLX 2016.12.29 3444729 125.33000 125.31000 125.3300
13135 NFLX 2016.12.30 4455012 123.80000 123.80000 123.8300
The default delimiter for function loadText is comma ",". We can also use other delimiters. For example, to import a tab-delimited text file:
t1=s.loadText(WORK_DIR+"/t1.tsv", '\t')
To load data files that are larger than available memory into DolphinDB, we can load data into a DFS database.
The examples below use the database "valuedb". The following script deletes the database if it already exists.
if s.existsDatabase("dfs://valuedb"):
s.dropDatabase("dfs://valuedb")Now create a value-based DFS database "valuedb" with a session method database. We use a VALUE partition with stock ticker as the partitioning column. The parameter "partitions" indicates the partitioning scheme.
import dolphindb.settings as keys
s.database(dbName='mydb', partitionType=keys.VALUE, partitions=['AMZN','NFLX', 'NVDA'], dbPath='dfs://valuedb')
# this is equivalent to: s.run("db=database('dfs://valuedb', VALUE, ['AMZN','NFLX', 'NVDA'])")
In addition to VALUE partitions, DolphinDB also supports RANGE, LIST, COMPO, and HASH partitions.
Once a DFS database has been created, the partition domain cannot be changed. The partitioning scheme generally cannot be revised, but we can use functions addValuePartitions and addRangePartitions to add partitions for DFS databases with VALUE and RANGE partitions (or VALUE and RANGE partitions in a COMPO domain), respectively.
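For example, a sketch of adding a VALUE partition with the server-side function addValuePartitions through the run method (the new ticker "TSLA" is a hypothetical partition value):
s.run('''
db = database("dfs://valuedb")
addValuePartitions(db, ["TSLA"])
''')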
After a DFS database is created successfully, we can import text files to a partitioned table in the DFS database with function loadTextEx. If the partitioned table does not exist, loadTextEx creates it and appends the imported data to it. Otherwise, the function appends the imported data to the partitioned table.
The parameters of function loadTextEx:
- "dbPath" is the database path
- "tableName" is the partitioned table name
- "partitionColumns" is the partitioning columns
- "remoteFilePath" is the absolute path of the text file on the DolphinDB server.
- "delimiter" is the delimiter of the text file (comma by default).
In the following example, function loadTextEx creates a partitioned table "trade" on the DolphinDB server and then appends the data from "data_example.csv" to the table.
import dolphindb.settings as keys
if s.existsDatabase("dfs://valuedb"):
s.dropDatabase("dfs://valuedb")
s.database(dbName='mydb', partitionType=keys.VALUE, partitions=["AMZN","NFLX", "NVDA"], dbPath="dfs://valuedb")
trade = s.loadTextEx(dbPath="mydb", tableName='trade',partitionColumns=["TICKER"], remoteFilePath=WORK_DIR + "/data_example.csv")
print(trade.toDF())
# output
TICKER date VOL PRC BID ASK
0 AMZN 1997-05-15 6029815 23.500 23.500 23.625
1 AMZN 1997-05-16 1232226 20.750 20.500 21.000
2 AMZN 1997-05-19 512070 20.500 20.500 20.625
3 AMZN 1997-05-20 456357 19.625 19.625 19.750
4 AMZN 1997-05-21 1577414 17.125 17.125 17.250
... ... ... ... ... ... ...
13131 NVDA 2016-12-23 16193331 109.780 109.770 109.790
13132 NVDA 2016-12-27 29857132 117.320 117.310 117.320
13133 NVDA 2016-12-28 57384116 109.250 109.250 109.290
13134 NVDA 2016-12-29 54384676 111.430 111.260 111.420
13135 NVDA 2016-12-30 30323259 106.740 106.730 106.750
[13136 rows x 6 columns]
# the number of rows of the table
print(trade.rows)
# output
13136
# the number of columns of the table
print(trade.cols)
# output
6
print(trade.schema)
# output
name typeString typeInt
0 TICKER SYMBOL 17
1 date DATE 6
2 VOL INT 4
3 PRC DOUBLE 16
4 BID DOUBLE 16
5 ASK DOUBLE 16
We can also obtain a handle to this partitioned table with the session method table:
trade = s.table(dbPath="dfs://valuedb", data="trade")
Operations on an in-memory partitioned table are faster than those on a nonpartitioned in-memory table as the former utilizes parallel computing.
We can use function loadTextEx to create an in-memory partitioned database with an empty string for the parameter "dbPath".
import dolphindb.settings as keys
s.database(dbName='mydb', partitionType=keys.VALUE, partitions=["AMZN","NFLX","NVDA"], dbPath="")
trade=s.loadTextEx(dbPath="mydb", partitionColumns=["TICKER"], tableName='trade', remoteFilePath=WORK_DIR + "/data_example.csv")
trade.toDF()
Function ploadText loads a text file in parallel to generate an in-memory partitioned table. It runs much faster than loadText.
trade=s.ploadText(WORK_DIR+"/data_example.csv")
print(trade.rows)
# output
13136
DFS tables in DolphinDB support concurrent writes.
Note that DolphinDB does not allow multiple writers to write to the same partition at the same time. Therefore, when multiple threads are writing to the same database concurrently, we need to make sure each of them writes to a different partition. Python API provides a convenient way for it.
With DolphinDB server version 1.30 or above, we can write to DFS tables with the PartitionedTableAppender object in Python API. The user needs to first specify a connection pool. The system then obtains the partition information of the DFS table with the server's schema function, classifies the data to be written by the specified partitioning column, and assigns each partition to a connection in the pool for concurrent writing. A partition can only be written to by one connection at a time.
PartitionedTableAppender(dbPath, tableName, partitionColName, dbConnectionPool)
- dbPath: DFS database path
- tableName: name of a DFS table
- partitionColName: partitioning column name
- dbConnectionPool: connection pool
The following script creates database dfs://Rangedb and partitioned table pt, then creates a connection pool for PartitionedTableAppender, and uses the append method to write data to pt concurrently.
import pandas as pd
import dolphindb as ddb
import numpy as np
s = ddb.session()
s.connect("localhost", 8848, "admin", "123456")
script='''
dbPath = "dfs://Rangedb"
if(existsDatabase(dbPath))
dropDatabase(dbPath)
t = table(100:100,`id`val1`val2,[INT,DOUBLE,SYMBOL])
db=database(dbPath,RANGE, 1 100 200 300)
pt = db.createPartitionedTable(t, `pt, `id)
'''
s.run(script)
s.close()
pool = ddb.DBConnectionPool("localhost", 8848, 20, "admin", "123456")
appender = ddb.PartitionedTableAppender("dfs://Rangedb","pt", "id", pool)
v = []
for i in range(0,10000000):
v.append("a"+str(i%100))
data = pd.DataFrame({"id":np.random.randint(1,300,10000000),"val1":np.random.rand(10000000),"val2":v})
re = appender.append(data)
print(re)
Use function loadTable to load a table from a DolphinDB database. Parameter "tableName" indicates the partitioned table name; "dbPath" is the database location.
trade = s.loadTable(tableName="trade",dbPath="dfs://valuedb")
print(trade.schema)
#output
name typeString typeInt comment
0 TICKER SYMBOL 17
1 date DATE 6
2 VOL INT 4
3 PRC DOUBLE 16
4 BID DOUBLE 16
5 ASK DOUBLE 16
print(trade.toDF())
# output
TICKER date VOL PRC BID ASK
0 AMZN 1997-05-15 6029815 23.500 23.500 23.625
1 AMZN 1997-05-16 1232226 20.750 20.500 21.000
2 AMZN 1997-05-19 512070 20.500 20.500 20.625
3 AMZN 1997-05-20 456357 19.625 19.625 19.750
4 AMZN 1997-05-21 1577414 17.125 17.125 17.250
... ... ... ... ... ... ...
13131 NVDA 2016-12-23 16193331 109.780 109.770 109.790
13132 NVDA 2016-12-27 29857132 117.320 117.310 117.320
13133 NVDA 2016-12-28 57384116 109.250 109.250 109.290
13134 NVDA 2016-12-29 54384676 111.430 111.260 111.420
13135 NVDA 2016-12-30 30323259 106.740 106.730 106.750
Method loadTableBySQL imports only the rows of a DFS table that satisfy the filtering conditions in a SQL query as an in-memory partitioned table.
import os
import dolphindb.settings as keys
if s.existsDatabase("dfs://valuedb" or os.path.exists("dfs://valuedb")):
s.dropDatabase("dfs://valuedb")
s.database(dbName='mydb', partitionType=keys.VALUE, partitions=["AMZN","NFLX", "NVDA"], dbPath="dfs://valuedb")
t = s.loadTextEx(dbPath="mydb", tableName='trade',partitionColumns=["TICKER"], remoteFilePath=WORK_DIR + "/data_example.csv")
trade = s.loadTableBySQL(tableName="trade", dbPath="dfs://valuedb", sql="select * from trade where date>2010.01.01")
print(trade.rows)
# output
5286
For tables with large amounts of data, Python API provides a way to load them in blocks (for DolphinDB 1.20.5 or above, and DolphinDB Python API 1.30.0.6 or above).
Execute the following script in Python to create a large table:
s = ddb.session()
s.connect("localhost", 8848, "admin", "123456")
script='''
rows=100000;
testblock=table(take(1,rows) as id,take(`A,rows) as symbol,take(2020.08.01..2020.10.01,rows) as date, rand(50,rows) as size,rand(50.5,rows) as price);
'''
s.run(script)
Use the parameter 'fetchSize' of the run method to specify the size of a block. A BlockReader object is returned. We can use the read method of the BlockReader object to read data in blocks. Please note that the value of 'fetchSize' cannot be smaller than 8192.
script1='''
select * from testblock
'''
block= s.run(script1, fetchSize = 8192)
total = 0
while block.hasNext():
tem = block.read()
total+=len(tem)
print("total=", total)DolphinDB Python API saves data downloaded from DolphinDB server as native Python objects.
| DolphinDB | Python | DolphinDB data | Python data |
|---|---|---|---|
| scalar | Numbers, Strings, NumPy.datetime64 | see section 6.3.2 | see section 6.3.2 |
| vector | NumPy.array | 1..3 | [1 2 3] |
| pair | Lists | 1:5 | [1, 5] |
| matrix | Lists | 1..6$2:3 | [array([[1, 3, 5],[2, 4, 6]], dtype=int32), None, None] |
| set | Sets | set(3 5 4 6) | {3, 4, 5, 6} |
| dictionary | Dictionaries | dict(['IBM','MS','ORCL'], 170.5 56.2 49.5) | {'MS': 56.2, 'IBM': 170.5, 'ORCL': 49.5} |
| table | pandas.DataFrame | see section 6.1 | see section 6.1 |
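The conversions in the table above can be reproduced directly with the run method. A short sketch (outputs shown as comments):
s.run("1..3")                  # array([1, 2, 3], dtype=int32)
s.run("1:5")                   # [1, 5]
s.run("set(3 5 4 6)")          # {3, 4, 5, 6}
s.run("dict(['IBM','MS','ORCL'], 170.5 56.2 49.5)")
# {'MS': 56.2, 'IBM': 170.5, 'ORCL': 49.5}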
The table below explains data type conversion when data is downloaded from DolphinDB database and converted into a Python DataFrame with function toDF().
- DolphinDB CHAR types are converted into Python int64 type. Use Python function chr to convert CHAR type into a character.
- As all temporal types in Python pandas are datetime64, all DolphinDB temporal types are converted into datetime64 type. MONTH type such as 2012.06M is converted into 2012-06-01 (the first day of the month).
- TIME, MINUTE, SECOND and NANOTIME types do not include information about date. 1970-01-01 is automatically added during conversion. For example, 13:30m is converted into 1970-01-01 13:30:00.
| DolphinDB type | Python type | DolphinDB data | Python data |
|---|---|---|---|
| BOOL | bool | [true,00b] | [True, nan] |
| CHAR | int64 | [12c,00c] | [12, nan] |
| SHORT | int64 | [12,00h] | [12, nan] |
| INT | int64 | [12,00i] | [12, nan] |
| LONG | int64 | [12l,00l] | [12, nan] |
| DOUBLE | float64 | [3.5,00F] | [3.5,nan] |
| FLOAT | float64 | [3.5,00f] | [3.5, nan] |
| SYMBOL | object | symbol(["AAPL",NULL]) | ["AAPL",""] |
| STRING | object | ["AAPL",string()] | ["AAPL", ""] |
| DATE | datetime64 | [2012.6.12,date()] | [2012-06-12, NaT] |
| MONTH | datetime64 | [2012.06M, month()] | [2012-06-01, NaT] |
| TIME | datetime64 | [13:10:10.008,time()] | [1970-01-01 13:10:10.008, NaT] |
| MINUTE | datetime64 | [13:30,minute()] | [1970-01-01 13:30:00, NaT] |
| SECOND | datetime64 | [13:30:10,second()] | [1970-01-01 13:30:10, NaT] |
| DATETIME | datetime64 | [2012.06.13 13:30:10,datetime()] | [2012-06-13 13:30:10,NaT] |
| TIMESTAMP | datetime64 | [2012.06.13 13:30:10.008,timestamp()] | [2012-06-13 13:30:10.008,NaT] |
| NANOTIME | datetime64 | [13:30:10.008007006, nanotime()] | [1970-01-01 13:30:10.008007006,NaT] |
| NANOTIMESTAMP | datetime64 | [2012.06.13 13:30:10.008007006,nanotimestamp()] | [2012-06-13 13:30:10.008007006,NaT] |
| UUID | object | 5d212a78-cc48-e3b1-4235-b4d91473ee87 | "5d212a78-cc48-e3b1-4235-b4d91473ee87" |
| IPADDR | object | 192.168.1.13 | "192.168.1.13" |
| INT128 | object | e1671797c52e15f763380b45e841ec32 | "e1671797c52e15f763380b45e841ec32" |
When data is downloaded from DolphinDB database and converted into a Python DataFrame with function toDF(), NULLs of logical, temporal and numeric types are converted into NaN or NaT; NULLs of string types are converted into empty string.
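A short sketch of these NULL conversions, following the rows of the table above (outputs shown as comments):
s.run("[3.5, 00F]")            # array([3.5, nan]) - numeric NULL becomes NaN
s.run("[2012.06.12, date()]")  # temporal NULL becomes NaT
s.run('["AAPL", string()]')    # ['AAPL', ''] - string NULL becomes an empty string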
This section introduces how to use Python API to upload data and append it to DolphinDB tables.
- Use function tableInsert to append data or a table
- Use insert into statement to append data
Execute the following script in Python to generate an empty in-memory table to be used in the examples later:
import dolphindb as ddb
s = ddb.session()
s.connect("localhost", 8848, "admin", "123456")
script = """t = table(1000:0,`id`date`ticker`price, [INT,DATE,SYMBOL,DOUBLE])
share t as tglobal"""
s.run(script)
ids = [1,2,3]
dates = np.array(['2019-03-03','2019-03-04','2019-03-05' ], dtype="datetime64[D]")
tickers=['AAPL','GOOG','AAPL']
prices = [302.5, 295.6, 297.5]
args = [ids, dates, tickers, prices]
s.run("tableInsert{tglobal}", args)
#output
3
s.run("tglobal")
#output
id date ticker price
0 1 2019-03-03 AAPL 302.5
1 2 2019-03-04 GOOG 295.6
2 3 2019-03-05 AAPL 297.5
- If there is no temporal type column in the table
We can upload a DataFrame to the server and append it to an in-memory table with partial application.
script = """t = table(1000:0,`id`ticker`price, [INT,SYMBOL,DOUBLE])
share t as tglobal"""
s.run(script)
tb=pd.DataFrame({'id': [1, 2, 2, 3],
'ticker': ['AAPL', 'AMZN', 'AMZN', 'A'],
'price': [22, 3.5, 21, 26]})
s.run("tableInsert{tglobal}",tb)
#output
4
s.run("tglobal")
#output
id ticker price
0 1 AAPL 22.0
1 2 AMZN 3.5
2 2 AMZN 21.0
3 3 A 26.0
- If there is a temporal type column in the table
As the only temporal data type in Python pandas is datetime64, all temporal columns of a DataFrame are converted into nanotimestamp type after being uploaded to DolphinDB. Therefore, if the DataFrame contains a temporal column, we need to convert its data type before appending the DataFrame to an in-memory table. In the following example, we convert the nanotimestamp type into date type.
script = """t = table(1000:0,`id`date`ticker`price, [INT,DATE,SYMBOL,DOUBLE])
share t as tglobal"""
s.run(script)
import pandas as pd
tb=pd.DataFrame(createDemoDict())
s.upload({'tb':tb})
s.run("tableInsert(tglobal,(select id, date(date) as date, ticker, price from tb))")
s.run("tglobal")
#output
id date ticker price
0 1 2019-02-04 AAPL 22.0
1 2 2019-02-05 AMZN 3.5
2 2 2019-02-09 AMZN 21.0
3 3 2019-02-13 A 26.0
To insert a single row of data:
import numpy as np
script = "insert into tglobal values(%s, date(%s), %s, %s)" % (1, np.datetime64("2019-01-01").astype(np.int64), '`AAPL', 5.6)
s.run(script)
As introduced in section 6.1.2, we also need to convert the temporal column data type.
To insert multiple rows of data:
import numpy as np
import pandas as pd
rowNum = 5
ids = np.arange(1, rowNum+1, 1, dtype=np.int32)
dates = np.array(pd.date_range('4/1/2019', periods=rowNum), dtype='datetime64[D]')
tickers = np.repeat("AA", rowNum)
prices = np.arange(1, 0.6*(rowNum+1), 0.6, dtype=np.float64)
s.upload({'ids':ids, "dates":dates, "tickers":tickers, "prices":prices})
script = "insert into tglobal values(ids,dates,tickers,prices);"
s.run(script)
As the only temporal data type in Python pandas is datetime64, all temporal columns of a DataFrame are converted into nanotimestamp type after being uploaded to DolphinDB. Each time we use tableInsert or insert into to append a DataFrame with a temporal column to an in-memory table or DFS table, we need to conduct a data type conversion for the temporal column. For automatic data type conversion in these situations, Python API offers the tableAppender object.
tableAppender(dbPath="", tableName="", ddbSession=None, action="fitColumnType")
- dbPath: The path of a DFS database. Leave it unspecified for in-memory tables.
- tableName: The name of a table.
- ddbSession: A session connected to DolphinDB server.
- action: What to do when appending. Now only supports "fitColumnType", which means convert temporal column types.
The example below appends to a shared in-memory table t with tableAppender:
import pandas as pd
import dolphindb as ddb
import numpy as np
s = ddb.session()
s.connect("localhost", 8848, "admin", "123456")
s.run("share table(1000:0, `sym`timestamp`qty, [SYMBOL, TIMESTAMP, INT]) as t")
appender = ddb.tableAppender(tableName="t", ddbSession=s)
sym = ['A1', 'A2', 'A3', 'A4', 'A5']
timestamp = np.array(['2012-06-13 13:30:10.008', 'NaT','2012-06-13 13:30:10.008', '2012-06-13 15:30:10.008', 'NaT'], dtype="datetime64")
qty = np.arange(1, 6)
data = pd.DataFrame({'sym': sym, 'timestamp': timestamp, 'qty': qty})
num = appender.append(data)
print(num)
t = s.run("t")
print(t)
The example below appends to a DFS table pt with tableAppender:
import pandas as pd
import dolphindb as ddb
import numpy as np
s = ddb.session()
s.connect("localhost", 8848, "admin", "123456")
script='''
dbPath = "dfs://tableAppender"
if(existsDatabase(dbPath))
dropDatabase(dbPath)
t = table(1000:0, `sym`date`month`time`minute`second`datetime`timestamp`nanotimestamp`qty, [SYMBOL, DATE,MONTH,TIME,MINUTE,SECOND,DATETIME,TIMESTAMP,NANOTIMESTAMP, INT])
db=database(dbPath,RANGE,100000 200000 300000 400000 600001)
pt = db.createPartitionedTable(t, `pt, `qty)
'''
s.run(script)
appender = ddb.tableAppender("dfs://tableAppender","pt", s)
sym = list(map(str, np.arange(100000, 600000)))
date = np.array(np.tile(['2012-01-01', 'NaT', '1965-07-25', 'NaT', '2020-12-23', '1970-01-01', 'NaT', 'NaT', 'NaT', '2009-08-05'],50000), dtype="datetime64[D]")
month = np.array(np.tile(['1965-08', 'NaT','2012-02', '2012-03', 'NaT'],100000), dtype="datetime64")
time = np.array(np.tile(['2012-01-01T00:00:00.000', '2015-08-26T05:12:48.426', 'NaT', 'NaT', '2015-06-09T23:59:59.999'],100000), dtype="datetime64")
second = np.array(np.tile(['2012-01-01T00:00:00', '2015-08-26T05:12:48', 'NaT', 'NaT', '2015-06-09T23:59:59'],100000), dtype="datetime64")
nanotime = np.array(np.tile(['2012-01-01T00:00:00.000000000', '2015-08-26T05:12:48.008007006', 'NaT', 'NaT', '2015-06-09T23:59:59.999008007'],100000), dtype="datetime64")
qty = np.arange(100000, 600000)
data = pd.DataFrame({'sym': sym, 'date': date, 'month':month, 'time':time, 'minute':time, 'second':second, 'datetime':second, 'timestamp':time, 'nanotimestamp':nanotime, 'qty': qty})
num = appender.append(data)
print(num)
print(s.run("select * from pt"))Use the following script to create a DFS table in DolphinDB:
import dolphindb as ddb
s = ddb.session()
s.connect("localhost", 8848, "admin", "123456")
dbPath="dfs://testPython"
tableName='t1'
script = """
dbPath='{db}'
if(existsDatabase(dbPath))
dropDatabase(dbPath)
db = database(dbPath, VALUE, 0..100)
t1 = table(10000:0,`id`cbool`cchar`cshort`cint`clong`cdate`cmonth`ctime`cminute`csecond`cdatetime`ctimestamp`cnanotime`cnanotimestamp`cfloat`cdouble`csymbol`cstring,[INT,BOOL,CHAR,SHORT,INT,LONG,DATE,MONTH,TIME,MINUTE,SECOND,DATETIME,TIMESTAMP,NANOTIME,NANOTIMESTAMP,FLOAT,DOUBLE,SYMBOL,STRING])
insert into t1 values (0,true,'a',122h,21,22l,2012.06.12,2012.06M,13:10:10.008,13:30m,13:30:10,2012.06.13 13:30:10,2012.06.13 13:30:10.008,13:30:10.008007006,2012.06.13 13:30:10.008007006,2.1f,2.1,'','')
t = db.createPartitionedTable(t1, `{tb}, `id)
t.append!(t1)""".format(db=dbPath,tb=tableName)
s.run(script)
Use tableInsert to append data to a DFS table. In the following example, we use the user-defined function createDemoDataFrame() to create a DataFrame, then append it to a DFS table. Please note that when appending to a DFS table, the temporal data types are automatically converted.
tb = createDemoDataFrame()
s.run("tableInsert{{loadTable('{db}', `{tb})}}".format(db=dbPath,tb=tableName), tb)A Session object has methods with the same purpose as certain DolphinDB built-in functions to work with databases and tables.
- For databases/partitions
| method | details |
|---|---|
| database | Create a database |
| dropDatabase(dbPath) | Delete a database |
| dropPartition(dbPath, partitionPaths, tableName) | Delete a database partition |
| existsDatabase | Determine if a database exists |
- For tables
| method | details |
|---|---|
| dropTable(dbPath, tableName) | Delete a table |
| existsTable | Determine if a table exists |
| loadTable | Load a table into memory |
| table | Create a table |
All of the following methods can be called on a table object in Python. They are methods of the Table class.
| method | details |
|---|---|
| append | Append to a table |
| drop(colNameList) | Delete columns of a table |
| executeAs(tableName) | Save result as an in-memory table with the specified name |
| execute() | Execute script. Used with update or delete |
| toDF() | Convert DolphinDB table object into pandas DataFrame |
The tables above only list the most commonly used methods. Please refer to session.py and table.py for all methods provided by the Session and Table classes.
Use function database to create DFS databases:
import dolphindb.settings as keys
s.database(dbName='mydb', partitionType=keys.VALUE, partitions=["AMZN","NFLX", "NVDA"], dbPath="dfs://valuedb")Use dropDatabase to delete databases:
if s.existsDatabase("dfs://valuedb"):
s.dropDatabase("dfs://valuedb")Use dropPartition to delete database partitions. Please note that if the name of a partition to be deleted is quoted in DolphinDB's dropPartition command, then we need to add another level of quotes to the partition name in Python API's dropPartition method. For example, if the parameter of 'partitions' in DolphinDB's dropPartition command is ["AMZN","NFLX"], then in Python API's dropPartition method the parameter 'partitions' should be ["'AMZN'","'NFLX'"]. Similarly, in Python API for range partitions: partitionPaths=["'/0_50'","'/50_100'"]; for list partitions: partitionPaths=["'/List0'","'/List1'"], etc.
import dolphindb.settings as keys
if s.existsDatabase("dfs://valuedb"):
s.dropDatabase("dfs://valuedb")
s.database(dbName='mydb', partitionType=keys.VALUE, partitions=["AMZN","NFLX","NVDA"], dbPath="dfs://valuedb")
trade=s.loadTextEx(dbPath="dfs://valuedb", partitionColumns=["TICKER"], tableName='trade', remoteFilePath=WORK_DIR + "/data_example.csv")
print(trade.rows)
# output
13136
s.dropPartition("dfs://valuedb", partitionPaths=["'AMZN'","'NFLX'"]) # or s.dropPartition("dfs://valuedb", partitionPaths=["`AMZN`NFLX"])
trade = s.loadTable(tableName="trade", dbPath="dfs://valuedb")
print(trade.rows)
# output
4516
print(trade.select("distinct TICKER").toDF())
# output
distinct_TICKER
0 NVDA
Please refer to [section 5. Load data from DolphinDB database](#5-Load data from DolphinDB database).
Please refer to [section 6.1](#6.1-Append to in-memory tables) about how to append to in-memory tables.
Please refer to [section 6.2](#6.2-Append to DFS tables) about how to append to DFS tables.
update can only be used on in-memory tables and must be used together with execute.
trade=s.loadText(WORK_DIR+"/data_example.csv")
trade = trade.update(["VOL"],["999999"]).where("TICKER=`AMZN").where(["date=2015.12.16"]).execute()
t1=trade.where("ticker=`AMZN").where("VOL=999999")
print(t1.toDF())
# output
TICKER date VOL PRC BID ASK
0 AMZN 1997-05-15 999999 23.50000 23.50000 23.62500
1 AMZN 1997-05-16 999999 20.75000 20.50000 21.00000
2 AMZN 1997-05-19 999999 20.50000 20.50000 20.62500
3 AMZN 1997-05-20 999999 19.62500 19.62500 19.75000
4 AMZN 1997-05-21 999999 17.12500 17.12500 17.25000
...
4936 AMZN 2016-12-23 999999 760.59003 760.33002 760.59003
4937 AMZN 2016-12-27 999999 771.40002 771.40002 771.76001
4938 AMZN 2016-12-28 999999 772.13000 771.92999 772.15997
4939 AMZN 2016-12-29 999999 765.15002 764.66998 765.15997
4940 AMZN 2016-12-30 999999 749.87000 750.02002 750.40002
[4941 rows x 6 columns]
delete must be used together with execute.
trade=s.loadText(WORK_DIR+"/data_example.csv")
trade.delete().where('date<2013.01.01').execute()
print(trade.rows)
# output
3024
We can only delete columns from an in-memory table.
trade=s.loadText(WORK_DIR+"/data_example.csv")
t1=trade.drop(['ask', 'bid'])
print(t1.top(5).toDF())
# output
TICKER date VOL PRC
0 AMZN 1997.05.15 6029815 23.500
1 AMZN 1997.05.16 1232226 20.750
2 AMZN 1997.05.19 512070 20.500
3 AMZN 1997.05.20 456357 19.625
4 AMZN 1997.05.21 1577414 17.125
Use dropTable to delete a table:
if s.existsDatabase("dfs://valuedb"):
s.dropDatabase("dfs://valuedb")
s.database(dbName='mydb', partitionType=keys.VALUE, partitions=["AMZN","NFLX","NVDA"], dbPath="dfs://valuedb")
s.loadTextEx(dbPath="dfs://valuedb", partitionColumns=["TICKER"], tableName='trade', remoteFilePath=WORK_DIR + "/data_example.csv")
s.dropTable(dbPath="dfs://valuedb", tableName="trade")
Use the select method to retrieve specified columns:
trade=s.loadText(WORK_DIR+"/data_example.csv")
print(trade.select(['ticker','date','bid','ask','prc','vol']).toDF())
# output
ticker date bid ask prc vol
0 AMZN 1997-05-15 23.50000 23.625 23.50000 6029815
1 AMZN 1997-05-16 20.50000 21.000 20.75000 1232226
2 AMZN 1997-05-19 20.50000 20.625 20.50000 512070
3 AMZN 1997-05-20 19.62500 19.750 19.62500 456357
4 AMZN 1997-05-21 17.12500 17.250 17.12500 1577414
...We can use the showSQL method to display the SQL statement.
print(trade.select(['ticker','date','bid','ask','prc','vol']).showSQL())
# output
select ticker,date,bid,ask,prc,vol from T64afd5a6
print(trade.select("ticker,date,bid,ask,prc,vol").where("date=2012.09.06").where("vol<10000000").toDF())
# output
ticker date bid ask prc vol
0 AMZN 2012-09-06 251.42999 251.56 251.38 5657816
1 NFLX 2012-09-06 56.65000 56.66 56.65 5368963
...
Get the top records in a table.
trade=s.loadText(WORK_DIR+"/data_example.csv")
trade.top(5).toDF()
# output
TICKER date VOL PRC BID ASK
0 AMZN 1997.05.16 6029815 23.50000 23.50000 23.6250
1 AMZN 1997.05.17 1232226 20.75000 20.50000 21.0000
2 AMZN 1997.05.20 512070 20.50000 20.50000 20.6250
3 AMZN 1997.05.21 456357 19.62500 19.62500 19.7500
4 AMZN 1997.05.22 1577414 17.12500 17.12500 17.2500
trade=s.loadText(WORK_DIR+"/data_example.csv")
# use chaining WHERE conditions and save result to DolphinDB server variable "t1" through function "executeAs"
t1=trade.select(['date','bid','ask','prc','vol']).where('TICKER=`AMZN').where('bid!=NULL').where('ask!=NULL').where('vol>10000000').sort('vol desc').executeAs("t1")
print(t1.toDF())
# output
date bid ask prc vol
0 2007.04.25 56.80 56.8100 56.810 104463043
1 1999.09.29 80.75 80.8125 80.750 80380734
2 2006.07.26 26.17 26.1800 26.260 76996899
3 2007.04.26 62.77 62.8300 62.781 62451660
4 2005.02.03 35.74 35.7300 35.750 60580703
...
print(t1.rows)
# output
765
We can use the showSQL method to display the SQL statement.
print(trade.select(['date','bid','ask','prc','vol']).where('TICKER=`AMZN').where('bid!=NULL').where('ask!=NULL').where('vol>10000000').sort('vol desc').showSQL())
# output
select date,bid,ask,prc,vol from Tff260d29 where TICKER=`AMZN and bid!=NULL and ask!=NULL and vol>10000000 order by vol desc
We can pass a list of field names as a string to the select method and a list of conditions as a string to the where method.
trade=s.loadText(WORK_DIR+"/data_example.csv")
print(trade.select("ticker, date, vol").where("bid!=NULL, ask!=NULL, vol>50000000").toDF())
# output
ticker date vol
0 AMZN 1999-09-29 80380734
1 AMZN 2000-06-23 52221978
2 AMZN 2001-11-26 51543686
3 AMZN 2002-01-22 57235489
4 AMZN 2005-02-03 60580703
...
38 NFLX 2016-01-20 53009419
39 NFLX 2016-04-19 55728765
40 NFLX 2016-07-19 55685209
Method groupby must be followed by an aggregate function such as count, sum, avg, std, etc.
if s.existsDatabase("dfs://valuedb"):
s.dropDatabase("dfs://valuedb")
s.database(dbName='mydb', partitionType=keys.VALUE, partitions=["AMZN","NFLX","NVDA"], dbPath="dfs://valuedb")
s.loadTextEx(dbPath="dfs://valuedb", partitionColumns=["TICKER"], tableName='trade', remoteFilePath=WORK_DIR + "/data_example.csv")
trade = s.loadTable(tableName="trade",dbPath="dfs://valuedb")
print(trade.select('count(*)').groupby(['ticker']).sort(bys=['ticker desc']).toDF())
# output
ticker count_ticker
0 NVDA 4516
1 NFLX 3679
2 AMZN 4941
Calculate the sum of column "vol" and the sum of column "prc" in each "ticker" group:
trade = s.loadTable(tableName="trade",dbPath="dfs://valuedb")
print(trade.select(['vol','prc']).groupby(['ticker']).sum().toDF())
# output
ticker sum_vol sum_prc
0 AMZN 33706396492 772503.81377
1 NFLX 14928048887 421568.81674
2 NVDA 46879603806 127139.51092
groupby can be used with having:
trade = s.loadTable(tableName="trade",dbPath="dfs://valuedb")
print(trade.select('count(ask)').groupby(['vol']).having('count(ask)>1').toDF())
# output
vol count_ask
0 579392 2
1 3683504 2
2 5732076 2
3 6299736 2
4 6438038 2
5 6946976 2
6 8160197 2
7 8924303 2
...
contextby is similar to groupby except that for each group, groupby returns a scalar whereas contextby returns a vector of the same size as the group.
df= s.loadTable(tableName="trade",dbPath="dfs://valuedb").contextby('ticker').top(3).toDF()
print(df)
# output
TICKER date VOL PRC BID ASK
0 AMZN 1997-05-15 6029815 23.5000 23.5000 23.6250
1 AMZN 1997-05-16 1232226 20.7500 20.5000 21.0000
2 AMZN 1997-05-19 512070 20.5000 20.5000 20.6250
3 NFLX 2002-05-23 7507079 16.7500 16.7500 16.8500
4 NFLX 2002-05-24 797783 16.9400 16.9400 16.9500
5 NFLX 2002-05-28 474866 16.2000 16.2000 16.3700
6 NVDA 1999-01-22 5702636 19.6875 19.6250 19.6875
7 NVDA 1999-01-25 1074571 21.7500 21.7500 21.8750
8 NVDA 1999-01-26 719199 20.0625 20.0625 20.1250
df= s.loadTable(tableName="trade",dbPath="dfs://valuedb").select("TICKER, month(date) as month, cumsum(VOL)").contextby("TICKER,month(date)").toDF()
print(df)
# output
TICKER month cumsum_VOL
0 AMZN 1997-05-01 6029815
1 AMZN 1997-05-01 7262041
2 AMZN 1997-05-01 7774111
3 AMZN 1997-05-01 8230468
4 AMZN 1997-05-01 9807882
...
13131 NVDA 2016-12-01 280114768
13132 NVDA 2016-12-01 309971900
13133 NVDA 2016-12-01 367356016
13134 NVDA 2016-12-01 421740692
13135 NVDA 2016-12-01 452063951
df= s.loadTable(tableName="trade",dbPath="dfs://valuedb").select("TICKER, month(date) as month, sum(VOL)").contextby("TICKER,month(date)").toDF()
print(df)
# output
TICKER month sum_VOL
0 AMZN 1997-05-01 13736587
1 AMZN 1997-05-01 13736587
2 AMZN 1997-05-01 13736587
3 AMZN 1997-05-01 13736587
4 AMZN 1997-05-01 13736587
...
13131 NVDA 2016-12-01 452063951
13132 NVDA 2016-12-01 452063951
13133 NVDA 2016-12-01 452063951
13134 NVDA 2016-12-01 452063951
13135 NVDA 2016-12-01 452063951
df= s.loadTable(dbPath="dfs://valuedb", tableName="trade").contextby('ticker').having("sum(VOL)>40000000000").toDF()
print(df)
# output
TICKER date VOL PRC BID ASK
0 NVDA 1999-01-22 5702636 19.6875 19.6250 19.6875
1 NVDA 1999-01-25 1074571 21.7500 21.7500 21.8750
2 NVDA 1999-01-26 719199 20.0625 20.0625 20.1250
3 NVDA 1999-01-27 510637 20.0000 19.8750 20.0000
4 NVDA 1999-01-28 476094 19.9375 19.8750 20.0000
...
4511 NVDA 2016-12-23 16193331 109.7800 109.7700 109.7900
4512 NVDA 2016-12-27 29857132 117.3200 117.3100 117.3200
4513 NVDA 2016-12-28 57384116 109.2500 109.2500 109.2900
4514 NVDA 2016-12-29 54384676 111.4300 111.2600 111.4200
4515 NVDA 2016-12-30 30323259 106.7400 106.7300 106.7500
DolphinDB table class has method merge for inner, left, and outer join; method merge_asof for asof join; method merge_window for window join.
Specify joining columns with parameter "on" if joining column names are identical in both tables; use parameters "left_on" and "right_on" when joining column names are different. The optional parameter "how" indicates table join type. The default table join mode is inner join.
trade = s.loadTable(dbPath="dfs://valuedb", tableName="trade")
t1 = s.table(data={'TICKER': ['AMZN', 'AMZN', 'AMZN'], 'date': np.array(['2015-12-31', '2015-12-30', '2015-12-29'], dtype='datetime64[D]'), 'open': [695, 685, 674]}, tableAliasName="t1")
s.run("""t1 = select TICKER,date(date) as date,open from t1""")
print(trade.merge(t1,on=["TICKER","date"]).toDF())
# output
TICKER date VOL PRC BID ASK open
0 AMZN 2015.12.29 5734996 693.96997 693.96997 694.20001 674
1 AMZN 2015.12.30 3519303 689.07001 689.07001 689.09998 685
2 AMZN 2015.12.31 3749860 675.89001 675.85999 675.94000 695
We need to specify arguments "left_on" and "right_on" when joining column names are different.
trade = s.loadTable(dbPath="dfs://valuedb", tableName="trade")
t1 = s.table(data={'TICKER1': ['AMZN', 'AMZN', 'AMZN'], 'date1': ['2015.12.31', '2015.12.30', '2015.12.29'], 'open': [695, 685, 674]}, tableAliasName="t1")
s.run("""t1 = select TICKER1,date(date1) as date1,open from t1""")
print(trade.merge(t1,left_on=["TICKER","date"], right_on=["TICKER1","date1"]).toDF())
# output
TICKER date VOL PRC BID ASK open
0 AMZN 2015.12.29 5734996 693.96997 693.96997 694.20001 674
1 AMZN 2015.12.30 3519303 689.07001 689.07001 689.09998 685
2 AMZN 2015.12.31 3749860 675.89001 675.85999 675.94000 695
To conduct left join, set how="left".
trade = s.loadTable(dbPath="dfs://valuedb", tableName="trade")
t1 = s.table(data={'TICKER': ['AMZN', 'AMZN', 'AMZN'], 'date': ['2015.12.31', '2015.12.30', '2015.12.29'], 'open': [695, 685, 674]}, tableAliasName="t1")
s.run("""t1 = select TICKER,date(date) as date,open from t1""")
print(trade.merge(t1,how="left", on=["TICKER","date"]).where('TICKER=`AMZN').where('2015.12.23<=date<=2015.12.31').toDF())
# output
TICKER date VOL PRC BID ASK open
0 AMZN 2015-12-23 2722922 663.70001 663.48999 663.71002 NaN
1 AMZN 2015-12-24 1092980 662.78998 662.56000 662.79999 NaN
2 AMZN 2015-12-28 3783555 675.20001 675.00000 675.21002 NaN
3 AMZN 2015-12-29 5734996 693.96997 693.96997 694.20001 674.0
4 AMZN 2015-12-30 3519303 689.07001 689.07001 689.09998 685.0
5 AMZN 2015-12-31 3749860 675.89001 675.85999 675.94000 695.0
To conduct outer join, set how="outer". A partitioned table can only be outer joined with a partitioned table, and an in-memory table can only be outer joined with an in-memory table.
t1 = s.table(data={'TICKER': ['AMZN', 'AMZN', 'NFLX'], 'date': ['2015.12.29', '2015.12.30', '2015.12.31'], 'open': [674, 685, 942]})
t2 = s.table(data={'TICKER': ['AMZN', 'NFLX', 'NFLX'], 'date': ['2015.12.29', '2015.12.30', '2015.12.31'], 'close': [690, 936, 951]})
print(t1.merge(t2, how="outer", on=["TICKER","date"]).toDF())
# output
TICKER date open TMP_TBL_1b831e46_TICKER TMP_TBL_1b831e46_date close
0 AMZN 2015.12.29 674.0 AMZN 2015.12.29 690.0
1 AMZN 2015.12.30 685.0 NaN
2 NFLX 2015.12.31 942.0 NFLX 2015.12.31 951.0
3 NaN NFLX 2015.12.30 936.0
The asof join function is a type of non-synchronous join. It is similar to the left join function with the following differences:
- The data type of the last matching column is usually temporal. For a row in the left table with time t, if there is not a match of left join in the right table, the row in the right table that corresponds to the most recent time before time t is taken, if all the other matching columns are matched; if there are more than one matching record in the right table, the last record is taken.
- If there is only 1 joining column, the asof join function assumes the right table is sorted on the joining column. If there are multiple joining columns, the asof join function assumes the right table is sorted on the last joining column within each group defined by the other joining columns. The right table does not need to be sorted by the other joining columns. If these conditions are not met, we may see unexpected results. The left table does not need to be sorted.
For the examples in this and the next section, we use trades.csv and quotes.csv, which contain AAPL and FB trades and quotes data on 10/24/2016 taken from the NYSE website.
import dolphindb.settings as keys
WORK_DIR = "C:/DolphinDB/Data"
if s.existsDatabase(WORK_DIR+"/tickDB"):
    s.dropDatabase(WORK_DIR+"/tickDB")
s.database(dbName='mydb', partitionType=keys.VALUE, partitions=["AAPL","FB"], dbPath=WORK_DIR+"/tickDB")
trades = s.loadTextEx("mydb", tableName='trades',partitionColumns=["Symbol"], remoteFilePath=WORK_DIR + "/trades.csv")
quotes = s.loadTextEx("mydb", tableName='quotes',partitionColumns=["Symbol"], remoteFilePath=WORK_DIR + "/quotes.csv")
print(trades.top(5).toDF())
# output
Time Exchange Symbol Trade_Volume Trade_Price
0 1970-01-01 08:00:00.022239 75 AAPL 300 27.00
1 1970-01-01 08:00:00.022287 75 AAPL 500 27.25
2 1970-01-01 08:00:00.022317 75 AAPL 335 27.26
3 1970-01-01 08:00:00.022341 75 AAPL 100 27.27
4 1970-01-01 08:00:00.022368 75 AAPL 31 27.40
print(quotes.where("second(Time)>=09:29:59").top(5).toDF())
# output
Time Exchange Symbol Bid_Price Bid_Size Offer_Price Offer_Size
0 1970-01-01 09:30:00.005868 90 AAPL 26.89 1 27.10 6
1 1970-01-01 09:30:00.011058 90 AAPL 26.89 11 27.10 6
2 1970-01-01 09:30:00.031523 90 AAPL 26.89 13 27.10 6
3 1970-01-01 09:30:00.284623 80 AAPL 26.89 8 26.98 8
4 1970-01-01 09:30:00.454066 80 AAPL 26.89 8 26.98 1
print(trades.merge_asof(quotes,on=["Symbol","Time"]).select(["Symbol","Time","Trade_Volume","Trade_Price","Bid_Price", "Bid_Size","Offer_Price", "Offer_Size"]).top(5).toDF())
# output
Symbol Time Trade_Volume Trade_Price Bid_Price Bid_Size \
0 AAPL 1970-01-01 08:00:00.022239 300 27.00 26.9 1
1 AAPL 1970-01-01 08:00:00.022287 500 27.25 26.9 1
2 AAPL 1970-01-01 08:00:00.022317 335 27.26 26.9 1
3 AAPL 1970-01-01 08:00:00.022341 100 27.27 26.9 1
4 AAPL 1970-01-01 08:00:00.022368 31 27.40 26.9 1
Offer_Price Offer_Size
0 27.49 10
1 27.49 10
2 27.49 10
3 27.49 10
4 27.49 10
[5 rows x 8 columns]
To calculate trading cost with asof join (the volume-weighted deviation of each trade price from the quote midpoint, expressed in basis points):
print(trades.merge_asof(quotes, on=["Symbol","Time"]).select("sum(Trade_Volume*abs(Trade_Price-(Bid_Price+Offer_Price)/2))/sum(Trade_Volume*Trade_Price)*10000 as cost").groupby("Symbol").toDF())
# output
Symbol cost
0 AAPL 6.486813
1     FB  35.751041
merge_window (window join) is a generalization of asof join. With a window defined by the parameters "leftBound" (w1) and "rightBound" (w2), for each row in the left table with the value of the last joining column equal to t, it finds the rows in the right table whose last joining column falls between (t+w1) and (t+w2), provided all other joining columns are matched, and then applies "aggFunctions" to the selected rows in the right table. The boundaries are expressed in the unit of the last joining column; in the example below the Time column has nanosecond precision, so -5000000000 specifies a window starting 5 seconds before each trade.
The only difference between window join and prevailing window join is that if the right table does not contain a matching value for t+w1 (the left boundary of the window), prevailing window join fills it with the last value before t+w1 (provided all other joining columns are matched) before applying "aggFunctions". To use prevailing window join, set prevailing=True.
print(trades.merge_window(quotes, -5000000000, 0, aggFunctions=["avg(Bid_Price)","avg(Offer_Price)"], on=["Symbol","Time"]).where("Time>=07:59:59").top(10).toDF())
# output
Time Exchange Symbol Trade_Volume \
0 1970-01-01 08:00:00.022239 75 AAPL 300
1 1970-01-01 08:00:00.022287 75 AAPL 500
2 1970-01-01 08:00:00.022317 75 AAPL 335
3 1970-01-01 08:00:00.022341 75 AAPL 100
4 1970-01-01 08:00:00.022368 75 AAPL 31
5 1970-01-01 08:00:02.668076 68 AAPL 2434
6 1970-01-01 08:02:20.116025 68 AAPL 66
7 1970-01-01 08:06:31.149930 75 AAPL 100
8 1970-01-01 08:06:32.826399 75 AAPL 100
9 1970-01-01 08:06:33.168833 75 AAPL 74
avg_Bid_Price avg_Offer_Price
0 26.90 27.49
1 26.90 27.49
2 26.90 27.49
3 26.90 27.49
4 26.90 27.49
5 26.75 27.36
6 NaN NaN
7 NaN NaN
8 NaN NaN
9 NaN NaN
[10 rows x 6 columns]
To calculate trading cost with window join:
trades.merge_window(quotes,-1000000000, 0, aggFunctions="[wavg(Offer_Price, Offer_Size) as Offer_Price, wavg(Bid_Price, Bid_Size) as Bid_Price]", on=["Symbol","Time"], prevailing=True).select("sum(Trade_Volume*abs(Trade_Price-(Bid_Price+Offer_Price)/2))/sum(Trade_Volume*Trade_Price)*10000 as cost").groupby("Symbol").executeAs("tradingCost")
print(s.loadTable(tableName="tradingCost").toDF())
# output
Symbol cost
0 AAPL 6.367864
1     FB  35.751041
Function executeAs saves the query result as a table on the DolphinDB server.
trade = s.loadTable(dbPath="dfs://valuedb", tableName="trade")
trade.select(['date','bid','ask','prc','vol']).where('TICKER=`AMZN').where('bid!=NULL').where('ask!=NULL').where('vol>10000000').sort('vol desc').executeAs("AMZN")
To use the table "AMZN" on the DolphinDB server:
t1 = s.loadTable(tableName="AMZN")
Function ols conducts an ordinary least squares regression and returns a dictionary with the keys "ANOVA", "RegressionStat" and "Coefficient", as shown below.
trade = s.loadTable(tableName="trade",dbPath="dfs://valuedb")
z=trade.select(['bid','ask','prc']).ols('PRC', ['BID', 'ASK'])
print(z["ANOVA"])
# output
Breakdown DF SS MS F Significance
0 Regression 2 2.689281e+08 1.344640e+08 1.214740e+10 0.0
1 Residual 13133 1.453740e+02 1.106937e-02 NaN NaN
2 Total 13135 2.689282e+08 NaN NaN NaN
print(z["RegressionStat"])
# output
item statistics
0 R2 0.999999
1 AdjustedR2 0.999999
2 StdError 0.105211
3 Observations 13136.000000
print(z["Coefficient"])
# output
factor beta stdError tstat pvalue
0 intercept 0.003710 0.001155 3.213150 0.001316
1 BID 0.605307 0.010517 57.552527 0.000000
2 ASK 0.394712 0.010515 37.537919 0.000000
print(z["Coefficient"].beta[1])
# output
0.6053065014691369
For the example below, please note that the ratio operator between 2 integers in DolphinDB is "\", which happens to be the escape character in Python. Therefore we need to write VOL\\SHROUT in the select statement. The difference between the two division operators can be checked directly; a minimal sketch, assuming an active session s:
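print(s.run("5\\2"))   # the ratio operator "\" returns 2.5; "\\" escapes the backslash in Python
print(s.run("5/2"))    # "/" between two integers performs integer division: 2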
result = s.loadTable(tableName="US",dbPath="dfs://US").select("VOL\\SHROUT as turnover, abs(RET) as absRet, (ASK-BID)/(BID+ASK)*2 as spread, log(SHROUT*(BID+ASK)/2) as logMV").where("VOL>0").ols("turnover", ["absRet","logMV", "spread"], True)
In this section we give an example of a backtest of a stock momentum strategy. The momentum strategy is one of the best-known quantitative long-short equity strategies. It has been studied in numerous academic and sell-side publications since Jegadeesh and Titman (1993). Investors in the momentum strategy believe that among individual stocks, past winners will outperform past losers. The most commonly used momentum factor is a stock's return over the past 12 months skipping the most recent month. In academic research, the momentum strategy is usually rebalanced once a month and the holding period is also one month. In this example, we rebalance 1/5 of our portfolio positions every day and hold the new tranche for 5 days. For simplicity, transaction costs are not considered.
Create a server session:
import dolphindb as ddb
s=ddb.session()
s.connect("localhost", 8921, "admin", "123456")
Step 1: Load and clean the data, and construct the momentum signal (past 12-month return skipping the most recent month) for each firm. Undefine the table "USstocks" to release the large amount of memory it occupies. Note that executeAs must be used to save intermediate results on the DolphinDB server. The dataset "US" contains US stock price data from 1990 to 2016.
if s.existsDatabase("dfs://US"):
    s.dropDatabase("dfs://US")
s.database(dbName='USdb', partitionType=keys.VALUE, partitions=["GFGC","EWST", "EGAS"], dbPath="dfs://US")
US=s.loadTextEx(dbPath="dfs://US", partitionColumns=["TICKER"], tableName='US', remoteFilePath=WORK_DIR + "/USPrices_FIRST.csv")
US = s.loadTable(dbPath="dfs://US", tableName="US")
def loadPriceData(inData):
    s.loadTable(inData).select("PERMNO, date, abs(PRC) as PRC, VOL, RET, SHROUT*abs(PRC) as MV").where("weekday(date) between 1:5, isValid(PRC), isValid(VOL)").sort(bys=["PERMNO","date"]).executeAs("USstocks")
    s.loadTable("USstocks").select("PERMNO, date, PRC, VOL, RET, MV, cumprod(1+RET) as cumretIndex").contextby("PERMNO").executeAs("USstocks")
    return s.loadTable("USstocks").select("PERMNO, date, PRC, VOL, RET, MV, move(cumretIndex,21)/move(cumretIndex,252)-1 as signal").contextby("PERMNO").executeAs("priceData")
priceData = loadPriceData(US.tableName())
# US.tableName() returns the name of the table on the DolphinDB server
# that corresponds to the table object "US" in Python
Step 2: Generate the portfolios for the momentum strategy.
def genTradeTables(inData):
    return s.loadTable(inData).select(["date", "PERMNO", "MV", "signal"]).where("PRC>5, MV>100000, VOL>0, isValid(signal)").sort(bys=["date"]).executeAs("tradables")
def formPortfolio(startDate, endDate, tradables, holdingDays, groups, WtScheme):
    holdingDays = str(holdingDays)
    groups = str(groups)
    ports = tradables.select("date, PERMNO, MV, rank(signal,,"+groups+") as rank, count(PERMNO) as symCount, 0.0 as wt").where("date between "+startDate+":"+endDate).contextby("date").having("count(PERMNO)>=100").executeAs("ports")
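    # WtScheme=1: equal-weighted long-short portfolio (long the top-ranked group,
    # short the bottom-ranked group); WtScheme=2: value-weighted by market cap (MV).
    # In both schemes the weights are spread evenly over the holding days.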
    if WtScheme == 1:
        ports.where("rank=0").contextby("date").update(cols=["wt"], vals=["-1.0/count(PERMNO)/"+holdingDays]).execute()
        ports.where("rank="+groups+"-1").contextby("date").update(cols=["wt"], vals=["1.0/count(PERMNO)/"+holdingDays]).execute()
    elif WtScheme == 2:
        ports.where("rank=0").contextby("date").update(cols=["wt"], vals=["-MV/sum(MV)/"+holdingDays]).execute()
        ports.where("rank="+groups+"-1").contextby("date").update(cols=["wt"], vals=["MV/sum(MV)/"+holdingDays]).execute()
    else:
        raise Exception("Invalid WtScheme. Valid values: 1 or 2")
    return ports.select("PERMNO, date as tranche, wt").where("wt!=0").sort(bys=["PERMNO","date"]).executeAs("ports")
tradables=genTradeTables(priceData.tableName())
startDate="1996.01.01"
endDate="2017.01.01"
holdingDays=5
groups=10
ports=formPortfolio(startDate=startDate,endDate=endDate,tradables=tradables,holdingDays=holdingDays,groups=groups,WtScheme=2)
dailyRtn = priceData.select("date, PERMNO, RET as dailyRet").where("date between "+startDate+":"+endDate).executeAs("dailyRtn")
Step 3: Calculate the profit/loss of each stock in the portfolio on each day of the holding period, and close the positions at the end of the holding period.
def calcStockPnL(ports, dailyRtn, holdingDays, endDate):
    s.table(data={'age': list(range(1,holdingDays+1))}).executeAs("ages")
    ports.select("tranche").sort("tranche").executeAs("dates")
s.run("dates = sort distinct dates.tranche")
s.run("dictDateIndex=dict(dates,1..dates.size())")
s.run("dictIndexDate=dict(1..dates.size(), dates)")
    ports.merge_cross(s.table(data="ages")).select("dictIndexDate[dictDateIndex[tranche]+age] as date, PERMNO, tranche, age, take(0.0,age.size()) as ret, wt as expr, take(0.0,age.size()) as pnl").where("isValid(dictIndexDate[dictDateIndex[tranche]+age]), dictIndexDate[dictDateIndex[tranche]+age]<=min(lastDays[PERMNO], "+endDate+")").executeAs("pos")
    t1 = s.loadTable("pos")
    t1.merge(dailyRtn, on=["date","PERMNO"], merge_for_update=True).update(["ret"],["dailyRet"]).execute()
    t1.contextby(["PERMNO","tranche"]).update(["expr"], ["expr*cumprod(1+ret)"]).execute()
    t1.update(["pnl"],["expr*ret/(1+ret)"]).execute()
    return t1
lastDaysTable = priceData.select("max(date) as date").groupby("PERMNO").executeAs("lastDaysTable")
s.run("lastDays=dict(lastDaysTable.PERMNO,lastDaysTable.date)")
# undefine priceData to release memory
s.undef(priceData.tableName(), 'VAR')
stockPnL = calcStockPnL(ports=ports, dailyRtn=dailyRtn, holdingDays=holdingDays, endDate=endDate)
Step 4: Calculate the portfolio profit/loss.
portPnl = stockPnL.select("pnl").groupby("date").sum().sort(bys=["date"]).executeAs("portPnl")
print(portPnl.toDF())
The example below shows how to calculate factor No. 98 in "101 Formulaic Alphas" by Kakushadze (2015) with daily data of US stocks.
def alpha98(t):
    t1 = s.table(data=t)
    # add two calculated columns through function update
    t1.contextby(["date"]).update(cols=["rank_open","rank_adv15"], vals=["rank(open)","rank(adv15)"]).execute()
    # add two more calculated columns
    t1.contextby(["PERMNO"]).update(["decay7", "decay8"], ["mavg(mcorr(vwap, msum(adv5, 26), 5), 1..7)","mavg(mrank(9 - mimin(mcorr(rank_open, rank_adv15, 21), 9), true, 7), 1..8)"]).execute()
    # return the final results with three columns: PERMNO, date, and A98
    return t1.select("PERMNO, date, rank(decay7)-rank(decay8) as A98").contextby(["date"]).executeAs("alpha98")
US = s.loadTable(tableName="US", dbPath="dfs://US").select("PERMNO, date, PRC as vwap, PRC+rand(1.0, PRC.size()) as open, mavg(VOL, 5) as adv5, mavg(VOL,15) as adv15").where("2007.01.01<=date<=2016.12.31").contextby("PERMNO").executeAs("US")
result=alpha98(US.tableName()).where('date>2007.03.12').executeAs("result")
print(result.top(10).toDF())