A production-grade data engineering project implementing a medallion architecture to process real-time flight data from the Israeli Airport Authority API using Apache Airflow, Databricks, and dbt.
This pipeline ingests, transforms, and models approximately 3,000 daily flights through a three-tier medallion architecture (Bronze → Silver → Gold), providing clean, analytics-ready data for flight delay analysis and operational insights.
- Real-time API Integration: Automated daily ingestion from Israeli Airport Authority public API
- Medallion Architecture: Industry-standard Bronze/Silver/Gold data layering
- Incremental Processing: Smart merge strategies to handle updates and late-arriving data
- Data Quality: Built-in validation, deduplication, and dbt testing
- Cloud-Native: Runs on Databricks (AWS EC2/S3) with Delta Lake storage format
- Containerized Orchestration: Apache Airflow running in Docker Compose
The DAG orchestrates three main tasks: Bronze ingestion → Silver transformation → Gold modeling
Israeli Airport Authority API (JSON)
↓
Bronze Layer (Raw Data)
├─ flights_bronze.raw_flights
├─ Schema: 20+ raw API fields
└─ Strategy: Append-only with schema evolution
↓
Silver Layer (Cleaned & Normalized)
├─ flights_silver.flights_clean
├─ Transformations: Standardization, type conversion, deduplication
└─ Strategy: MERGE on composite key (flight_number + airline_code + current_date + arrival_departure)
↓
Gold Layer (Analytics-Ready)
├─ flights_gold.fact_delayed_flights
├─ Models: Delay calculations, fact tables
└─ Tool: dbt-databricks
| Layer | Technology | Purpose |
|---|---|---|
| Orchestration | Apache Airflow 3.1.6 | Workflow scheduling and monitoring |
| Compute | Databricks on AWS EC2 | Distributed Spark processing |
| Storage | Delta Lake on S3 | ACID transactions, time travel, versioning |
| Transformation | PySpark + dbt-databricks | Data cleaning and modeling |
| Containerization | Docker Compose | Local development and deployment |
airport-data-pipeline/
├── dags/
│ └── airport_pipeline_dag.py # Airflow DAG orchestrating all tasks
├── bronze/
│ └── bronze_flights_api.ipynb # API ingestion to Bronze layer
├── silver/
│ └── silver_flights_notebook.ipynb # Cleaning & normalization to Silver
├── airport_dbt/
│ ├── models/
│ │ ├── fact_delayed_flights.sql # Gold layer dbt model
│ │ └── sources.yml # Source definitions
│ ├── profiles.yml # Databricks connection config
│ └── dbt_project.yml # dbt project settings
├── config/
│ └── airflow.cfg # Airflow configuration
├── docker-compose.yaml # Multi-container Airflow setup
├── dockerfile # Custom Airflow image with Spark/dbt
├── requirements.txt # Python dependencies
└── README.md
- Docker Desktop (with Docker Compose)
- Databricks workspace on AWS
- Git
- 4GB+ RAM for Airflow services
1. Clone the repository

   ```bash
   git clone https://github.com/yourusername/airport-data-pipeline.git
   cd airport-data-pipeline
   ```

2. Create a `.env` file in the project root:

   ```env
   # Airflow
   AIRFLOW_UID=50000
   _AIRFLOW_WWW_USER_USERNAME=airflow
   _AIRFLOW_WWW_USER_PASSWORD=airflow

   # Databricks
   DATABRICKS_HOST=your-workspace.cloud.databricks.com
   DATABRICKS_HTTP_PATH=/sql/1.0/warehouses/your-warehouse-id
   DATABRICKS_CLUSTER_ID=your-cluster-id
   DATABRICKS_DBT_ACCESS_KEY=your-databricks-token

   # Notebook Paths
   BRONZE_PATH=/Workspace/Users/your-email/bronze_flights_api
   SILVER_PATH=/Workspace/Users/your-email/silver_flights_notebook
   ```
3. Upload Notebooks to Databricks
   - Upload `bronze/bronze_flights_api.ipynb` to your Databricks workspace
   - Upload `silver/silver_flights_notebook.ipynb` to your Databricks workspace
   - Update paths in `.env` to match your workspace structure
4. Initialize Airflow

   ```bash
   docker-compose up airflow-init
   ```
5. Start Airflow services

   ```bash
   docker-compose up -d
   ```
6. Access the Airflow UI
   - URL: http://localhost:8080
   - Username: `airflow` (or from `.env`)
   - Password: `airflow` (or from `.env`)
7. Set up the Databricks connection in Airflow
   - Navigate to Admin → Connections
   - Add connection ID: `databricks_default`
   - Connection Type: `Databricks`
   - Host: your Databricks workspace URL
   - Token: your personal access token
8. Add an Airflow Variable
   - Navigate to Admin → Variables
   - Add key: `DATABRICKS_DBT_ACCESS_KEY`
   - Value: your Databricks token
9. Trigger the DAG
   - Go to the DAGs page
   - Find `airport_pipeline`
   - Click the play button to trigger manually
   - Monitor progress in the Graph view
Notebook: `bronze_flights_api.ipynb`
- Fetches flight data from Israeli Airport Authority API
- Adds metadata columns: `ingestion_timestamp`, `source_file`
- Writes to a Delta table with schema merge enabled (handles API changes)
- Strategy: Append-only to maintain full history
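The ingestion step can be sketched in plain Python. This is illustrative only: the `records` key and the raw field names (`CHFLTN`, `CHOPER`) are assumptions, and the actual API call lives in the Bronze notebook.

```python
from datetime import datetime, timezone

def to_bronze_records(payload: dict, source_file: str) -> list:
    """Attach ingestion metadata to each raw flight record before the
    append-only Delta write; every raw API field is kept untouched."""
    ingested_at = datetime.now(timezone.utc).isoformat()
    records = []
    for raw in payload.get("records", []):  # "records" key is an assumption
        row = dict(raw)
        row["ingestion_timestamp"] = ingested_at
        row["source_file"] = source_file
        records.append(row)
    return records
```

Because nothing is dropped or renamed here, the Bronze table preserves the full raw history and can be replayed if Silver logic changes.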
```python
df_bronze.write \
    .mode("append") \
    .format("delta") \
    .option("mergeSchema", "true") \
    .saveAsTable("flights_bronze.raw_flights")
```

Notebook: `silver_flights_notebook.ipynb`
Transformations Applied:
- Rename raw API column names (CHAORD → arrival_departure, CHFLTN → flight_number)
- Standardize values (A/D → arrival/departure)
- Trim whitespace from all string columns
- Uppercase codes for consistency (airline_code, airport_code, status)
- Convert timestamp strings to proper timestamp types
- Add derived columns (flight_date for partitioning)
- Deduplicate based on composite key
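The row-level standardizations above can be sketched in plain Python (the notebook performs the same steps with PySpark column expressions; the post-rename column names here are illustrative):

```python
# Assumed mapping from raw direction codes to standardized values
DIRECTION = {"A": "arrival", "D": "departure"}

def clean_record(raw: dict) -> dict:
    """Apply the Silver standardizations to one row: trim strings,
    rename raw API columns, map A/D, and uppercase codes."""
    # Trim whitespace from all string values
    row = {k: v.strip() if isinstance(v, str) else v for k, v in raw.items()}
    # Rename and standardize the direction column
    row["arrival_departure"] = DIRECTION.get(str(row.pop("CHAORD", "")).upper())
    row["flight_number"] = row.pop("CHFLTN", None)
    # Uppercase code columns for consistency
    for col in ("airline_code", "airport_code", "status"):
        if isinstance(row.get(col), str):
            row[col] = row[col].upper()
    return row
```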
Merge Strategy:
```sql
MERGE INTO flights_silver.flights_clean AS target
USING incoming_flights AS source
ON target.flight_number = source.flight_number
  AND target.airline_code = source.airline_code
  AND target.arrival_departure = source.arrival_departure
  AND target.flight_date = source.flight_date
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
```

Composite Key Rationale:
- Same flight operates daily (need date)
- Same flight number can be both arrival AND departure (need direction)
- Ensures uniqueness while allowing updates for status changes
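The pre-merge deduplication can be sketched as a keep-latest pass over the composite key, assuming the `updated_at` field from the schema orders record versions:

```python
def deduplicate(rows: list) -> list:
    """Keep only the most recent record per composite key, mirroring
    the window-function dedup applied before the MERGE."""
    latest = {}
    for r in rows:
        k = (r["flight_number"], r["airline_code"],
             r["arrival_departure"], r["flight_date"])
        # ISO-8601 strings (and timestamps) compare chronologically
        if k not in latest or r["updated_at"] > latest[k]["updated_at"]:
            latest[k] = r
    return list(latest.values())
```

This guarantees the MERGE source has at most one row per key, so a status change (e.g. DELAYED → LANDED) updates the existing Silver row instead of duplicating it.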
dbt Model: `fact_delayed_flights.sql`
- Calculates delay in minutes for landed flights
- Incremental materialization for efficiency
- Only processes new/updated records
- Creates fact table optimized for delay analysis
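A minimal sketch of the delay calculation, matching the Gold schema's convention that `delay_minutes` is null for on-time flights (rounding to whole minutes is an assumption):

```python
from datetime import datetime

def delay_minutes(scheduled: datetime, actual: datetime):
    """Delay in whole minutes for a landed flight, or None when the
    flight arrived on time or early."""
    delta = (actual - scheduled).total_seconds() / 60
    return round(delta) if delta > 0 else None
```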
Sample Query Output:
```sql
SELECT
  airline_name,
  AVG(delay_minutes) AS avg_delay,
  COUNT(*) AS total_delayed_flights
FROM flights_gold.fact_delayed_flights
WHERE delay_minutes > 0
GROUP BY airline_name
ORDER BY avg_delay DESC;
```

Bronze Layer:
- Schema evolution handling (mergeSchema)
- API response validation
Silver Layer:
- Null checks on merge key fields
- Deduplication logic (keep most recent record)
- Assertions for data integrity
Gold Layer:
- dbt tests (via `dbt test`)
- Data freshness checks
- Referential integrity
```bash
# Open a shell in the Airflow worker container
docker exec -it <container-id> bash

# Navigate to the dbt project
cd /opt/airflow/airport_dbt

# Run dbt tests
dbt test --profiles-dir /opt/airflow/config
```

- DAG run history and status
- Task logs and error messages
- Execution duration tracking
- Retry behavior monitoring
- Notebook execution logs
- Spark job metrics
- Delta Lake version history
- Query performance insights
- Bronze → Silver: modify `silver_flights_notebook.ipynb`
- Silver → Gold: add new dbt models in `airport_dbt/models/`
- Test locally with dbt: `dbt run --select your_model`
Add new data sources:
- Create new Bronze ingestion notebook
- Add Databricks task to DAG
- Create corresponding Silver cleaning logic
- Build dbt models for analytics
Modify schedule:
```python
# In airport_pipeline_dag.py
schedule=None,           # Manual trigger only
# OR
schedule='0 2 * * *',    # Daily at 2 AM
```

Issue: Airflow can't connect to Databricks
- Solution: verify the `databricks_default` connection settings
- Check that the token hasn't expired
- Ensure cluster is running
Issue: dbt models fail
- Solution: Check Databricks token in Airflow variables
- Verify the `profiles.yml` connection settings
- Ensure Silver tables exist before running Gold models
Issue: Schema merge errors
- Solution: Check for incompatible data type changes
- Review Delta Lake transaction logs
- Use `DESCRIBE HISTORY` to debug
```bash
# Airflow task logs
docker-compose logs airflow-worker

# Databricks notebook logs are available in the
# Databricks workspace UI under notebook runs
```

- Incremental dbt models: Only process new/updated records
- Delta Lake MERGE: Efficient upserts vs full rewrites
- Partitioning: By `flight_date` for better query performance
- Deduplication: Window functions before merge reduce data volume
Raw API fields with minimal transformation.
| Column | Type | Description |
|---|---|---|
| arrival_departure | STRING | Flight direction: 'arrival' or 'departure' |
| flight_number | STRING | Flight number (e.g., '235') |
| airline_code | STRING | IATA airline code (e.g., 'LY', 'AC') |
| flight_date | DATE | Date of scheduled landing |
| origin_airport_code | STRING | IATA airport code (arrivals only) |
| origin_city_en | STRING | Origin city in English |
| origin_country_he | STRING | Origin country in Hebrew |
| airline_name | STRING | Full airline name |
| actual_landing_time | TIMESTAMP | Actual landing timestamp |
| scheduled_landing_time | TIMESTAMP | Scheduled landing timestamp |
| status_en | STRING | Flight status (LANDED, DELAYED, etc.) |
| terminal | BIGINT | Terminal number |
| updated_at | TIMESTAMP | Last update timestamp |
| Column | Type | Description |
|---|---|---|
| flight_id | STRING | Composite key (airline+flight+direction+date) |
| flight_number | STRING | Flight number |
| airline_code | STRING | IATA airline code |
| airline_name | STRING | Full airline name |
| flight_date | DATE | Date of flight |
| scheduled_landing_time | TIMESTAMP | Scheduled landing |
| actual_landing_time | TIMESTAMP | Actual landing |
| delay_minutes | INTEGER | Delay in minutes (null if on-time) |
This is a portfolio project, but suggestions and improvements are welcome!
- Fork the repository
- Create a feature branch (`git checkout -b feature/improvement`)
- Commit changes (`git commit -am 'Add new feature'`)
- Push to the branch (`git push origin feature/improvement`)
- Open a Pull Request
This project is licensed under the MIT License - see the LICENSE file for details.

