This project is designed to fully comply with the requirements in the Option B assignment. Below is a mapping of assignment requirements to implemented features:
- Loads all 8 source files from the Czech Financial Dataset
- Decodes `birth_number` into `birth_date` and `gender` (see `stage1_bronze.py`)
- Renames district columns from `A1`–`A16` to descriptive English names (see `config.py`)
- Translates Czech-language codes in `trans` to English equivalents
- Parses all date fields into proper date types
- Handles NULL values explicitly (see Null Strategy section)
- Outputs clean, typed datasets as Delta tables, Parquet, and CSV
- Builds an account-centric entity model by joining account, owner client, district, loan, and card
- Derives fields: `account_age_months`, `owner_age_at_account_creation`, `has_loan`, `has_card`, `loan_status_category`
- Normalizes column names to snake_case
- Outputs unified silver-layer entity table and cleaned transaction fact table as Delta, Parquet, and CSV
- Produces two output tables:
- Account Monthly Profile: total credits, debits, net cash flow, transaction count, EOM balance, card withdrawals, loan payment flag, gender, age bucket, region
- Regional Segment Summary: median net cash flow, avg transaction count, active accounts, avg salary, loan default rate, gold card holders
- Outputs both tables as Delta, Parquet, and CSV
- Handles optional columns (`trans.channel`, `district.A17`) gracefully; backward compatible
- If present, these columns flow through to silver and gold layers
- Design is documented in code and README
- Python 3.10+
- PySpark + Delta Lake (processing)
- Output format: Parquet, CSV, Delta tables
- Runs locally, no cloud services required
- No notebooks; pipeline is fully script-based
- Pipeline code organized by stage
- README.md with setup, run instructions, design decisions, schema evolution, improvement ideas
- Sample output: CSV and Parquet for each gold-layer table (first 100 rows in /output/samples)
- Unit tests for each stage
- Pipeline architecture: clear stage separation, logical flow
- Data quality handling: nulls, type casting, code translation, encoded field decoding
- Code organization: modular, readable, clear responsibilities
- Schema evolution: backward compatible, handles new/missing columns
- Aggregation correctness: accurate rollups, edge cases handled
- Documentation: clear README, code comments
- Testing: unit tests included
- Performance: Spark/Delta Lake for scalable processing
This is a PySpark + Delta Lake Medallion-architecture pipeline that ingests the Czech Financial (Berka) dataset, cleans and normalizes it through Bronze → Silver → Gold layers, and produces aggregated customer-level feature tables ready for analytics.
- Python 3.10+
```shell
python -m venv .venv
# Windows
.venv\Scripts\activate
# macOS / Linux
source .venv/bin/activate
pip install -r requirements.txt
```

- Download the Czech Financial Dataset from Kaggle.
- Place the 8 CSV files into the `data/` directory:

```
data/
├── account.csv
├── client.csv
├── disp.csv
├── trans.csv
├── order.csv
├── loan.csv
├── card.csv
└── district.csv
```
```shell
python -m pipeline.run
```

Optional flags:

```shell
python -m pipeline.run --data-dir path/to/csv/files --log-level DEBUG
```

```shell
pytest tests/ -v --tb=short
```

With coverage:

```shell
pytest tests/ -v --cov=pipeline --cov-report=term-missing
```

```
data/*.csv  (Raw source files)
     │
     ▼
┌──────────────────────────────────────────────────┐
│ Stage 1 — Bronze (Ingest & Clean)                │
│ • Load 8 semicolon-delimited CSVs                │
│ • Validate required columns per table            │
│ • Decode birth_number → birth_date + gender      │
│ • Rename district A1–A16 columns                 │
│ • Translate Czech codes → English                │
│ • Parse YYMMDD dates (century pivot at 50)       │
│ • Warn on high null rates and anomalous amounts  │
└──────────────────────────────────────────────────┘
     │  output/bronze/*.parquet
     ▼
┌──────────────────────────────────────────────────┐
│ Stage 2 — Silver (Enrich & Normalize)            │
│ • Account-centric entity model                   │
│ • Join: account → owner → district               │
│                 → loan (optional, deduped)       │
│                 → card (optional)                │
│ • Derive: account_age_months, owner_age,         │
│           has_loan, has_card,                    │
│           loan_status_category (good/bad/none)   │
│ • Assert entity uniqueness (one row/account)     │
└──────────────────────────────────────────────────┘
     │  output/silver/*.parquet
     ▼
┌──────────────────────────────────────────────────┐
│ Stage 3 — Gold (Aggregate)                       │
│ • Table A: Account Monthly Profile               │
│ • Table B: Regional Segment Summary              │
└──────────────────────────────────────────────────┘
     │  output/gold/*.parquet
     │  output/samples/*.csv (first 100 rows each)
```
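The Bronze decoding of `birth_number` can be sketched in plain Python. This is a minimal stand-in for the actual `stage1_bronze.py` logic, assuming the conventions stated above: the Czech birth-number month field is offset by +50 for women, and the century pivot at 50 maps `YY >= 50` to `19YY` and everything else to `20YY`.

```python
from datetime import date


def decode_birth_number(birth_number: int) -> tuple[date, str]:
    """Decode a Czech birth_number (YYMMDD) into (birth_date, gender).

    Convention: the month field is offset by +50 for women.
    Century pivot (assumption matching this README): YY >= 50 -> 19YY,
    otherwise 20YY.
    """
    s = f"{birth_number:06d}"
    yy, mm, dd = int(s[:2]), int(s[2:4]), int(s[4:6])
    gender = "F" if mm > 50 else "M"
    if mm > 50:
        mm -= 50  # remove the female month offset
    year = 1900 + yy if yy >= 50 else 2000 + yy
    return date(year, mm, dd), gender
```

In the real pipeline the equivalent runs as a Spark column expression over the `client` table rather than row-by-row Python.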
This pipeline uses PySpark and Delta Lake for scalable, distributed processing and ACID-compliant storage.
Rationale:
- Scalability — PySpark handles large datasets efficiently, and Delta Lake provides robust storage with schema enforcement and time travel.
- Compatibility — The pipeline is ready for big data workloads and can be run locally or in a cluster.
- Data Lakehouse — Delta Lake enables transactional updates and schema evolution, supporting modern analytics workflows.
Trade-off: For small datasets, Pandas would be simpler, but this pipeline is designed for production-scale and future extensibility.
All magic values — paths, translation maps, required-column contracts, data-quality thresholds, and age-bucket parameters — live in `pipeline/config.py`. No literals are scattered across stage files.
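A hypothetical fragment illustrating that pattern. Only `DISTRICT_COLUMN_MAP`, the `A17 → avg_salary_98` entry, the 5% null threshold, and the 5,000,000 amount threshold come from this README; the other names and values are illustrative, not copied from the real `config.py`.

```python
# Illustrative config fragment (not the actual pipeline/config.py).
DATA_DIR = "data"
OUTPUT_DIR = "output"

DISTRICT_COLUMN_MAP = {
    "A1": "district_id",
    "A2": "district_name",
    # ... A3-A16 ...
    "A17": "avg_salary_98",  # optional V2 column; rename is a no-op if absent
}

NULL_RATE_WARN_THRESHOLD = 0.05       # warn when any column exceeds 5% nulls
AMOUNT_ANOMALY_THRESHOLD = 5_000_000  # warn on transaction amounts above this
```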
- Preserve nulls through Bronze/Silver rather than filling with sentinel values. This keeps the data honest for downstream analytics.
- Empty strings in `trans` columns (`operation`, `k_symbol`, `bank`, `account`) are explicitly converted to `NaN` in Bronze so there is a single null representation throughout.
- District columns with known missing values (district 69, Jesenik: `unemployment_rate_95` and `crimes_95`) are left as `NaN`.
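The empty-string rule can be sketched in plain Python. `normalize_empty_strings` is a hypothetical helper standing in for the Spark transformation; the column list mirrors the `trans` columns named above.

```python
TRANS_STRING_COLUMNS = ("operation", "k_symbol", "bank", "account")


def normalize_empty_strings(row: dict, columns=TRANS_STRING_COLUMNS) -> dict:
    """Return a copy of `row` with empty strings in the given columns
    replaced by None, so the pipeline has one null representation."""
    out = dict(row)
    for col in columns:
        if col in out and isinstance(out[col], str) and out[col].strip() == "":
            out[col] = None
    return out
```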
The Silver entity table is account-centric (one row per account) with the owner's demographics, district attributes, and optional loan/card info joined in. A post-build assertion enforces uniqueness. This makes Gold-layer aggregation straightforward — join the entity once and group.
The Berka dataset has one loan per account, but the pipeline defensively deduplicates: if multiple loans exist, only the most recent (by `loan_date`) is kept, preventing fan-out on the entity join.
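That dedup rule reduces to keep-latest-per-key; a pure-Python sketch (the pipeline does the equivalent with a Spark window ordered by `loan_date`; `latest_loan_per_account` is a hypothetical name):

```python
def latest_loan_per_account(loans: list[dict]) -> list[dict]:
    """Keep only the most recent loan per account_id (by loan_date),
    so the entity join stays one row per account."""
    best: dict[int, dict] = {}
    for loan in loans:
        current = best.get(loan["account_id"])
        if current is None or loan["loan_date"] > current["loan_date"]:
            best[loan["account_id"]] = loan
    return list(best.values())
```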
- Accounts with no transactions in a month simply do not appear in the monthly profile (sparse representation).
- `eom_balance` is the balance from the last transaction of the month, ordered by `(trans_date, trans_id)` to break ties deterministically.
- Loan default rate is `NaN` when a region has no accounts with loans, distinguishing "no loan data" from a genuine 0% default rate.
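The end-of-month tie-break can be sketched as follows. This is a plain-Python stand-in for the Spark window ordered by `(trans_date, trans_id)`; `eom_balance` is a hypothetical helper name.

```python
def eom_balance(month_transactions: list[dict]):
    """Balance from the last transaction of the month; same-day ties are
    broken deterministically by trans_id."""
    last = max(month_transactions, key=lambda t: (t["trans_date"], t["trans_id"]))
    return last["balance"]
```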
Two simulated V2 changes are handled without modifying core logic:
- Bronze: `clean_trans()` doesn't drop unknown columns — if `channel` is present in the CSV it flows through to Parquet automatically.
- Silver: `build_transaction_fact()` passes all columns through.
- Gold: `build_account_monthly_profile()` groups by `account_id` / `year_month`, so `channel` doesn't interfere.
- Bronze: `DISTRICT_COLUMN_MAP` in config includes `A17 → avg_salary_98`. If `A17` is absent the rename is a no-op.
- Silver / Gold: left joins carry the column through if it exists.
Design principle: column-presence guards (`if col in df.columns`) combined with Parquet's schema-on-read make the pipeline backward compatible — old files produce the same output as before.
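The column-presence guard reduces to a small pattern; a sketch with a hypothetical helper (in the real pipeline the same check runs against a Spark DataFrame's `.columns`):

```python
def rename_if_present(columns: list[str], column_map: dict[str, str]) -> dict[str, str]:
    """Build the effective rename map from only the columns that actually
    exist in this file's schema; absent mappings become a no-op."""
    return {old: new for old, new in column_map.items() if old in columns}
```

A V1 file without `A17` and a V2 file with it both pass through the same code path; only the effective map differs.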
| Check | Where | Action |
|---|---|---|
| Required columns present | Stage 1, per table | ValueError — aborts pipeline |
| File exists | Stage 1, Stage 2 | FileNotFoundError — aborts pipeline |
| Null rate > 5% on any column | Stage 1 | WARNING log — continues |
| Transaction amount > 5,000,000 | Stage 1 | WARNING log — continues |
| Duplicate account_ids in entity | Stage 2 | AssertionError — aborts pipeline |
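The null-rate check from the table can be sketched as a hypothetical helper; the pipeline logs the offending columns as WARNINGs and continues rather than aborting.

```python
def null_rate_warnings(rows: list[dict], threshold: float = 0.05) -> list[str]:
    """Return the columns whose null rate exceeds `threshold`."""
    if not rows:
        return []
    null_counts: dict[str, int] = {}
    for row in rows:
        for col, val in row.items():
            null_counts.setdefault(col, 0)
            if val is None:
                null_counts[col] += 1
    n = len(rows)
    return [col for col, count in null_counts.items() if count / n > threshold]
```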
- Pandera / Great Expectations — Replace ad-hoc validation with a declarative schema contract that generates a data-quality report.
- Partitioned Parquet — Partition gold output by `year_month` or `region` for faster downstream queries.
- Incremental processing — Currently full-refresh; could implement merge/upsert for the transaction fact table.
- Delta Lake — Already implemented: Delta tables for ACID transactions, time travel, and schema enforcement.
- Orchestration — Wrap stages in Dagster assets or Airflow tasks with dependency tracking and retry logic.
- CI/CD — GitHub Actions: `pytest --cov-fail-under=80` + `ruff` lint on every push.
- Containerization — A `Dockerfile` for a fully reproducible environment.
```
testDataFactory/
├── pipeline/
│   ├── __init__.py
│   ├── config.py            # All paths, constants, maps, thresholds
│   ├── stage1_bronze.py     # Ingest & Clean
│   ├── stage2_silver.py     # Enrich & Normalize
│   ├── stage3_gold.py       # Aggregate
│   └── run.py               # CLI entry point
├── tests/
│   ├── conftest.py
│   ├── test_stage1_bronze.py   # Bronze unit tests
│   ├── test_stage2_silver.py   # Silver unit tests
│   └── test_stage3_gold.py     # Gold unit tests
├── data/        # Place CSV source files here (gitignored)
├── output/      # Pipeline output (gitignored)
│   ├── bronze/
│   ├── silver/
│   ├── gold/
│   └── samples/    # CSV samples — first 100 rows of each gold table
├── deJOBtests/  # Assignment specs
├── requirements.txt
├── .gitignore
└── README.md
```