An event-driven trend trading framework for A-share quantitative research. FeatherTrade is lightweight, modular, and built for both researchers and developers. Supports strategy development, backtesting, and a factor management pipeline for building and persisting financial factors from external data sources.
- Event-driven core — all components communicate via a typed event queue; same interfaces work in backtest and live modes
- Strategy framework — implement
on_bar/on_tick, emit signals, plug in risk rules - Backtest simulator — replays bar data, fills orders at close price, tracks cash and positions
- Risk management — pluggable stop-loss / take-profit rules evaluated on every market event
- Factor system — fetch, compute, and persist financial factors to SQLite; extensible via drop-in Python files
- TuShare integration — built-in
TuShareDataSourcefor A-share daily bars and daily_basic indicators - Quick register — register a full set of factors from a DataFrame API in one config block
- Python 3.10+
pip install tushare apscheduler pyyaml pandas
Copy the workspace config template and fill in your TuShare token:
cp workspace/config/factors.yaml.example workspace/config/factors.yaml
# edit workspace/config/factors.yaml: set tushare_tokenOr set via environment variable:
export TUSHARE_TOKEN=your_token_herepython run_backtest.py
python run_risk_backtest.py# 1. Pull data into the factor DB first
python -m src.trading_system.factors.extensions.tushare_daily_task
# 2. Run the factor-powered backtest
python run_factor_backtest.py| Document | Description |
|---|---|
| Strategy Guide | How to write strategies and connect them to the backtest engine |
| Factor System Overview | Factor pipeline architecture: fetch → compute → store |
| Factor Quick Register | Batch-register factors from a data source in one config block |
| TuShare Data Source | Using TuShareDataSource for A-share data ingestion |
| Factor Extension Examples | Step-by-step examples for registering data sources and factors |
| Extensibility & Scheduling | Extension pattern and scheduled ingestion tasks |
| Extension Guide | How to extend strategies, adapters, and factor components |
| Architecture | System architecture and event flow |
| Goals | Design goals and non-goals |
CSVDataFeed / FactorDataFeed
→ MarketEvent
→ StrategyManager → Strategy.on_bar()
→ SignalEvent
→ BacktestSimulator / ExecutionEngine
→ FillEvent
→ AccountService (cash & positions)
→ RiskManager → RiskSignalEvent (stop-loss / take-profit)
src/trading_system/
├── core/ # EventEngine, StrategyManager, BacktestRunner
├── data/ # Bar, Tick domain models
├── modules/ # CSVDataFeed, FactorDataFeed, BacktestSimulator,
│ # AccountService, ExecutionEngine
├── strategies/ # BaseStrategy, StatefulStrategy, built-in strategies
├── risk/ # RiskManager, BaseRiskStrategy, FixedStopLoss/TakeProfit
└── factors/ # Factor pipeline
├── builtin/ # FileDataSource, APIDataSource, TuShareDataSource
│ # MovingAverageFactor, ColumnExtractLogic
├── extensions/ # Drop user extensions here (auto-discovered)
└── ... # FactorService, FactorRegistry, TaskScheduler, FactorDatabase
Subclass BaseStrategy (or StatefulStrategy for state-machine / stop-loss support):
from src.trading_system.strategies.base_strategy import BaseStrategy
from src.trading_system.data.market_data import Bar, Tick
from src.trading_system.strategies.context import StrategyContext
class MyStrategy(BaseStrategy):
def on_bar(self, bar: Bar, context: StrategyContext):
# Access account state
cash = context.account.cash
# Emit a buy signal (100 shares at market price)
self.send_signal(bar.symbol, "BUY", 100, bar.close)
def on_tick(self, tick: Tick, context: StrategyContext):
pass # tick-level logic (optional)See Strategy Guide for the full StatefulStrategy API including ATR stop-loss and position sizing.
from src.trading_system.core.event_engine import EventEngine
from src.trading_system.core.strategy_manager import StrategyManager
from src.trading_system.modules.backtest_simulator import BacktestSimulator
from src.trading_system.modules.account_service import AccountService
from src.trading_system.modules.csv_data_feed import CSVDataFeed
engine = EventEngine()
account_service = AccountService(engine, initial_cash=500_000)
simulator = BacktestSimulator(engine, account_service)
manager = StrategyManager(engine, simulator, account_service=account_service)
strategy = MyStrategy(engine, {"fast_period": 10, "slow_period": 30})
manager.add_strategy(strategy)
feed = CSVDataFeed(engine, symbol="000001.SZ", file_path="data/000001.csv")
engine.start()
feed.start()
# ... wait for feed to finish ...
feed.stop()
engine.stop()To use factor DB data instead of CSV, replace CSVDataFeed with FactorDataFeed — see run_factor_backtest.py.
Edit the SYMBOLS variable in the extension file, then run:
# A-share daily bars (open, high, low, close, vol, ...)
python -m src.trading_system.factors.extensions.tushare_daily_task
# Daily basic indicators (PE, PB, turnover rate, market cap, ...)
python -m src.trading_system.factors.extensions.tushare_basic_taskfrom src.trading_system.factors.database import FactorDatabase
from src.trading_system.factors.service import FactorService
from src.trading_system.factors.settings import DB_PATH
from datetime import datetime
db = FactorDatabase(DB_PATH)
svc = FactorService(db)
values = svc.get_factor_values(
"000001.SZ", "daily_close",
start_time=datetime(2025, 1, 1),
end_time=datetime(2026, 1, 1),
)
for v in values:
print(v.timestamp, v.value)Drop a .py file in src/trading_system/factors/extensions/ with a class inheriting BaseFactorLogic:
import pandas as pd
from typing import Dict, Any
from src.trading_system.factors.base import BaseFactorLogic
class MyFactor(BaseFactorLogic):
def compute(self, data: pd.DataFrame, config: Dict[str, Any]) -> pd.DataFrame:
window = config.get("window", 20)
return pd.DataFrame({
"timestamp": data["timestamp"],
"symbol": data["symbol"],
"value": data["close"].rolling(window).mean(),
})It is auto-discovered at FactorService init — no registration code needed.
from src.trading_system.factors.base import BaseDataSource
import pandas as pd
from datetime import datetime
class MyDataSource(BaseDataSource):
def fetch_data(self, symbol: str, start_time: datetime, end_time: datetime) -> pd.DataFrame:
# return DataFrame with at least: timestamp/trade_date, symbol/ts_code, and value columns
...from src.trading_system.factors.config import DataSourceConfig
from src.trading_system.factors.quick_register import QuickRegisterConfig
DATA_SOURCE_CONFIGS = [
DataSourceConfig(
name="my_source",
source_class="MyDataSource",
params={"api": "my_api", "symbols": ["600519.SH"]},
time_range={"start": "today-365d", "end": "today"},
)
]
QUICK_REGISTER_CONFIGS = [
QuickRegisterConfig(
data_source="my_source",
fields=["open", "close", "pe"],
prefix="my_",
category="MyCategory",
)
]See Extensibility & Scheduling for scheduled ingestion (cron / one-off tasks via APScheduler).
from src.trading_system.risk.base_risk import BaseRiskStrategy
from src.trading_system.modules.execution_engine import Position
class MyRiskRule(BaseRiskStrategy):
def evaluate(self, position: Position, current_price: float):
threshold = self._params.get("threshold", 0.05)
if position.quantity > 0:
loss_pct = (position.average_cost - current_price) / position.average_cost
if loss_pct >= threshold:
return "SELL_ALL"
return None
# Attach to RiskManager:
risk_manager.add_strategy(MyRiskRule({"threshold": 0.05}))python -m pytest tests/
python -m pytest tests/unit/
python -m pytest tests/integration/| Strategy | Description |
|---|---|
MovingAverageCrossover |
Simple MA crossover (fast/slow) |
AtrTrendStrategy |
ATR-based trend following with volatility stop |
DualMATrendStrategy |
Dual MA + 60-day trend filter + ATR stop, A-share sizing |
MIT