forked from mementum/backtrader
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathanalyzer.py
More file actions
446 lines (330 loc) · 14.1 KB
/
analyzer.py
File metadata and controls
446 lines (330 loc) · 14.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
#!/usr/bin/env python
# -*- coding: utf-8; py-indent-offset:4 -*-
###############################################################################
#
# Copyright (C) 2015-2020 Daniel Rodriguez
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import calendar
from collections import OrderedDict
import datetime
import pprint as pp
import backtrader as bt
from backtrader import TimeFrame
from backtrader.utils.py3 import MAXINT, with_metaclass
class MetaAnalyzer(bt.MetaParams):
def donew(cls, *args, **kwargs):
'''
Intercept the strategy parameter
'''
# Create the object and set the params in place
_obj, args, kwargs = super(MetaAnalyzer, cls).donew(*args, **kwargs)
_obj._children = list()
_obj.strategy = strategy = bt.metabase.findowner(_obj, bt.Strategy)
_obj._parent = bt.metabase.findowner(_obj, Analyzer)
# Register with a master observer if created inside one
masterobs = bt.metabase.findowner(_obj, bt.Observer)
if masterobs is not None:
masterobs._register_analyzer(_obj)
_obj.datas = strategy.datas
# For each data add aliases: for first data: data and data0
if _obj.datas:
_obj.data = data = _obj.datas[0]
for l, line in enumerate(data.lines):
linealias = data._getlinealias(l)
if linealias:
setattr(_obj, 'data_%s' % linealias, line)
setattr(_obj, 'data_%d' % l, line)
for d, data in enumerate(_obj.datas):
setattr(_obj, 'data%d' % d, data)
for l, line in enumerate(data.lines):
linealias = data._getlinealias(l)
if linealias:
setattr(_obj, 'data%d_%s' % (d, linealias), line)
setattr(_obj, 'data%d_%d' % (d, l), line)
_obj.create_analysis()
# Return to the normal chain
return _obj, args, kwargs
def dopostinit(cls, _obj, *args, **kwargs):
_obj, args, kwargs = \
super(MetaAnalyzer, cls).dopostinit(_obj, *args, **kwargs)
if _obj._parent is not None:
_obj._parent._register(_obj)
# Return to the normal chain
return _obj, args, kwargs
class Analyzer(with_metaclass(MetaAnalyzer, object)):
'''Analyzer base class. All analyzers are subclass of this one
An Analyzer instance operates in the frame of a strategy and provides an
analysis for that strategy.
Automagically set member attributes:
- ``self.strategy`` (giving access to the *strategy* and anything
accessible from it)
- ``self.datas[x]`` giving access to the array of data feeds present in
the the system, which could also be accessed via the strategy reference
- ``self.data``, giving access to ``self.datas[0]``
- ``self.dataX`` -> ``self.datas[X]``
- ``self.dataX_Y`` -> ``self.datas[X].lines[Y]``
- ``self.dataX_name`` -> ``self.datas[X].name``
- ``self.data_name`` -> ``self.datas[0].name``
- ``self.data_Y`` -> ``self.datas[0].lines[Y]``
This is not a *Lines* object, but the methods and operation follow the same
design
- ``__init__`` during instantiation and initial setup
- ``start`` / ``stop`` to signal the begin and end of operations
- ``prenext`` / ``nextstart`` / ``next`` family of methods that follow
the calls made to the same methods in the strategy
- ``notify_trade`` / ``notify_order`` / ``notify_cashvalue`` /
``notify_fund`` which receive the same notifications as the equivalent
methods of the strategy
The mode of operation is open and no pattern is preferred. As such the
analysis can be generated with the ``next`` calls, at the end of operations
during ``stop`` and even with a single method like ``notify_trade``
The important thing is to override ``get_analysis`` to return a *dict-like*
object containing the results of the analysis (the actual format is
implementation dependent)
'''
csv = True
def __len__(self):
'''Support for invoking ``len`` on analyzers by actually returning the
current length of the strategy the analyzer operates on'''
return len(self.strategy)
def _register(self, child):
self._children.append(child)
def _prenext(self):
for child in self._children:
child._prenext()
self.prenext()
def _notify_cashvalue(self, cash, value):
for child in self._children:
child._notify_cashvalue(cash, value)
self.notify_cashvalue(cash, value)
def _notify_fund(self, cash, value, fundvalue, shares):
for child in self._children:
child._notify_fund(cash, value, fundvalue, shares)
self.notify_fund(cash, value, fundvalue, shares)
def _notify_trade(self, trade):
for child in self._children:
child._notify_trade(trade)
self.notify_trade(trade)
def _notify_order(self, order):
for child in self._children:
child._notify_order(order)
self.notify_order(order)
def _nextstart(self):
for child in self._children:
child._nextstart()
self.nextstart()
def _next(self):
for child in self._children:
child._next()
self.next()
def _start(self):
for child in self._children:
child._start()
self.start()
def _stop(self):
for child in self._children:
child._stop()
self.stop()
def notify_cashvalue(self, cash, value):
'''Receives the cash/value notification before each next cycle'''
pass
def notify_fund(self, cash, value, fundvalue, shares):
'''Receives the current cash, value, fundvalue and fund shares'''
pass
def notify_order(self, order):
'''Receives order notifications before each next cycle'''
pass
def notify_trade(self, trade):
'''Receives trade notifications before each next cycle'''
pass
def next(self):
'''Invoked for each next invocation of the strategy, once the minum
preiod of the strategy has been reached'''
pass
def prenext(self):
'''Invoked for each prenext invocation of the strategy, until the minimum
period of the strategy has been reached
The default behavior for an analyzer is to invoke ``next``
'''
self.next()
def nextstart(self):
'''Invoked exactly once for the nextstart invocation of the strategy,
when the minimum period has been first reached
'''
self.next()
def start(self):
'''Invoked to indicate the start of operations, giving the analyzer
time to setup up needed things'''
pass
def stop(self):
'''Invoked to indicate the end of operations, giving the analyzer
time to shut down needed things'''
pass
def create_analysis(self):
'''Meant to be overriden by subclasses. Gives a chance to create the
structures that hold the analysis.
The default behaviour is to create a ``OrderedDict`` named ``rets``
'''
self.rets = OrderedDict()
def get_analysis(self):
'''Returns a *dict-like* object with the results of the analysis
The keys and format of analysis results in the dictionary is
implementation dependent.
It is not even enforced that the result is a *dict-like object*, just
the convention
The default implementation returns the default OrderedDict ``rets``
created by the default ``create_analysis`` method
'''
return self.rets
def print(self, *args, **kwargs):
'''Prints the results returned by ``get_analysis`` via a standard
``Writerfile`` object, which defaults to writing things to standard
output
'''
writer = bt.WriterFile(*args, **kwargs)
writer.start()
pdct = dict()
pdct[self.__class__.__name__] = self.get_analysis()
writer.writedict(pdct)
writer.stop()
def pprint(self, *args, **kwargs):
'''Prints the results returned by ``get_analysis`` using the pretty
print Python module (*pprint*)
'''
pp.pprint(self.get_analysis(), *args, **kwargs)
class MetaTimeFrameAnalyzerBase(Analyzer.__class__):
def __new__(meta, name, bases, dct):
# Hack to support original method name
if '_on_dt_over' in dct:
dct['on_dt_over'] = dct.pop('_on_dt_over') # rename method
return super(MetaTimeFrameAnalyzerBase, meta).__new__(meta, name,
bases, dct)
class TimeFrameAnalyzerBase(with_metaclass(MetaTimeFrameAnalyzerBase,
Analyzer)):
params = (
('timeframe', None),
('compression', None),
('_doprenext', True),
)
def _start(self):
# Override to add specific attributes
self.timeframe = self.p.timeframe or self.data._timeframe
self.compression = self.p.compression or self.data._compression
self.dtcmp, self.dtkey = self._get_dt_cmpkey(datetime.datetime.min)
super(TimeFrameAnalyzerBase, self)._start()
def _prenext(self):
for child in self._children:
child._prenext()
if self._dt_over():
self.on_dt_over()
if self.p._doprenext:
self.prenext()
def _nextstart(self):
for child in self._children:
child._nextstart()
if self._dt_over() or not self.p._doprenext: # exec if no prenext
self.on_dt_over()
self.nextstart()
def _next(self):
for child in self._children:
child._next()
if self._dt_over():
self.on_dt_over()
self.next()
def on_dt_over(self):
pass
def _dt_over(self):
if self.timeframe == TimeFrame.NoTimeFrame:
dtcmp, dtkey = MAXINT, datetime.datetime.max
else:
# With >= 1.9.x the system datetime is in the strategy
dt = self.strategy.datetime.datetime()
dtcmp, dtkey = self._get_dt_cmpkey(dt)
if self.dtcmp is None or dtcmp > self.dtcmp:
self.dtkey, self.dtkey1 = dtkey, self.dtkey
self.dtcmp, self.dtcmp1 = dtcmp, self.dtcmp
return True
return False
def _get_dt_cmpkey(self, dt):
if self.timeframe == TimeFrame.NoTimeFrame:
return None, None
if self.timeframe == TimeFrame.Years:
dtcmp = dt.year
dtkey = datetime.date(dt.year, 12, 31)
elif self.timeframe == TimeFrame.Months:
dtcmp = dt.year * 100 + dt.month
_, lastday = calendar.monthrange(dt.year, dt.month)
dtkey = datetime.datetime(dt.year, dt.month, lastday)
elif self.timeframe == TimeFrame.Weeks:
isoyear, isoweek, isoweekday = dt.isocalendar()
dtcmp = isoyear * 100 + isoweek
sunday = dt + datetime.timedelta(days=7 - isoweekday)
dtkey = datetime.datetime(sunday.year, sunday.month, sunday.day)
elif self.timeframe == TimeFrame.Days:
dtcmp = dt.year * 10000 + dt.month * 100 + dt.day
dtkey = datetime.datetime(dt.year, dt.month, dt.day)
else:
dtcmp, dtkey = self._get_subday_cmpkey(dt)
return dtcmp, dtkey
def _get_subday_cmpkey(self, dt):
# Calculate intraday position
point = dt.hour * 60 + dt.minute
if self.timeframe < TimeFrame.Minutes:
point = point * 60 + dt.second
if self.timeframe < TimeFrame.Seconds:
point = point * 1e6 + dt.microsecond
# Apply compression to update point position (comp 5 -> 200 // 5)
point = point // self.compression
# Move to next boundary
point += 1
# Restore point to the timeframe units by de-applying compression
point *= self.compression
# Get hours, minutes, seconds and microseconds
if self.timeframe == TimeFrame.Minutes:
ph, pm = divmod(point, 60)
ps = 0
pus = 0
elif self.timeframe == TimeFrame.Seconds:
ph, pm = divmod(point, 60 * 60)
pm, ps = divmod(pm, 60)
pus = 0
elif self.timeframe == TimeFrame.MicroSeconds:
ph, pm = divmod(point, 60 * 60 * 1e6)
pm, psec = divmod(pm, 60 * 1e6)
ps, pus = divmod(psec, 1e6)
extradays = 0
if ph > 23: # went over midnight:
extradays = ph // 24
ph %= 24
# moving 1 minor unit to the left to be in the boundary
# pm -= self.timeframe == TimeFrame.Minutes
# ps -= self.timeframe == TimeFrame.Seconds
# pus -= self.timeframe == TimeFrame.MicroSeconds
tadjust = datetime.timedelta(
minutes=self.timeframe == TimeFrame.Minutes,
seconds=self.timeframe == TimeFrame.Seconds,
microseconds=self.timeframe == TimeFrame.MicroSeconds)
# Add extra day if present
if extradays:
dt += datetime.timedelta(days=extradays)
# Replace intraday parts with the calculated ones and update it
dtcmp = dt.replace(hour=ph, minute=pm, second=ps, microsecond=pus)
dtcmp -= tadjust
dtkey = dtcmp
return dtcmp, dtkey