60East/amps-integration-apache-flink

Apache Flink AMPS Connector

The Apache Flink AMPS connectors use the AMPS Java clients to let Apache Flink applications subscribe and publish to AMPS.

Introduction

The connectors allow developers to focus on business logic by reducing the cognitive and operational overhead of managing HA AMPS clients. A FLIP-27 source and a SinkV2 sink are provided for communication between Apache Flink and AMPS. The connectors are located in the flink-connector-amps/ directory. Examples using the connectors are located in the flink-connector-amps-examples/ directory.

Prerequisites

Quickstart

The amps-configs/ directory includes several AMPS configuration files that are used in the examples, but only amps-configs/config.xml is necessary here.

The quickstart job runs a full round trip: a client publishes to AMPS, the source reads from AMPS into Flink, Flink modifies each message, the sink publishes the result back to AMPS, and a client subscribed to AMPS prints the messages to the console.

  1. Ensure Maven and Java are installed

    java -version
    mvn -v
    
  2. Navigate to the parent directory

    cd path/to/amps-integration-apache-flink
    
  3. Start an AMPS instance if one is not already running

    ~/AMPS-{amps_version}-Release-Linux/bin/ampServer amps-configs/config.xml
    
  4. Start a Flink cluster if one is not already running

    • Open a new terminal. This will be used to start the cluster and submit the job to Flink

    • Navigate to your flink-{flink_version} directory

    • Start the cluster

      bin/start-cluster.sh
      
  5. Build the Flink job

    • Open a new terminal. This will be used to build the job

    • Navigate to the parent directory

    • Build the job

      mvn clean package
      
    • The JAR should be flink-connector-amps-examples/target/flink-connector-amps-examples-{connector_version}.jar

  6. Submit the job to Flink

    • Select the terminal that is in the flink-{flink_version} directory

    • Submit the job

      bin/flink run path/to/flink-connector-amps-examples/target/flink-connector-amps-examples-{connector_version}.jar
      
  7. Monitor the job

    • The original and modified messages are printed to the console
    • Use the Galvanometer (localhost:8085 by default) to monitor AMPS
    • Use the Flink web UI (localhost:8081 by default) to monitor Flink
  8. Cancel the job

    • In the Flink web UI, go to the "Overview" tab, click on the job in the "Running Job List", and cancel the job
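The JAR name in steps 5 and 6 contains {connector_version}, which depends on the build. The following is a minimal sketch, not part of the repository, of a shell helper that locates the built JAR so the version does not have to be typed by hand. The function name find_example_jar is hypothetical, and the sketch assumes that any -sources or -javadoc JARs in the target directory should be skipped.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of the repository): print the path of the
# example job JAR found under the given repository root.
find_example_jar() {
  local root="${1:-.}"
  local jar
  for jar in "$root"/flink-connector-amps-examples/target/flink-connector-amps-examples-*.jar; do
    case "$jar" in
      # Assumption: auxiliary JARs, if present, are not the job JAR.
      *-sources.jar|*-javadoc.jar) continue ;;
    esac
    # An unmatched glob stays literal, so confirm the file really exists.
    if [ -f "$jar" ]; then
      printf '%s\n' "$jar"
      return 0
    fi
  done
  echo "no example JAR found under $root; run 'mvn clean package' first" >&2
  return 1
}
```

From the flink-{flink_version} directory, the job could then be submitted with bin/flink run "$(find_example_jar path/to/amps-integration-apache-flink)".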

Connector Logs

To set a specific logging level (for example, DEBUG) for the AMPS Flink connectors, add the following to the logging configuration file flink-{flink_version}/conf/log4j.properties:

logger.amps.name = com.crankuptheamps.flink
logger.amps.level = DEBUG
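As a sketch of how this change could be scripted, the helper below appends the two logger lines to a log4j.properties file only if they are not already present, so it is safe to run more than once. The function name enable_amps_debug_logging is hypothetical, and the path to the configuration file is passed in by the caller.

```shell
#!/usr/bin/env bash
# Hypothetical helper: append the AMPS connector logger settings to the
# given log4j.properties file unless a logger.amps entry already exists.
enable_amps_debug_logging() {
  local conf="$1"
  # Skip the append if the logger is already configured.
  if grep -q '^logger\.amps\.name' "$conf" 2>/dev/null; then
    return 0
  fi
  cat >> "$conf" <<'EOF'

logger.amps.name = com.crankuptheamps.flink
logger.amps.level = DEBUG
EOF
}
```

For example: enable_amps_debug_logging flink-{flink_version}/conf/log4j.properties, with the placeholder replaced by the installed Flink version.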

Javadoc Generation

Javadocs can be generated by using Maven.

  1. Navigate to the parent directory

    cd path/to/amps-integration-apache-flink
    
  2. Build the javadocs with Maven

    • Set -DskipJavadocs=false to build the javadocs

    • The javadocs will be located in the flink-connector-amps/target/apidocs/ directory

      mvn clean package -DskipJavadocs=false
      
  3. Use the javadocs

    • Navigate to the flink-connector-amps/target/apidocs/ directory
    • Open index.html to view the javadocs

Configuring the Connectors

The Configuration documentation provides a detailed explanation of how to configure the AMPS Source and AMPS Sink. Both connectors use the builder pattern to construct instances. Required and optional builder methods are detailed in the documentation, and several examples demonstrate how the builder methods can be chained.

Other Flink Job Examples

The Examples documentation provides the steps necessary to run any example as well as how to provide optional arguments.

The examples are intended to run using the AMPS configuration files located in the amps-configs/ directory. It may be necessary to change where the transaction logs are stored, which can be done by modifying the JournalDirectory setting. More detailed instructions are located in the Examples documentation.
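A minimal sketch of changing the JournalDirectory from the command line is shown below. It assumes the configuration file contains a JournalDirectory element written on a single line, as in <JournalDirectory>./journals</JournalDirectory>; check the actual file before relying on it. The function name set_journal_directory is hypothetical, and the new path must not contain the characters | or &, which sed would interpret.

```shell
#!/usr/bin/env bash
# Hypothetical helper: point every single-line <JournalDirectory> element in
# an AMPS configuration file at a new directory. A backup of the original
# file is kept next to it with a .bak suffix.
set_journal_directory() {
  local config="$1" new_dir="$2"
  if ! grep -q '<JournalDirectory>' "$config"; then
    echo "no <JournalDirectory> element found in $config" >&2
    return 1
  fi
  # Replace the element's text content; '|' is the sed delimiter so that
  # slashes in the path need no escaping.
  sed -i.bak "s|<JournalDirectory>[^<]*</JournalDirectory>|<JournalDirectory>${new_dir}</JournalDirectory>|" "$config"
}
```

For example: set_journal_directory amps-configs/config.xml /var/tmp/amps/journals, where the target directory is only an illustration.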

Several examples showcasing the functionality of the connectors are located in the flink-connector-amps-examples/src/main/java/com/crankuptheamps/flink/example/ directory. The following list summarizes the jobs:

  • Basic AMPSSource usage
  • Basic AMPSSink usage
  • Batched bookmark subscription using topN
  • Checkpointing
  • Live subscriptions
  • Message queue subscriptions
  • Parallel AMPSSource usage
  • Replication
  • Queries (sow and sow_and_subscribe)
  • Converting between DataStream and Table API with the AMPS connectors
  • Volume subscription/publishing
  • AMPSSink with a publish store
  • Flink aggregation with the AMPS connectors
  • Deserialization and header key usage with an AMPSDeserializationSchema
  • Serialization and correlation ID usage with an AMPSSerializationSchema
  • Using SSL with the AMPS connectors

There are also some examples that have a specific use case:

  • Job detailed in a blog (BlogExample)
  • Jobs intended to run for long periods of time (SoakTest)

Tests

The Maven Surefire Plugin is used to run all the tests. Instructions for running the tests for the AMPS Flink connectors are located in the Tests documentation.
