This is a sample application to showcase streaming data analytics on the City Of Chicago Taxi Trips dataset using Spark Structured Streaming, Kafka and Spring Boot.
Below is the high-level data flow architecture:
- The application exposes two REST endpoints (/tripstart & /tripend) to collect trip start and trip end events
- The events are pushed to Kafka topics
- Spark Structured Streaming pulls the events from the Kafka topics and performs rolling aggregates as the events arrive
- There is a python helper script that creates mock events from a data file and submits them to the REST endpoints
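The helper script essentially builds one JSON event per CSV row and POSTs it to the REST endpoints; a minimal sketch of that idea in Python (the field names and payload shape here are assumptions for illustration, the real payload is whatever the REST controllers expect):

```python
import json
import urllib.request

def build_trip_event(trip_id, company, timestamp):
    """Build a minimal trip-start event payload (hypothetical field names)."""
    return {"tripId": trip_id, "companyName": company, "tripStartTimestamp": timestamp}

def post_event(endpoint, event):
    """POST a JSON event to one of the REST endpoints (e.g. /tripstart)."""
    req = urllib.request.Request(
        endpoint,
        data=json.dumps(event).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    return urllib.request.urlopen(req)

event = build_trip_event("t-001", "Top Cab Affiliation", "2017-07-15 20:24:35")
print(json.dumps(event))
# post_event("http://localhost:9090/tripstart", event)  # requires the REST app to be running
```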
Datasets:
- https://data.cityofchicago.org/Transportation/Taxi-Trips-Dashboard/spcw-brbq
- https://data.cityofchicago.org/Transportation/Taxi-Trips/wrvz-psew
| Software | Version |
|---|---|
| Java | 1.8.0_131 |
| Kafka | 2.11-0.11.0.0 |
| ZooKeeper | 3.4.10 |
| Spark | 2.1.1, Using Scala version 2.11.8 |
| Python | 3.5.1 |
| pip | 9.0.1 |
Installation tips:
- https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-14-04
- https://gist.github.com/codspire/ee4a46ec054f962d9ef028b27fcb2635
```
$ git clone https://github.com/codspire/chicago-taxi-trips-streaming-analysis.git
```
Update the `bootstrap` property with your Kafka host:port in the files below:
- chicago-taxi-trips-streaming-analysis/taxitrips-kafka-sub/src/main/resources/application.yml
- chicago-taxi-trips-streaming-analysis/taxitrips-rest/src/main/resources/application.yml
```
$ cd chicago-taxi-trips-streaming-analysis
$ chmod +x mvnw
$ ./mvnw clean package
```
- cd to the root folder (i.e. chicago-taxi-trips-streaming-analysis)
- Make sure Kafka and ZooKeeper are already running
```
$ java -jar taxitrips-rest/target/taxitrips-rest-0.0.1-SNAPSHOT.jar
```
Optional step: this will print the Kafka messages to the console for debugging purposes:
```
$ java -jar taxitrips-kafka-sub/target/taxitrips-kafka-sub-0.0.1-SNAPSHOT.jar
```
cd to `chicago-taxi-trips-streaming-analysis/taxitrips-events-generator/src/main/resources/python`
Note:
- Replace the `<hostname>` placeholder with your hostname
- Use `--datafile` to provide the mock data file path (available under the `taxitrips-events-generator/src/main/resources` folder, e.g. test-data-v2-aa.csv, test-data-v2-ab.csv, ...)
- Use `--delay` to specify the wait time in seconds between REST request submissions (e.g. 0.2 means 5 events per second)
```
$ python tripstart-events2.py \
    --endpoint http://<hostname>:9090/tripstart \
    --datafile ../test-data-v2-aa.csv \
    --delay 0.2
```
```
$ python tripend-events2.py \
    --endpoint http://<hostname>:9090/tripend \
    --datafile ../test-data-v2-aa.csv \
    --delay 0.2
```
cd to `chicago-taxi-trips-streaming-analysis/taxitrips-events-generator/src/main/resources/python`
```
$ spark-submit --packages org.apache.spark:spark-core_2.11:1.5.2,\
org.apache.spark:spark-streaming_2.11:1.5.2,\
org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0,\
org.apache.spark:spark-streaming-kafka_2.11:1.5.2,\
org.apache.kafka:kafka_2.11:0.11.0.0,\
org.apache.kafka:kafka-clients:0.11.0.0 \
./spark-streaming-analytics.py --kafka_servers localhost:9092
```
The Spark console should show the SQL results for the events stream. The current query counts the trips started and ended every 30 seconds, grouped by cab company.
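The windowing logic behind those counts can be pictured outside Spark as well; a plain-Python sketch of a 30-second tumbling-window count grouped by company (illustration only, the actual aggregation lives in spark-streaming-analytics.py):

```python
from collections import Counter
from datetime import datetime

def window_start(ts, width_secs=30):
    """Floor a timestamp to the start of its tumbling window."""
    epoch = ts.timestamp()
    return datetime.fromtimestamp(epoch - epoch % width_secs)

def windowed_counts(events, width_secs=30):
    """Count (window start, company) pairs, like the streaming query does."""
    counts = Counter()
    for ts, company in events:
        counts[(window_start(ts, width_secs), company)] += 1
    return counts

events = [
    (datetime(2017, 7, 15, 20, 24, 5), "Top Cab Affiliation"),
    (datetime(2017, 7, 15, 20, 24, 10), "Top Cab Affiliation"),
    (datetime(2017, 7, 15, 20, 24, 40), "Top Cab Affiliation"),
]
# The first two events land in the 20:24:00 window, the third in 20:24:30.
for (start, company), n in sorted(windowed_counts(events).items()):
    print(start, company, n)
```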
Sample Outputs:
```
+---------------------------------------------+---------------------------------+-----+----------------+
|window                                       |companyName                      |count|eventType       |
+---------------------------------------------+---------------------------------+-----+----------------+
|[2017-07-15 20:24:30.0,2017-07-15 20:25:00.0]|Top Cab Affiliation              |3    |tripstart events|
|[2017-07-15 20:24:30.0,2017-07-15 20:25:00.0]|Northwest Management LLC         |3    |tripstart events|
|[2017-07-15 20:24:30.0,2017-07-15 20:25:00.0]|Dispatch Taxi Affiliation        |25   |tripstart events|
|[2017-07-15 20:24:30.0,2017-07-15 20:25:00.0]|KOAM Taxi Association            |7    |tripstart events|
|[2017-07-15 20:24:30.0,2017-07-15 20:25:00.0]|Taxi Affiliation Services        |40   |tripstart events|
|[2017-07-15 20:24:30.0,2017-07-15 20:25:00.0]|Choice Taxi Association          |11   |tripstart events|
|[2017-07-15 20:24:00.0,2017-07-15 20:24:30.0]|Northwest Management LLC         |12   |tripstart events|
|[2017-07-15 20:24:00.0,2017-07-15 20:24:30.0]|Choice Taxi Association          |21   |tripstart events|
|[2017-07-15 20:24:00.0,2017-07-15 20:24:30.0]|KOAM Taxi Association            |9    |tripstart events|
|[2017-07-15 20:24:00.0,2017-07-15 20:24:30.0]|Blue Ribbon Taxi Association Inc.|4    |tripstart events|
+---------------------------------------------+---------------------------------+-----+----------------+
only showing top 10 rows
```

```
+---------------------------------------------+---------------------------------+-----+--------------+
|window                                       |companyName                      |count|eventType     |
+---------------------------------------------+---------------------------------+-----+--------------+
|[2017-07-15 20:24:30.0,2017-07-15 20:25:00.0]|Top Cab Affiliation              |2    |tripend events|
|[2017-07-15 20:24:30.0,2017-07-15 20:25:00.0]|5129 - Mengisti Taxi             |1    |tripend events|
|[2017-07-15 20:24:30.0,2017-07-15 20:25:00.0]|Northwest Management LLC         |3    |tripend events|
|[2017-07-15 20:24:30.0,2017-07-15 20:25:00.0]|Dispatch Taxi Affiliation        |16   |tripend events|
|[2017-07-15 20:24:30.0,2017-07-15 20:25:00.0]|Blue Ribbon Taxi Association Inc.|2    |tripend events|
|[2017-07-15 20:24:30.0,2017-07-15 20:25:00.0]|KOAM Taxi Association            |2    |tripend events|
|[2017-07-15 20:24:30.0,2017-07-15 20:25:00.0]|Taxi Affiliation Services        |22   |tripend events|
|[2017-07-15 20:24:30.0,2017-07-15 20:25:00.0]|3201 - C&D Cab Co Inc            |1    |tripend events|
|[2017-07-15 20:24:30.0,2017-07-15 20:25:00.0]|Choice Taxi Association          |10   |tripend events|
|[2017-07-15 20:24:00.0,2017-07-15 20:24:30.0]|Northwest Management LLC         |4    |tripend events|
+---------------------------------------------+---------------------------------+-----+--------------+
only showing top 10 rows
```
- The streaming results can be written to a persistent store (e.g. S3, HBase) or an Elasticsearch cluster, which can be used to visualize the analytics in real time.
- There are a few limitations in Spark version 2.1.1 (read here). Another limitation is that there is no `kafka` format for writing streaming Datasets to Kafka (i.e. a Kafka sink); it has to be handled using the `foreach` operator. This will probably be addressed in Spark 2.2 (read here). Upgrading this app to Spark 2.2 is on the TODO list.
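The `foreach` workaround boils down to serializing each row of a micro-batch and handing it to a producer; a rough sketch of that pattern in plain Python (the `send` callable stands in for a real Kafka producer, which is not assumed to be installed here):

```python
import json

def row_to_message(row):
    """Serialize one aggregate row (window, company, count) to JSON bytes."""
    return json.dumps(row, sort_keys=True).encode("utf-8")

def forward_batch(rows, send):
    """Push every row of a micro-batch to a sink via the supplied send callable.

    With a real producer, send could be e.g.
    lambda msg: producer.send("trip-aggregates", msg)
    """
    for row in rows:
        send(row_to_message(row))

# Stand-in sink for demonstration: collect the messages in a list.
sent = []
forward_batch([{"companyName": "Top Cab Affiliation", "count": 3}], sent.append)
print(sent[0])
```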
