Skip to content

Commit fdc87ea

Browse files
committed
JAVA-6390: Move kafka articles from libraries-data-3 to new module
apache-kafka
1 parent f74f450 commit fdc87ea

File tree

4 files changed

+89
-295
lines changed

4 files changed

+89
-295
lines changed

libraries-data-3/src/main/java/com/baeldung/kafka/admin/KafkaTopicApplication.java renamed to apache-kafka/src/main/java/com/baeldung/kafka/admin/KafkaTopicApplication.java

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@ public void createTopic(String topicName) throws Exception {
2727
short replicationFactor = 1;
2828
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
2929

30-
CreateTopicsResult result = admin.createTopics(
31-
Collections.singleton(newTopic));
30+
CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic));
3231

3332
// get the async result for the new topic creation
34-
KafkaFuture<Void> future = result.values().get(topicName);
33+
KafkaFuture<Void> future = result.values()
34+
.get(topicName);
3535

3636
// call get() to block until topic creation has completed or failed
3737
future.get();
@@ -47,15 +47,13 @@ public void createTopicWithOptions(String topicName) throws Exception {
4747
short replicationFactor = 1;
4848
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor);
4949

50-
CreateTopicsOptions topicOptions = new CreateTopicsOptions()
51-
.validateOnly(true)
52-
.retryOnQuotaViolation(true);
50+
CreateTopicsOptions topicOptions = new CreateTopicsOptions().validateOnly(true)
51+
.retryOnQuotaViolation(true);
5352

54-
CreateTopicsResult result = admin.createTopics(
55-
Collections.singleton(newTopic), topicOptions
56-
);
53+
CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic), topicOptions);
5754

58-
KafkaFuture<Void> future = result.values().get(topicName);
55+
KafkaFuture<Void> future = result.values()
56+
.get(topicName);
5957
future.get();
6058
}
6159
}
@@ -72,14 +70,12 @@ public void createCompactedTopicWithCompression(String topicName) throws Excepti
7270
Map<String, String> newTopicConfig = new HashMap<>();
7371
newTopicConfig.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT);
7472
newTopicConfig.put(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4");
75-
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor)
76-
.configs(newTopicConfig);
73+
NewTopic newTopic = new NewTopic(topicName, partitions, replicationFactor).configs(newTopicConfig);
7774

78-
CreateTopicsResult result = admin.createTopics(
79-
Collections.singleton(newTopic)
80-
);
75+
CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic));
8176

82-
KafkaFuture<Void> future = result.values().get(topicName);
77+
KafkaFuture<Void> future = result.values()
78+
.get(topicName);
8379
future.get();
8480
}
8581
}

libraries-data-3/src/test/java/com/baeldung/kafka/admin/KafkaTopicApplicationIntegrationTest.java renamed to apache-kafka/src/test/java/com/baeldung/kafka/admin/KafkaTopicApplicationIntegrationTest.java

File renamed without changes.

apache-kafka/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java (new file — path inferred from the package declaration below; the filename header was lost in extraction)

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
package com.baeldung.kafkastreams;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.junit.Ignore;
import org.junit.Test;

/**
 * Live test that runs a Kafka Streams word-count topology against a Kafka
 * broker expected on localhost:9092 (hence {@code @Ignore} by default).
 * It reads lines from "inputTopic", splits them into lower-cased words,
 * counts occurrences per word, prints each updated (word, count) pair and
 * publishes the counts to "outputTopic".
 */
public class KafkaStreamsLiveTest {

    // Word delimiter: any run of non-word characters, Unicode-aware.
    // Compiled once — Pattern.compile is relatively expensive and the pattern is constant.
    private static final Pattern WORD_DELIMITER = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

    // How long the streams application is allowed to process records before shutdown.
    private static final long RUN_TIME_MS = 30_000L;

    private final String bootstrapServers = "localhost:9092";

    @Test
    @Ignore("it needs to have kafka broker running on local")
    public void shouldTestKafkaStreams() throws InterruptedException {
        // given
        String inputTopic = "inputTopic";
        String outputTopic = "outputTopic";

        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        // Start from the earliest offset so pre-existing records are processed too.
        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // when: build the word-count topology
        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream(inputTopic);

        // Locale.ROOT keeps the lower-casing locale-independent (e.g. Turkish dotless i).
        KTable<String, Long> wordCounts = textLines
            .flatMapValues(value -> Arrays.asList(WORD_DELIMITER.split(value.toLowerCase(Locale.ROOT))))
            .groupBy((key, word) -> word)
            .count();

        // Print every updated count and publish it to the output topic.
        wordCounts.toStream().foreach((word, count) -> System.out.println("word: " + word + " -> " + count));
        wordCounts.toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));

        final Topology topology = builder.build();
        KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
        streams.start();

        // then: let the application run for a while, then shut down cleanly
        Thread.sleep(RUN_TIME_MS);
        streams.close();
    }
}

0 commit comments

Comments (0)