drdcs/StreamingServices
Spark Streaming example

Use Case:

  • Kafka producer -> reads RSVP events from a meetup.com group.
  • Spark Structured Streaming -> reads from Kafka using Spark's kafka source format.
  • JDBC load of event and venue details -> loads SQL Server tables with the selected fields.

What you will learn:

  • Designing a Kafka producer via pykafka.
  • Designing case classes for complex JSON.
  • Reading from Kafka as a source with Spark Structured Streaming.
  • Performing a JDBC load in writeStream through foreachBatch.

Tech used: Kafka, Spark. Programming languages: Scala and Python.

The producer, built with pykafka (Python), listens to real-time RSVP data from a meetup group. The data it receives is complex JSON.
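The repo's producer itself is written in Python with pykafka; purely as an illustrative sketch, an equivalent producer in Scala using the standard Kafka client could look like the following (the broker address and topic name are placeholders, not taken from the repo):

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object MeetupProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092") // placeholder broker
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // In the repo, each message is one RSVP JSON document read from meetup.com
    val rsvpJson = """{"rsvp_id": 1840421555, "response": "yes"}""" // truncated sample payload
    producer.send(new ProducerRecord[String, String]("meetup", rsvpJson)) // placeholder topic name
    producer.flush()
    producer.close()
  }
}
```

A full RSVP payload produced for each event looks like the sample below.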

{
  "venue": {
    "venue_name": "16109 Nacogdoches Rd",
    "lon": -98.36081,
    "lat": 29.587322,
    "venue_id": 26466187
  },
  "visibility": "public",
  "response": "yes",
  "guests": 0,
  "member": {
    "member_id": 200069824,
    "photo": "https://secure.meetupstatic.com/photos/member/e/9/e/c/thumb_254039884.jpeg",
    "member_name": "Debbie LaBouff Burnam"
  },
  "rsvp_id": 1840421555,
  "mtime": 1591671730500,
  "event": {
    "event_name": "SISTA's @ Pompeii ",
    "event_id": "tbhmgrybcjbnb",
    "time": 1591806600000,
    "event_url": "https://www.meetup.com/meetup-group-VVwOORPh/events/271065743/"
  },
  "group": {
    "group_topics": [
      {
        "urlkey": "smallbiz",
        "topic_name": "Small Business"
      },
      {
        "urlkey": "business-referral-networking",
        "topic_name": "Business Referral Networking"
      },
      {
        "urlkey": "women-entrepreneurs",
        "topic_name": "Women Entrepreneurs"
      },
      {
        "urlkey": "business-strategy",
        "topic_name": "Business Strategy"
      },
      {
        "urlkey": "professional-women",
        "topic_name": "Professional Women"
      },
      {
        "urlkey": "professional-networking",
        "topic_name": "Professional Networking"
      },
      {
        "urlkey": "womens-business-networking",
        "topic_name": "Women's Business Networking"
      },
      {
        "urlkey": "womens-networking",
        "topic_name": "Women's Networking"
      },
      {
        "urlkey": "networking-your-network-marketing-business",
        "topic_name": "Networking Your Network Marketing Business"
      }
    ],
    "group_city": "San Antonio",
    "group_country": "us",
    "group_id": 32201506,
    "group_name": "Sista's",
    "group_lon": -98.47,
    "group_urlname": "meetup-group-VVwOORPh",
    "group_state": "TX",
    "group_lat": 29.48
  }
}

As you can see, it's a nested JSON document, so we can model it with case classes whose structure mirrors the JSON.

Schema below:

case class MeetUpCaseClass(
  venue: VenueDetails,
  visibility: Option[String],
  response: Option[String],
  guests: Option[Long],
  member: MemberDetails,
  rsvp_id: Option[Long],
  mtime: Option[Long],
  event: EventDetails,
  group: GroupDetails
)

case class GroupDetails(
  group_topics: Array[GroupTopics],
  group_city: Option[String],
  group_country: Option[String],
  group_id: Long,
  group_name: Option[String],
  group_lon: Option[Float],
  group_urlname: Option[String],
  group_state: Option[String],
  group_lat: Option[Float] // numeric in the sample JSON, like group_lon
)

case class VenueDetails(venue_name: Option[String], lon: Option[Float], lat: Option[Float], venue_id: Option[Long])
case class MemberDetails(member_id: Long, photo: Option[String], member_name: Option[String])
case class EventDetails(event_name: Option[String], event_id: Option[String], time: Long, event_url: String)
case class GroupTopics(urlkey: Option[String], topic_name: Option[String])

See src/main/scala/com/demo/schema for the full schema definitions.
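The read side of the pipeline can then derive its Spark schema directly from the case class above. A minimal sketch (assuming a local broker at localhost:9092 and a topic named `meetup`, both placeholders):

```scala
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}

val spark = SparkSession.builder()
  .appName("MeetupStreaming")
  .master("local[*]")
  .getOrCreate()

// Derive a Spark StructType from the case class defined above
val schema = Encoders.product[MeetUpCaseClass].schema

val streamDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
  .option("subscribe", "meetup")                        // placeholder topic
  .option("startingOffsets", "latest")
  .load()
  // Kafka delivers the payload as bytes; cast to string and parse the JSON
  .select(from_json(col("value").cast("string"), schema).as("data"))
  .select("data.*")
```

Using Encoders.product keeps the streaming schema and the case-class model in sync from a single definition.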

The data is finally loaded into SQL Server through Structured Streaming, one batch at a time.

The write is a micro-batch process: foreachBatch loads the target tables for each batch, and the cadence is controlled by a trigger.

import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.streaming.Trigger

streamDF.writeStream
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .outputMode("update")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.coalesce(2).select("venue.*").show()
    val venue = batchDF.select($"venue.venue_name", $"venue.lon", $"venue.lat", $"venue.venue_id")
    val event = batchDF.select($"event.event_name", $"event.event_id", $"event.time", $"event.event_url")
    // Append each micro-batch to the corresponding SQL Server tables over JDBC
    event.write.mode(SaveMode.Append).jdbc(jdbcUrl, jdbcTable, properties)
    venue.write.mode(SaveMode.Append).jdbc(jdbcUrl, jdbcVenueTable, properties)
  }
  .start()
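For a longer-running job you would typically also set a checkpoint location and block the driver on the query. A hedged sketch (the checkpoint path is a placeholder, and the foreachBatch body is elided here since it is shown above):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

val query = streamDF.writeStream
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .outputMode("update")
  .option("checkpointLocation", "/tmp/meetup-checkpoint") // placeholder path
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // JDBC writes per batch, as in the snippet above
  }
  .start()

query.awaitTermination() // keep the driver alive until the stream stops
```

Checkpointing lets the query resume from its last committed Kafka offsets after a restart instead of reprocessing or skipping data.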

About

This repo demonstrates Kafka as the producer, Spark as the listener (stream processor), and SQL Server as the receiver (sink).
