gpandolfo/testDataFactory
Assignment Compliance: Option B — Financial Services Pipeline

This project is designed to fully comply with the requirements in the Option B assignment. Below is a mapping of assignment requirements to implemented features:

Stage 1: Ingest & Clean (Raw → Bronze)

  • Loads all 8 source files from the Czech Financial Dataset
  • Decodes birth_number into birth_date and gender (see stage1_bronze.py)
  • Renames district columns from A1-A16 to descriptive English names (see config.py)
  • Translates Czech-language codes in trans to English equivalents
  • Parses all date fields into proper date types
  • Handles NULL values explicitly (see Null Strategy section)
  • Outputs clean, typed datasets as Delta tables, Parquet, and CSV
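The birth_number decoding follows the Czech national-ID convention (women have 50 added to the month field). A minimal sketch, assuming all Berka birth years fall in the 20th century; the function name is illustrative and the authoritative implementation lives in stage1_bronze.py:

```python
from datetime import date


def decode_birth_number(birth_number: str) -> tuple[date, str]:
    """Decode a Czech birth_number (YYMMDD) into (birth_date, gender).

    Women have 50 added to the month field; birth years in the Berka
    dataset are all 20th century, so YY maps to 19YY here.
    """
    yy = int(birth_number[0:2])
    mm = int(birth_number[2:4])
    dd = int(birth_number[4:6])
    if mm > 50:  # female encoding: month is stored as MM + 50
        gender, mm = "F", mm - 50
    else:
        gender = "M"
    return date(1900 + yy, mm, dd), gender
```

For example, `decode_birth_number("605204")` yields a female client born 1960-02-04.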

Stage 2: Enrich & Normalize (Bronze → Silver)

  • Builds an account-centric entity model by joining account, owner client, district, loan, and card
  • Derives fields: account_age_months, owner_age_at_account_creation, has_loan, has_card, loan_status_category
  • Normalizes column names to snake_case
  • Outputs unified silver-layer entity table and cleaned transaction fact table as Delta, Parquet, and CSV

Stage 3: Aggregate (Silver → Gold)

  • Produces two output tables:
    • Account Monthly Profile: total credits, debits, net cash flow, transaction count, EOM balance, card withdrawals, loan payment flag, gender, age bucket, region
    • Regional Segment Summary: median net cash flow, avg transaction count, active accounts, avg salary, loan default rate, gold card holders
  • Outputs both tables as Delta, Parquet, and CSV

Stage 4: Schema Evolution (Simulated)

  • Handles optional columns (trans.channel, district.A17) gracefully; backward compatible
  • If present, columns flow through to silver and gold layers
  • Design is documented in code and README

Constraints

  • Python 3.10+
  • PySpark + Delta Lake (processing)
  • Output format: Parquet, CSV, Delta tables
  • Runs locally, no cloud services required
  • No notebooks; pipeline is fully script-based

Deliverables

  • Pipeline code organized by stage
  • README.md with setup, run instructions, design decisions, schema evolution, improvement ideas
  • Sample output: CSV and Parquet for each gold-layer table (first 100 rows in /output/samples)
  • Unit tests for each stage

Evaluation Criteria

  • Pipeline architecture: clear stage separation, logical flow
  • Data quality handling: nulls, type casting, code translation, encoded field decoding
  • Code organization: modular, readable, clear responsibilities
  • Schema evolution: backward compatible, handles new/missing columns
  • Aggregation correctness: accurate rollups, edge cases handled
  • Documentation: clear README, code comments
  • Testing: unit tests included
  • Performance: Spark/Delta Lake for scalable processing

Czech Financial Data Pipeline

This is a PySpark + Delta Lake Medallion-architecture pipeline that ingests the Czech Financial (Berka) dataset, cleans and normalizes it through Bronze → Silver → Gold layers, and produces aggregated customer-level feature tables ready for analytics.

Quick Start

Prerequisites

  • Python 3.10+

Setup

python -m venv .venv

# Windows
.venv\Scripts\activate
# macOS / Linux
source .venv/bin/activate

pip install -r requirements.txt

Data

  1. Download the Czech Financial Dataset from Kaggle.
  2. Place the 8 CSV files into the data/ directory:
data/
├── account.csv
├── client.csv
├── disp.csv
├── trans.csv
├── order.csv
├── loan.csv
├── card.csv
└── district.csv

Run the Pipeline

python -m pipeline.run

Optional flags:

python -m pipeline.run --data-dir path/to/csv/files --log-level DEBUG

Run Tests

pytest tests/ -v --tb=short

With coverage:

pytest tests/ -v --cov=pipeline --cov-report=term-missing

Architecture

data/*.csv          (Raw source files)
    │
    ▼
┌──────────────────────────────────────────────────┐
│  Stage 1 — Bronze  (Ingest & Clean)              │
│  • Load 8 semicolon-delimited CSVs               │
│  • Validate required columns per table           │
│  • Decode birth_number → birth_date + gender     │
│  • Rename district A1–A16 columns                │
│  • Translate Czech codes → English               │
│  • Parse YYMMDD dates (century pivot at 50)      │
│  • Warn on high null rates and anomalous amounts │
└──────────────────────────────────────────────────┘
    │  output/bronze/*.parquet
    ▼
┌──────────────────────────────────────────────────┐
│  Stage 2 — Silver  (Enrich & Normalize)          │
│  • Account-centric entity model                  │
│  • Join: account → owner → district              │
│          → loan (optional, deduped)              │
│          → card (optional)                       │
│  • Derive: account_age_months, owner_age,        │
│            has_loan, has_card,                   │
│            loan_status_category (good/bad/none)  │
│  • Assert entity uniqueness (one row/account)    │
└──────────────────────────────────────────────────┘
    │  output/silver/*.parquet
    ▼
┌──────────────────────────────────────────────────┐
│  Stage 3 — Gold  (Aggregate)                     │
│  • Table A: Account Monthly Profile              │
│  • Table B: Regional Segment Summary             │
└──────────────────────────────────────────────────┘
    │  output/gold/*.parquet
    │  output/samples/*.csv  (first 100 rows each)

Processing Choice: PySpark + Delta Lake

This pipeline uses PySpark and Delta Lake for scalable, distributed processing and ACID-compliant storage.

Rationale:

  1. Scalability — PySpark distributes processing across cores (or cluster nodes), so the same code handles both the sample dataset and production-scale volumes.
  2. Compatibility — The pipeline runs locally without modification and is ready for big-data workloads on a cluster.
  3. Data Lakehouse — Delta Lake adds ACID transactions, schema enforcement, schema evolution, and time travel on top of Parquet, supporting modern analytics workflows.

Trade-off: For small datasets, Pandas would be simpler, but this pipeline is designed for production scale and future extensibility.


Design Decisions

Centralized Configuration

All magic values — paths, translation maps, required-column contracts, data-quality thresholds, age-bucket parameters — live in pipeline/config.py. No literals are scattered across stage files.

Null Strategy

  • Preserve nulls through Bronze/Silver rather than filling with sentinel values. This keeps the data honest for downstream analytics.
  • Empty strings in trans columns (operation, k_symbol, bank, account) are explicitly converted to null in Bronze so there is a single null representation throughout.
  • District columns with known missing values (district 69, Jesenik: unemployment_rate_95 and crimes_95) are left null.

Entity Model

The Silver entity table is account-centric (one row per account) with the owner's demographics, district attributes, and optional loan/card info joined in. A post-build assertion enforces uniqueness. This makes Gold-layer aggregation straightforward — join the entity once and group.

Loan Deduplication

The Berka dataset has one loan per account, but the pipeline defensively deduplicates: if multiple loans exist, only the most-recent (by loan_date) is kept, preventing fan-out on the entity join.

Aggregation Edge Cases

  • Accounts with no transactions in a month simply do not appear in the monthly profile (sparse representation).
  • eom_balance is the balance from the last transaction of the month, ordered by (trans_date, trans_id) to break ties deterministically.
  • Loan default rate is null when a region has no accounts with loans, distinguishing "no loan data" from a genuine 0% default rate.

Schema Evolution (Stage 4)

Two simulated V2 changes are handled without modifying core logic:

1. trans.channel (new nullable varchar)

  • Bronze: clean_trans() doesn't drop unknown columns — if channel is present in the CSV it flows through to Parquet automatically.
  • Silver: build_transaction_fact() passes all columns through.
  • Gold: build_account_monthly_profile() groups by account_id/year_month, so an extra channel column doesn't interfere.

2. district.A17 → avg_salary_98 (new nullable integer)

  • Bronze: DISTRICT_COLUMN_MAP in config includes A17 → avg_salary_98. If A17 is absent the rename is a no-op.
  • Silver / Gold: left joins carry the column through if it exists.

Design principle: Column-presence guards (if col in df.columns) combined with Parquet's schema-on-read make the pipeline backward-compatible — old files produce the same output as before.


Data Quality Checks

| Check | Where | Action |
| --- | --- | --- |
| Required columns present | Stage 1, per table | ValueError — aborts pipeline |
| File exists | Stage 1, Stage 2 | FileNotFoundError — aborts pipeline |
| Null rate > 5% on any column | Stage 1 | WARNING log — continues |
| Transaction amount > 5,000,000 | Stage 1 | WARNING log — continues |
| Duplicate account_ids in entity | Stage 2 | AssertionError — aborts pipeline |

What I Would Improve With More Time

  1. Pandera / Great Expectations — Replace ad-hoc validation with a declarative schema contract that generates a data-quality report.
  2. Partitioned Parquet — Partition gold output by year_month or region for faster downstream queries.
  3. Incremental processing — Currently full-refresh; could implement merge/upsert for the transaction fact table.
  4. Delta Lake — Already implemented: Delta tables for ACID transactions, time travel, and schema enforcement.
  5. Orchestration — Wrap stages in Dagster assets or Airflow tasks with dependency tracking and retry logic.
  6. CI/CD — GitHub Actions: pytest --cov-fail-under=80 + ruff lint on every push.
  7. Containerization — A Dockerfile for a fully reproducible environment.

Project Structure

testDataFactory/
├── pipeline/
│   ├── __init__.py
│   ├── config.py           # All paths, constants, maps, thresholds
│   ├── stage1_bronze.py    # Ingest & Clean
│   ├── stage2_silver.py    # Enrich & Normalize
│   ├── stage3_gold.py      # Aggregate
│   └── run.py              # CLI entry point
├── tests/
│   ├── conftest.py
│   ├── test_stage1_bronze.py      # Bronze unit tests
│   ├── test_stage2_silver.py      # Silver unit tests
│   └── test_stage3_gold.py        # Gold unit tests
├── data/                   # Place CSV source files here (gitignored)
├── output/                 # Pipeline output (gitignored)
│   ├── bronze/
│   ├── silver/
│   ├── gold/
│   └── samples/            # CSV samples — first 100 rows of each gold table
├── deJOBtests/             # Assignment specs
├── requirements.txt
├── .gitignore
└── README.md
