This project demonstrates how to process data stored in a data lake, transforming it into an OLAP-optimized structure with PySpark.
The PySpark Job runs on AWS EMR, and the Data Pipeline is orchestrated by Apache Airflow, including the whole infrastructure creation and the EMR cluster termination.
Tools and Technologies:
- Airflow: Data Pipeline organization and scheduling tool. Enables control and organization over script flows.
- PySpark: Data processing framework. Enables distributed processing of datasets with an easy-to-use interface.
- S3: Very reliable and scalable storage option.
Proposed Updates:
- For this kind of ETL workload, either a daily or a weekly update would make sense, based on the data structures of both data sources. With the same project structure, updates could run as often as every minute, as long as the data arrives on S3 at that pace. (This would require tweaks to the etl.py file.)
Possible Scenarios:
- Data increased by 100x: The create_emr_cluster function can be updated to provision larger EMR clusters, according to data processing speed needs.
- Dashboard must be updated daily, for example by 7 am: As mentioned in the Proposed Updates section, the Airflow DAG can be scheduled to run daily, and the EMR clusters can be scaled up so that processing finishes in time.
- Database needs to be accessed by 100+ people: The S3 data can be queried with, for example, Amazon Athena or Redshift Spectrum, both of which support access control mechanisms. Many other tools can read S3 data in a similar way.
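As a rough illustration of the 100x scenario, the sketch below builds the `Instances` section accepted by boto3's EMR `run_job_flow` call, with the core node count and instance type as parameters. The helper name `build_instances_config`, its defaults, and the instance types are assumptions made for illustration; the project's actual create_emr_cluster function may be structured differently.

```python
def build_instances_config(core_nodes=2, instance_type="m5.xlarge"):
    """Return an EMR `Instances` dict sized by core node count (hypothetical helper)."""
    if core_nodes < 1:
        raise ValueError("core_nodes must be at least 1")
    return {
        "InstanceGroups": [
            {
                "Name": "Master node",
                "Market": "ON_DEMAND",
                "InstanceRole": "MASTER",
                "InstanceType": instance_type,
                "InstanceCount": 1,
            },
            {
                "Name": "Core nodes",
                "Market": "ON_DEMAND",
                "InstanceRole": "CORE",
                "InstanceType": instance_type,
                "InstanceCount": core_nodes,
            },
        ],
        # Assumption: the cluster stays up after its steps finish and is
        # shut down separately (for example by a termination DAG).
        "KeepJobFlowAliveWhenNoSteps": True,
        "TerminationProtected": False,
    }


# A small default cluster versus a larger one for a heavier workload.
small = build_instances_config()
large = build_instances_config(core_nodes=20, instance_type="m5.4xlarge")
```

The returned dict could be passed as the `Instances` argument of `run_job_flow`; keeping the sizing in one function means a larger workload only requires changing two arguments.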
Data Model - Star Schema:
- Dimension Tables:
- Table users: users in the app
- Columns: user_id, first_name, last_name, gender, level
- Table songs: songs in music database
- Columns: song_id, title, artist_id, year, duration
- Table artists: artists in music database
- Columns: artist_id, name, location, lattitude, longitude
- Table time: timestamps of records in songplays broken down into specific units
- Columns: start_time, hour, day, week, month, year, weekday
- Python 3.6 or greater.
- Clone this repository
- Edit the dl.cfg file with your AWS credentials and infrastructure configuration. config.sh is also available as a helper script to build your dl.cfg file.
- cd into airflow-pyspark-emr
With Docker:
docker build -t airflow:1 .
docker run -i -p 8080:8080 -t airflow:1 /bin/bash
Without Docker:
- Create and activate a virtualenv (see the venv documentation), then run:
(venv) $ pip install -r requirements.txt
- Add the AIRFLOW_HOME environment variable by appending the following line to the end of your ~/.bashrc file:
export AIRFLOW_HOME=your_path_to/airflow-pyspark-emr/airflow_home
- Run:
airflow initdb
- In another terminal, run:
airflow scheduler
- In another terminal, run:
airflow webserver
- Access the Airflow UI at localhost:8080
In the UI, two DAGs should be available:
- songs_etl
- terminate_emr_clusters
Turn them on and experiment with them.
airflow_home/plugins/spark_jobs - etl.py:
- This file has the PySpark logic and usage.
airflow_home/plugins/aws_utils - emr_deployment.py:
- This file has the logic to control the AWS infrastructure needed for the job to run.
Data Lake:
A data lake is a system or repository of data stored in its natural/raw format, usually object blobs or files.
Spark:
Apache Spark is a unified analytics engine for big data processing, with built-in modules for streaming, SQL, machine learning and graph processing.
A music streaming startup, Sparkify, has grown its user base and song database and wants to move its processes and data onto the cloud. Its data resides in S3, in a directory of JSON logs on user activity in the app, as well as a directory with JSON metadata on the songs in the app.
Song Dataset:
The song dataset is a subset of the Million Song Dataset.
Each item contains metadata about a song and its artist.
Example format:
{ "num_songs": 1, "artist_longitude": null, "artist_latitude": null, "artist_location": "", "artist_name": "Line Renaud", "song_id": "SOUPIRU12A6D4FA1E1", "title": "Der Kleine Dompfaff", "duration": 152.92036, "year": 0 }
Logs Dataset:
The logs dataset is a simulation of events generated by this Event Simulator.
They are stored in S3 with the following bucket organization structure:
log_data/2018/11/2018-11-12-events.json
log_data/2018/11/2018-11-13-events.json
The JSON file format has the following structure:
{ "artist":null, "auth":"Logged In", "firstName":"Walter", "gender":"M", "itemInSession":0, "lastName":"Frye", "length":null, "level":"free", "location":"San Francisco-Oakland-Hayward, CA", "method":"GET", "page":"Home", "registration":1540919166796.0, "sessionId":38, "song":null, "status":200, "ts":1541105830796, "userAgent":"\"Mozilla\/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit\/537.36 (KHTML, like Gecko) Chrome\/36.0.1985.143 Safari\/537.36\"", "userId":"39" }
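The layout and record format above can be exercised with a short standard-library sketch: one function builds the log key for a given day, another keeps only the events whose page is NextSong. The function names are hypothetical, the sample records are trimmed for illustration, and the code assumes each log file holds one JSON object per line; none of this is taken from etl.py.

```python
import json
from datetime import date


def log_key_for(day):
    """Build the log file key for a day, following the layout shown above."""
    return f"log_data/{day:%Y/%m}/{day:%Y-%m-%d}-events.json"


def song_play_events(lines):
    """Yield parsed events with page == "NextSong" (assumes one JSON object per line)."""
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        event = json.loads(line)
        if event.get("page") == "NextSong":
            yield event


# Trimmed, illustrative records; real events carry all fields shown above.
sample_lines = [
    '{"page": "Home", "userId": "39", "ts": 1541105830796}',
    '{"page": "NextSong", "userId": "8", "ts": 1541106106796}',
]
key = log_key_for(date(2018, 11, 12))
plays = list(song_play_events(sample_lines))
```

In a PySpark job the same filter would typically be a single DataFrame `where` on the `page` column; the plain-Python version only makes the rule explicit.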
The target schema should look as follows:
Fact Table:
- Table songplays - records in event data associated with song plays i.e. records with page NextSong
- Columns: songplay_id, start_time, user_id, level, song_id, artist_id, session_id, location, user_agent
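To make the column mapping concrete, here is a minimal sketch of turning one log event into a songplays row. It assumes that song_id and artist_id are resolved by matching the event's song title and artist name against the song dataset, and that `ts` is a Unix timestamp in milliseconds. The function `to_songplay`, the `song_index` lookup, and the placeholder artist ID are all hypothetical and may differ from what etl.py does.

```python
from datetime import datetime, timezone


def to_songplay(event, song_index, songplay_id):
    """Map a NextSong log event to a songplays row (illustrative only)."""
    # Assumption: songs are matched on (title, artist name).
    song_id, artist_id = song_index.get(
        (event["song"], event["artist"]), (None, None)
    )
    return {
        "songplay_id": songplay_id,
        # Assumption: ts is milliseconds since the Unix epoch, read as UTC.
        "start_time": datetime.fromtimestamp(event["ts"] / 1000, tz=timezone.utc),
        "user_id": event["userId"],
        "level": event["level"],
        "song_id": song_id,
        "artist_id": artist_id,
        "session_id": event["sessionId"],
        "location": event["location"],
        "user_agent": event["userAgent"],
    }


# Lookup built from the song dataset; the artist ID is a placeholder.
song_index = {
    ("Der Kleine Dompfaff", "Line Renaud"): ("SOUPIRU12A6D4FA1E1", "ARTIST_ID_PLACEHOLDER"),
}
event = {
    "song": "Der Kleine Dompfaff", "artist": "Line Renaud", "page": "NextSong",
    "ts": 1541105830796, "userId": "39", "level": "free", "sessionId": 38,
    "location": "San Francisco-Oakland-Hayward, CA", "userAgent": "Mozilla/5.0",
}
row = to_songplay(event, song_index, songplay_id=1)
```

Events with no matching song keep `None` in song_id and artist_id, which mirrors what a left join would produce.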
Dimension Tables:
- Table users: users in the app
- Columns: user_id, first_name, last_name, gender, level
- Table songs: songs in music database
- Columns: song_id, title, artist_id, year, duration
- Table artists: artists in music database
- Columns: artist_id, name, location, lattitude, longitude
- Table time: timestamps of records in songplays broken down into specific units
- Columns: start_time, hour, day, week, month, year, weekday
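The time table's columns can be derived from a single `ts` value. The sketch below assumes `ts` is milliseconds since the Unix epoch, interprets it in UTC, uses the ISO week number, and numbers weekdays from Monday = 0; these conventions and the function name `time_row` are assumptions, and the actual job may use different ones (Spark's `dayofweek`, for instance, counts from Sunday = 1).

```python
from datetime import datetime, timezone


def time_row(ts_ms):
    """Break a millisecond epoch timestamp into the time table's columns."""
    start = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    return {
        "start_time": start,
        "hour": start.hour,
        "day": start.day,
        "week": start.isocalendar()[1],  # ISO week number
        "month": start.month,
        "year": start.year,
        "weekday": start.weekday(),  # Monday = 0 ... Sunday = 6
    }


# The ts value from the example log record above.
row = time_row(1541105830796)
```

For the sample timestamp this yields 2018-11-01 at hour 20 (UTC), which falls on a Thursday in ISO week 44.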