Skip to content

Commit 6dc0c1a

Browse files
committed
optimize kafka connector
1 parent 807bcdb commit 6dc0c1a

File tree

1 file changed

+11
-5
lines changed
  • sqlrec-connectors/sqlrec-connector-kafka/src/main/java/com/sqlrec/connectors/kafka/calcite

1 file changed

+11
-5
lines changed

sqlrec-connectors/sqlrec-connector-kafka/src/main/java/com/sqlrec/connectors/kafka/calcite/KafkaCalciteTable.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,16 +67,22 @@ public Expression getExpression(SchemaPlus schema, String tableName, Class clazz
6767
tableName, clazz);
6868
}
6969

70+
private static String getProducerConfigKey(KafkaConfig config) {
71+
return config.bootstrapServers + "|" + config.keySerializer + "|" + config.valueSerializer + "|" + config.lingerMs;
72+
}
73+
7074
public static KafkaProducer<String, String> getKafkaProducer(KafkaConfig kafkaConfig) {
71-
if (kafkaProducerMap.containsKey(kafkaConfig.bootstrapServers)) {
72-
return kafkaProducerMap.get(kafkaConfig.bootstrapServers);
75+
String configKey = getProducerConfigKey(kafkaConfig);
76+
if (kafkaProducerMap.containsKey(configKey)) {
77+
return kafkaProducerMap.get(configKey);
7378
}
7479
return openKafkaProducer(kafkaConfig);
7580
}
7681

7782
private static synchronized KafkaProducer<String, String> openKafkaProducer(KafkaConfig kafkaConfig) {
78-
if (kafkaProducerMap.containsKey(kafkaConfig.bootstrapServers)) {
79-
return kafkaProducerMap.get(kafkaConfig.bootstrapServers);
83+
String configKey = getProducerConfigKey(kafkaConfig);
84+
if (kafkaProducerMap.containsKey(configKey)) {
85+
return kafkaProducerMap.get(configKey);
8086
}
8187

8288
Properties props = new Properties();
@@ -86,7 +92,7 @@ private static synchronized KafkaProducer<String, String> openKafkaProducer(Kafk
8692
props.put("linger.ms", kafkaConfig.lingerMs);
8793

8894
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
89-
kafkaProducerMap.put(kafkaConfig.bootstrapServers, producer);
95+
kafkaProducerMap.put(configKey, producer);
9096
return producer;
9197
}
9298

0 commit comments

Comments
 (0)