Skip to content

Commit 96f9657

Browse files
committed
remove empty module
1 parent 6e46b83 commit 96f9657

3 files changed

Lines changed: 30 additions & 58 deletions

File tree

apache-kafka/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java

Lines changed: 30 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
package com.baeldung.kafkastreams;
22

3+
import java.io.IOException;
4+
import java.io.UncheckedIOException;
5+
import java.nio.file.Files;
6+
import java.nio.file.Path;
37
import java.util.Arrays;
48
import java.util.Properties;
59
import java.util.regex.Pattern;
@@ -18,59 +22,57 @@
1822

1923
public class KafkaStreamsLiveTest {
2024
private String bootstrapServers = "localhost:9092";
25+
private Path stateDirectory;
2126

2227
@Test
2328
@Ignore("it needs to have kafka broker running on local")
2429
public void shouldTestKafkaStreams() throws InterruptedException {
25-
//given
30+
// given
2631
String inputTopic = "inputTopic";
2732

2833
Properties streamsConfiguration = new Properties();
2934
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");
3035
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
31-
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
32-
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
36+
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String()
37+
.getClass()
38+
.getName());
39+
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String()
40+
.getClass()
41+
.getName());
3342
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
3443
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
35-
// Use a temporary directory for storing state, which will be automatically removed after the test.
36-
// streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
3744

38-
/*
39-
* final StreamsBuilder builder = new StreamsBuilder();
40-
KStream<String, String> textLines = builder.stream(wordCountTopic,
41-
Consumed.with(Serdes.String(), Serdes.String()));
45+
// Use a temporary directory for storing state, which will be automatically removed after the test.
46+
try {
47+
this.stateDirectory = Files.createTempDirectory("kafka-streams");
48+
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, this.stateDirectory.toAbsolutePath()
49+
.toString());
50+
} catch (final IOException e) {
51+
throw new UncheckedIOException("Cannot create temporary directory", e);
52+
}
4253

43-
KTable<String, Long> wordCounts = textLines
44-
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.ROOT)
45-
.split("\\W+")))
46-
.groupBy((key, word) -> word)
47-
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store"));
48-
*/
49-
//when
54+
// when
5055
final StreamsBuilder builder = new StreamsBuilder();
5156
KStream<String, String> textLines = builder.stream(inputTopic);
5257
Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
5358

54-
KTable<String, Long> wordCounts = textLines
55-
.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
56-
.groupBy((key, word) -> word)
57-
.count();
59+
KTable<String, Long> wordCounts = textLines.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
60+
.groupBy((key, word) -> word)
61+
.count();
5862

59-
wordCounts.toStream().foreach((word, count) -> System.out.println("word: " + word + " -> " + count));
63+
wordCounts.toStream()
64+
.foreach((word, count) -> System.out.println("word: " + word + " -> " + count));
6065

6166
String outputTopic = "outputTopic";
62-
//final Serde<String> stringSerde = Serdes.String();
63-
//final Serde<Long> longSerde = Serdes.Long();
64-
//wordCounts.toStream().to(stringSerde, longSerde, outputTopic);
65-
66-
wordCounts.toStream().to("outputTopic",
67-
Produced.with(Serdes.String(), Serdes.Long()));
67+
68+
wordCounts.toStream()
69+
.to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
6870

6971
final Topology topology = builder.build();
7072
KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
7173
streams.start();
7274

73-
//then
75+
// then
7476
Thread.sleep(30000);
7577
streams.close();
7678
}

libraries-data-3/README.md

Lines changed: 0 additions & 9 deletions
This file was deleted.

libraries-data-3/pom.xml

Lines changed: 0 additions & 21 deletions
This file was deleted.

0 commit comments

Comments
 (0)