|
1 | 1 | package com.baeldung.kafkastreams; |
2 | 2 |
|
| 3 | +import java.io.IOException; |
| 4 | +import java.io.UncheckedIOException; |
| 5 | +import java.nio.file.Files; |
| 6 | +import java.nio.file.Path; |
3 | 7 | import java.util.Arrays; |
4 | 8 | import java.util.Properties; |
5 | 9 | import java.util.regex.Pattern; |
|
18 | 22 |
|
19 | 23 | public class KafkaStreamsLiveTest { |
20 | 24 | private String bootstrapServers = "localhost:9092"; |
| 25 | + private Path stateDirectory; |
21 | 26 |
|
22 | 27 | @Test |
23 | 28 | @Ignore("it needs to have kafka broker running on local") |
24 | 29 | public void shouldTestKafkaStreams() throws InterruptedException { |
25 | | - //given |
| 30 | + // given |
26 | 31 | String inputTopic = "inputTopic"; |
27 | 32 |
|
28 | 33 | Properties streamsConfiguration = new Properties(); |
29 | 34 | streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test"); |
30 | 35 | streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); |
31 | | - streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); |
32 | | - streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); |
| 36 | + streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String() |
| 37 | + .getClass() |
| 38 | + .getName()); |
| 39 | + streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String() |
| 40 | + .getClass() |
| 41 | + .getName()); |
33 | 42 | streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); |
34 | 43 | streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); |
35 | | - // Use a temporary directory for storing state, which will be automatically removed after the test. |
36 | | - // streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); |
37 | 44 |
|
38 | | - /* |
39 | | - * final StreamsBuilder builder = new StreamsBuilder(); |
40 | | - KStream<String, String> textLines = builder.stream(wordCountTopic, |
41 | | - Consumed.with(Serdes.String(), Serdes.String())); |
| 45 | + // Use a temporary directory for storing state, which will be automatically removed after the test. |
| 46 | + try { |
| 47 | + this.stateDirectory = Files.createTempDirectory("kafka-streams"); |
| 48 | + streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, this.stateDirectory.toAbsolutePath() |
| 49 | + .toString()); |
| 50 | + } catch (final IOException e) { |
| 51 | + throw new UncheckedIOException("Cannot create temporary directory", e); |
| 52 | + } |
42 | 53 |
|
43 | | - KTable<String, Long> wordCounts = textLines |
44 | | - .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.ROOT) |
45 | | - .split("\\W+"))) |
46 | | - .groupBy((key, word) -> word) |
47 | | - .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store")); |
48 | | - */ |
49 | | - //when |
| 54 | + // when |
50 | 55 | final StreamsBuilder builder = new StreamsBuilder(); |
51 | 56 | KStream<String, String> textLines = builder.stream(inputTopic); |
52 | 57 | Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS); |
53 | 58 |
|
54 | | - KTable<String, Long> wordCounts = textLines |
55 | | - .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase()))) |
56 | | - .groupBy((key, word) -> word) |
57 | | - .count(); |
| 59 | + KTable<String, Long> wordCounts = textLines.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase()))) |
| 60 | + .groupBy((key, word) -> word) |
| 61 | + .count(); |
58 | 62 |
|
59 | | - wordCounts.toStream().foreach((word, count) -> System.out.println("word: " + word + " -> " + count)); |
| 63 | + wordCounts.toStream() |
| 64 | + .foreach((word, count) -> System.out.println("word: " + word + " -> " + count)); |
60 | 65 |
|
61 | 66 | String outputTopic = "outputTopic"; |
62 | | - //final Serde<String> stringSerde = Serdes.String(); |
63 | | - //final Serde<Long> longSerde = Serdes.Long(); |
64 | | - //wordCounts.toStream().to(stringSerde, longSerde, outputTopic); |
65 | | - |
66 | | - wordCounts.toStream().to("outputTopic", |
67 | | - Produced.with(Serdes.String(), Serdes.Long())); |
| 67 | + |
| 68 | + wordCounts.toStream() |
| 69 | + .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long())); |
68 | 70 |
|
69 | 71 | final Topology topology = builder.build(); |
70 | 72 | KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration); |
71 | 73 | streams.start(); |
72 | 74 |
|
73 | | - //then |
| 75 | + // then |
74 | 76 | Thread.sleep(30000); |
75 | 77 | streams.close(); |
76 | 78 | } |
|
0 commit comments