| Bloomberg-style iBoard | Dense dark terminal board — 27 columns, 9 segment tabs, real-time flash updates |
| TradingView Charts | Candlestick + volume histogram via lightweight-charts v4 with MA overlays |
| Market Breadth | Live advances / declines for VNINDEX, VN30, HNXIndex, HNX30 |
| Low-latency Pipeline | SSI WebSocket → Kafka KRaft → MySQL streaming tables → FastAPI → WebSocket |
| OHLCV Aggregation | 1m/1d candlesticks computed in real-time by CandlestickConsumer |
| Star-schema DW | Spark ETL → warehouse.dim.* + warehouse.fact.* for historical queries |
| JWT Authentication | Secure watchlists, per-user watchlists persisted in api DB |
| WebSocket Live Feed | Per-symbol and market-wide push updates with 3s auto-reconnect |
| VI / EN Language Switcher | Full i18n via react-i18next — language persisted to localStorage |
git clone https://github.com/bazzi24/streamflow.git
cd streamflow
cp .env.example .envFill in .env with your SSI credentials and secrets:
consumerID=<your_ssi_consumer_id>
consumerSecret=<your_ssi_consumer_secret>
url=https://fc-data.ssi.com.vn/
stream_url=https://fc-datahub.ssi.com.vn/
# REQUIRED — generate with: python -c "import secrets; print(secrets.token_hex(32))"
SECRET_KEY=<your-secret-key>
# Required credentials — use root for dev, create a limited app user in production
DB_USER=root
DB_PASSWORD=<your_password>
# CORS — comma-separated, no spaces
CORS_ORIGINS=http://localhost:3000uv venv
source .venv/bin/activate
uv syncmkdir -p lib
curl -L -o lib/mysql-connector-j-8.0.33.jar \
"https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar"docker compose -f docker/docker-compose.yaml up -dWait for services to become healthy:
docker compose -f docker/docker-compose.yaml ps
# mysql healthy
# kafka running
# api running
# frontend runningOpen http://localhost:3000 — the Bloomberg-style board is live.
First time? Start the Kafka producers to begin ingesting market data:
docker compose -f docker/docker-compose.yaml run --rm producer-trade # X-TRADE:ALL docker compose -f docker/docker-compose.yaml run --rm producer-quote # X-QUOTE:ALL docker compose -f docker/docker-compose.yaml run --rm producer-index # MI:ALL docker compose -f docker/docker-compose.yaml run --rm producer-foreign # R:ALL docker compose -f docker/docker-compose.yaml run --rm producer-status # F:ALL
| Layer | Technology |
|---|---|
| Data source | SSI Securities WebSocket API |
| Message broker | Apache Kafka 3.x (KRaft, no ZooKeeper) |
| Raw storage | MySQL 8.0.39 (data DB) |
| DW storage | MySQL 8.0.39 (warehouse DB, star schema) |
| ETL | Apache Spark 3.5 (PySpark) |
| API | FastAPI + Python 3.12 + aiokafka |
| Frontend | React 18 + Vite + TypeScript + Tailwind CSS |
| Charts | TradingView lightweight-charts v4 |
| i18n | react-i18next (VI + EN, persisted to localStorage) |
| Containers | Docker Compose |
| Service | Description | Port |
|---|---|---|
kafka |
Kafka KRaft broker | 9092 |
kafka-ui |
Kafka UI dashboard | 8080 |
mysql |
MySQL 8.0.39 (data + warehouse DBs) |
3306 |
spark-master |
Apache Spark master | 7077, 8082 |
spark-worker |
Spark worker (2 cores, 4 GB) | — |
api |
FastAPI REST + WebSocket | 8000 |
frontend |
React SPA served via Nginx | 80→3000 |
producer |
All 5 Kafka producers (SSI channels) | — |
consumer |
All 6 Kafka consumers (5 topics + OHLCV) | — |
Base: http://localhost:8000/api/v1
| Endpoint | Method | Auth | Description |
|---|---|---|---|
/auth/register |
POST | — | Register user |
/auth/login |
POST | — | Login → JWT (24h) |
/users/me |
GET | JWT | Current user |
/users/me/watchlist |
GET/PUT | JWT | Watchlist CRUD |
/stocks |
GET | — | All symbols + latest prices |
/stocks/{symbol} |
GET | — | Symbol metadata |
/stocks/{symbol}/quote |
GET | — | Live bid/ask from data DB |
/stocks/{symbol}/orderbook |
GET | — | Top 3 bid/ask levels |
/stocks/{symbol}/ohlcv |
GET | — | Intraday OHLCV (?interval=1m) |
/stocks/{symbol}/history |
GET | — | Daily OHLCV (?days=30) |
/market/overview |
GET | — | Indices + top gainers/losers |
/health |
GET | — | Health check |
| Endpoint | Auth | Description |
|---|---|---|
/ws/stocks/{symbol} |
Optional | Per-symbol live updates |
/ws/market |
Optional | Market-wide updates |
Message types: price_update · orderbook_update · index_update · candlestick_update
Two MySQL databases — data (raw + reference + candlestick) and warehouse (star-schema DW).
| Table | Source / Writer | PK + Index |
|---|---|---|
data_trade |
market_data_trade topic |
PRIMARY KEY (id), INDEX(symbol, trading_date) |
data_quote |
market_data_quote topic |
PRIMARY KEY (id), INDEX(symbol_id, trading_date) |
index_data |
index_data topic |
PRIMARY KEY (id), INDEX(index_id, trading_date) |
foreign_room |
foreign_room_data topic |
PRIMARY KEY (id), INDEX(symbol, trading_date) |
securities_status |
securities_status topic |
PRIMARY KEY (id), INDEX(symbol_id, trading_date) |
candlestick_1m |
CandlestickConsumer | PRIMARY KEY (symbol, time_start) |
candlestick_1d |
CandlestickConsumer | PRIMARY KEY (symbol, trading_date) |
candlestick_1mis the source of truth for 1m OHLCV; larger timeframes are derived at query time.
| Table | Key / Composite PK columns |
|---|---|
date |
tradingdate_key (date needs backticks) |
time |
time_key (time needs backticks) |
symbol |
symbol_key |
market_index |
index_key |
exchange |
exchange_key |
tradingsession |
trading_session_key |
stockorderbook |
tradingdate_key, time_key, symbol_key, exchange_key, session_key |
stocktrade |
same as stockorderbook |
marketindex |
same as above |
All secrets are loaded from .env via env_file in docker-compose. The FastAPI service requires the variables below.
| Variable | Example | Purpose |
|---|---|---|
SECRET_KEY |
b7649d31d... (hex 64-char token) |
JWT signing — never commit a default |
DB_USER |
root (dev) / streamflow_app (prod) |
MySQL username |
DB_PASSWORD |
— | MySQL password |
Generate a secure SECRET_KEY:
python -c "import secrets; print(secrets.token_hex(32))"| Variable | Default | Purpose |
|---|---|---|
CORS_ORIGINS |
http://localhost:3000 |
Comma-separated allowed origins |
consumerID |
— | SSI API consumer ID |
consumerSecret |
— | SSI API consumer secret |
url |
https://fc-data.ssi.com.vn/ |
SSI REST API base |
stream_url |
https://fc-datahub.ssi.com.vn/ |
SSI WebSocket base |
RAW_DB_URL |
jdbc:mysql://mysql:3306/data?... |
Spark reads data DB |
DW_DB_URL |
jdbc:mysql://mysql:3306/warehouse?... |
Spark writes warehouse DB |
KAFKA_BOOTSTRAP_SERVERS |
kafka:9092 |
All Kafka clients |
SPARK_MASTER_URL |
spark://spark-master:7077 |
spark-submit target |
MYSQL_JAR |
/streamflow/lib/mysql-connector-j-8.0.33.jar |
JDBC driver path |
Run this in MySQL to create a read-write app user (replace the password):
-- In docker exec mysql, or via any mysql client:
CREATE USER IF NOT EXISTS 'streamflow_app'@'%' IDENTIFIED BY '<strong-password>';
GRANT SELECT, INSERT, UPDATE, DELETE ON `data`.* TO 'streamflow_app'@'%';
GRANT SELECT, INSERT, UPDATE, DELETE ON `warehouse`.* TO 'streamflow_app'@'%';
FLUSH PRIVILEGES;
-- Then update .env:
-- DB_USER=streamflow_app
-- DB_PASSWORD=<strong-password>docker compose -f docker/docker-compose.yaml down # stop containers (data persists)
docker compose -f docker/docker-compose.yaml down -v # stop + remove volumes (data loss)MIT License — bazzi24

