Skip to content

Commit 807bcdb

Browse files
committed
optimize kafka connector
1 parent 557138a commit 807bcdb

File tree

3 files changed

+30
-3
lines changed

3 files changed

+30
-3
lines changed

sqlrec-connectors/sqlrec-connector-kafka/src/main/java/com/sqlrec/connectors/kafka/calcite/KafkaCalciteTable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,9 @@ private static synchronized KafkaProducer<String, String> openKafkaProducer(Kafk
8181

8282
Properties props = new Properties();
8383
props.put("bootstrap.servers", kafkaConfig.bootstrapServers);
84-
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
85-
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
86-
props.put("linger.ms", 5000); //todo read table config
84+
props.put("key.serializer", kafkaConfig.keySerializer);
85+
props.put("value.serializer", kafkaConfig.valueSerializer);
86+
props.put("linger.ms", kafkaConfig.lingerMs);
8787

8888
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
8989
kafkaProducerMap.put(kafkaConfig.bootstrapServers, producer);

sqlrec-connectors/sqlrec-connector-kafka/src/main/java/com/sqlrec/connectors/kafka/config/KafkaConfig.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,7 @@ public class KafkaConfig {
99
public String topic;
1010
public String format;
1111
public List<FieldSchema> fieldSchemas;
12+
public String keySerializer;
13+
public String valueSerializer;
14+
public int lingerMs;
1215
}

sqlrec-connectors/sqlrec-connector-kafka/src/main/java/com/sqlrec/connectors/kafka/config/KafkaOptions.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,37 @@ public class KafkaOptions {
2828
null,
2929
String.class
3030
);
31+
public static final ConfigOption<String> KEY_SERIALIZER = new ConfigOption<>(
32+
"properties.producer.key.serializer",
33+
"org.apache.kafka.common.serialization.StringSerializer",
34+
"Kafka producer key serializer",
35+
null,
36+
String.class
37+
);
38+
public static final ConfigOption<String> VALUE_SERIALIZER = new ConfigOption<>(
39+
"properties.producer.value.serializer",
40+
"org.apache.kafka.common.serialization.StringSerializer",
41+
"Kafka producer value serializer",
42+
null,
43+
String.class
44+
);
45+
public static final ConfigOption<Integer> LINGER_MS = new ConfigOption<>(
46+
"properties.producer.linger.ms",
47+
5000,
48+
"Kafka producer linger ms",
49+
null,
50+
Integer.class
51+
);
3152

3253

3354
public static KafkaConfig getKafkaConfig(Map<String, String> options) {
3455
KafkaConfig kafkaConfig = new KafkaConfig();
3556
kafkaConfig.bootstrapServers = BOOTSTRAP_SERVERS.getValue(options);
3657
kafkaConfig.topic = TOPIC.getValue(options);
3758
kafkaConfig.format = FORMAT.getValue(options);
59+
kafkaConfig.keySerializer = KEY_SERIALIZER.getValue(options);
60+
kafkaConfig.valueSerializer = VALUE_SERIALIZER.getValue(options);
61+
kafkaConfig.lingerMs = LINGER_MS.getValue(options);
3862

3963
return kafkaConfig;
4064
}

0 commit comments

Comments
 (0)