Skip to content

Commit cf2e02d

Browse files
committed
JAVA-6390: Move kafka articles from libraries-data-3 to new module
apache-kafka
1 parent 6bf1b58 commit cf2e02d

20 files changed

Lines changed: 128 additions & 148 deletions
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package com.baeldung.flink;
2+
3+
import com.baeldung.flink.model.Backup;
4+
import com.baeldung.flink.model.InputMessage;
5+
import com.baeldung.flink.operator.BackupAggregator;
6+
import com.baeldung.flink.operator.InputMessageTimestampAssigner;
7+
import com.baeldung.flink.operator.WordsCapitalizer;
8+
import org.apache.flink.streaming.api.TimeCharacteristic;
9+
import org.apache.flink.streaming.api.datastream.DataStream;
10+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
11+
import org.apache.flink.streaming.api.windowing.time.Time;
12+
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
13+
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
14+
15+
import static com.baeldung.flink.connector.Consumers.*;
16+
import static com.baeldung.flink.connector.Producers.*;
17+
18+
public class FlinkDataPipeline {
19+
20+
public static void capitalize() throws Exception {
21+
String inputTopic = "flink_input";
22+
String outputTopic = "flink_output";
23+
String consumerGroup = "baeldung";
24+
String address = "localhost:9092";
25+
26+
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
27+
28+
FlinkKafkaConsumer011<String> flinkKafkaConsumer = createStringConsumerForTopic(inputTopic, address, consumerGroup);
29+
flinkKafkaConsumer.setStartFromEarliest();
30+
31+
DataStream<String> stringInputStream = environment.addSource(flinkKafkaConsumer);
32+
33+
FlinkKafkaProducer011<String> flinkKafkaProducer = createStringProducer(outputTopic, address);
34+
35+
stringInputStream.map(new WordsCapitalizer())
36+
.addSink(flinkKafkaProducer);
37+
38+
environment.execute();
39+
}
40+
41+
public static void createBackup() throws Exception {
42+
String inputTopic = "flink_input";
43+
String outputTopic = "flink_output";
44+
String consumerGroup = "baeldung";
45+
String kafkaAddress = "localhost:9092";
46+
47+
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
48+
49+
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
50+
51+
FlinkKafkaConsumer011<InputMessage> flinkKafkaConsumer = createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup);
52+
flinkKafkaConsumer.setStartFromEarliest();
53+
54+
flinkKafkaConsumer.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner());
55+
FlinkKafkaProducer011<Backup> flinkKafkaProducer = createBackupProducer(outputTopic, kafkaAddress);
56+
57+
DataStream<InputMessage> inputMessagesStream = environment.addSource(flinkKafkaConsumer);
58+
59+
inputMessagesStream.timeWindowAll(Time.hours(24))
60+
.aggregate(new BackupAggregator())
61+
.addSink(flinkKafkaProducer);
62+
63+
environment.execute();
64+
}
65+
66+
public static void main(String[] args) throws Exception {
67+
createBackup();
68+
}
69+
70+
}

libraries-data-2/src/main/java/com/baeldung/flink/connector/Consumers.java renamed to apache-kafka/src/main/java/com/baeldung/flink/connector/Consumers.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,23 +9,20 @@
99

1010
public class Consumers {
1111

12-
public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(
13-
String topic, String kafkaAddress, String kafkaGroup ) {
14-
Properties props = new Properties();
15-
props.setProperty("bootstrap.servers", kafkaAddress);
16-
props.setProperty("group.id",kafkaGroup);
17-
FlinkKafkaConsumer011<String> consumer =
18-
new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(),props);
12+
public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(String topic, String kafkaAddress, String kafkaGroup) {
13+
Properties props = new Properties();
14+
props.setProperty("bootstrap.servers", kafkaAddress);
15+
props.setProperty("group.id", kafkaGroup);
16+
FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), props);
1917

20-
return consumer;
21-
}
18+
return consumer;
19+
}
2220

23-
public static FlinkKafkaConsumer011<InputMessage> createInputMessageConsumer(String topic, String kafkaAddress, String kafkaGroup ) {
21+
public static FlinkKafkaConsumer011<InputMessage> createInputMessageConsumer(String topic, String kafkaAddress, String kafkaGroup) {
2422
Properties properties = new Properties();
2523
properties.setProperty("bootstrap.servers", kafkaAddress);
26-
properties.setProperty("group.id",kafkaGroup);
27-
FlinkKafkaConsumer011<InputMessage> consumer = new FlinkKafkaConsumer011<InputMessage>(
28-
topic, new InputMessageDeserializationSchema(),properties);
24+
properties.setProperty("group.id", kafkaGroup);
25+
FlinkKafkaConsumer011<InputMessage> consumer = new FlinkKafkaConsumer011<InputMessage>(topic, new InputMessageDeserializationSchema(), properties);
2926

3027
return consumer;
3128
}

libraries-data-2/src/main/java/com/baeldung/flink/connector/Producers.java renamed to apache-kafka/src/main/java/com/baeldung/flink/connector/Producers.java

File renamed without changes.

libraries-data-2/src/main/java/com/baeldung/flink/model/Backup.java renamed to apache-kafka/src/main/java/com/baeldung/flink/model/Backup.java

File renamed without changes.

libraries-data-2/src/main/java/com/baeldung/flink/model/InputMessage.java renamed to apache-kafka/src/main/java/com/baeldung/flink/model/InputMessage.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ public InputMessage() {
1818
public String getSender() {
1919
return sender;
2020
}
21+
2122
public void setSender(String sender) {
2223
this.sender = sender;
2324
}
@@ -55,13 +56,12 @@ public InputMessage(String sender, String recipient, LocalDateTime sentAt, Strin
5556

5657
@Override
5758
public boolean equals(Object o) {
58-
if (this == o) return true;
59-
if (o == null || getClass() != o.getClass()) return false;
59+
if (this == o)
60+
return true;
61+
if (o == null || getClass() != o.getClass())
62+
return false;
6063
InputMessage message1 = (InputMessage) o;
61-
return Objects.equal(sender, message1.sender) &&
62-
Objects.equal(recipient, message1.recipient) &&
63-
Objects.equal(sentAt, message1.sentAt) &&
64-
Objects.equal(message, message1.message);
64+
return Objects.equal(sender, message1.sender) && Objects.equal(recipient, message1.recipient) && Objects.equal(sentAt, message1.sentAt) && Objects.equal(message, message1.message);
6565
}
6666

6767
@Override
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package com.baeldung.flink.operator;
2+
3+
import com.baeldung.flink.model.Backup;
4+
import com.baeldung.flink.model.InputMessage;
5+
import org.apache.flink.api.common.functions.AggregateFunction;
6+
7+
import java.time.LocalDateTime;
8+
import java.util.ArrayList;
9+
import java.util.List;
10+
11+
public class BackupAggregator implements AggregateFunction<InputMessage, List<InputMessage>, Backup> {
12+
@Override
13+
public List<InputMessage> createAccumulator() {
14+
return new ArrayList<>();
15+
}
16+
17+
@Override
18+
public List<InputMessage> add(InputMessage inputMessage, List<InputMessage> inputMessages) {
19+
inputMessages.add(inputMessage);
20+
return inputMessages;
21+
}
22+
23+
@Override
24+
public Backup getResult(List<InputMessage> inputMessages) {
25+
Backup backup = new Backup(inputMessages, LocalDateTime.now());
26+
return backup;
27+
}
28+
29+
@Override
30+
public List<InputMessage> merge(List<InputMessage> inputMessages, List<InputMessage> acc1) {
31+
inputMessages.addAll(acc1);
32+
return inputMessages;
33+
}
34+
}

libraries-data-2/src/main/java/com/baeldung/flink/operator/InputMessageTimestampAssigner.java renamed to apache-kafka/src/main/java/com/baeldung/flink/operator/InputMessageTimestampAssigner.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ public class InputMessageTimestampAssigner implements AssignerWithPunctuatedWate
1212
@Override
1313
public long extractTimestamp(InputMessage element, long previousElementTimestamp) {
1414
ZoneId zoneId = ZoneId.systemDefault();
15-
return element.getSentAt().atZone(zoneId).toEpochSecond() * 1000;
15+
return element.getSentAt()
16+
.atZone(zoneId)
17+
.toEpochSecond() * 1000;
1618
}
1719

1820
@Nullable

libraries-data-2/src/main/java/com/baeldung/flink/operator/WordsCapitalizer.java renamed to apache-kafka/src/main/java/com/baeldung/flink/operator/WordsCapitalizer.java

File renamed without changes.

libraries-data-2/src/main/java/com/baeldung/flink/schema/BackupSerializationSchema.java renamed to apache-kafka/src/main/java/com/baeldung/flink/schema/BackupSerializationSchema.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,15 @@
99
import org.slf4j.Logger;
1010
import org.slf4j.LoggerFactory;
1111

12-
public class BackupSerializationSchema
13-
implements SerializationSchema<Backup> {
12+
public class BackupSerializationSchema implements SerializationSchema<Backup> {
1413

1514
static ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
1615

1716
Logger logger = LoggerFactory.getLogger(BackupSerializationSchema.class);
1817

1918
@Override
2019
public byte[] serialize(Backup backupMessage) {
21-
if(objectMapper == null) {
20+
if (objectMapper == null) {
2221
objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
2322
objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
2423
}

libraries-data-2/src/main/java/com/baeldung/flink/schema/InputMessageDeserializationSchema.java renamed to apache-kafka/src/main/java/com/baeldung/flink/schema/InputMessageDeserializationSchema.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,10 @@
88

99
import java.io.IOException;
1010

11-
public class InputMessageDeserializationSchema implements
12-
DeserializationSchema<InputMessage> {
11+
public class InputMessageDeserializationSchema implements DeserializationSchema<InputMessage> {
1312

1413
static ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
1514

16-
1715
@Override
1816
public InputMessage deserialize(byte[] bytes) throws IOException {
1917

0 commit comments

Comments
 (0)