Skip to content

Commit 50df8ec

Browse files
committed
optimize milvus connector
1 parent 6dc0c1a commit 50df8ec

File tree

3 files changed

+12
-4
lines changed

3 files changed

+12
-4
lines changed

sqlrec-connectors/sqlrec-connector-milvus/src/main/java/com/sqlrec/connectors/milvus/calcite/MilvusCalciteTableFactory.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
import org.apache.calcite.schema.Table;
99

1010
import java.util.ArrayList;
11-
import java.util.Collections;
1211
import java.util.List;
1312
import java.util.Map;
1413

sqlrec-connectors/sqlrec-connector-milvus/src/main/java/com/sqlrec/connectors/milvus/flink/MilvusDynamicTableFactory.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ private MilvusConfig getMilvusConfig(Context context) {
2828
Map<String, String> options = context.getCatalogTable().getOptions();
2929
ResolvedSchema tableSchema = context.getCatalogTable().getResolvedSchema();
3030
MilvusConfig milvusConfig = MilvusOptions.getMilvusConfig(options);
31-
milvusConfig.database = context.getObjectIdentifier().getDatabaseName();
3231
milvusConfig.fieldSchemas = FlinkSchemaUtils.getFieldSchemas(tableSchema);
3332
milvusConfig.primaryKey = FlinkSchemaUtils.getPrimaryKey(tableSchema);
3433
milvusConfig.primaryKeyIndex = HiveTableUtils.getTablePrimaryKeyIndex(

sqlrec-connectors/sqlrec-connector-milvus/src/main/java/com/sqlrec/connectors/milvus/handler/MilvusHandler.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,16 @@
2020
import io.milvus.v2.service.vector.response.SearchResp;
2121
import org.apache.calcite.DataContext;
2222
import org.apache.calcite.rex.RexNode;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
2325

2426
import java.time.Duration;
2527
import java.util.*;
2628
import java.util.concurrent.ConcurrentHashMap;
2729
import java.util.stream.Collectors;
2830

2931
public class MilvusHandler {
32+
private static final Logger logger = LoggerFactory.getLogger(MilvusHandler.class);
3033
private static Map<String, MilvusClientV2Pool> clientPools = new ConcurrentHashMap<>();
3134
private static Gson gson = new GsonBuilder()
3235
.setObjectToNumberStrategy(ToNumberPolicy.LONG_OR_DOUBLE)
@@ -236,20 +239,27 @@ private Object[] toRowWithScore(Map<String, Object> entity, List<FieldSchema> fi
236239
}
237240

238241
public static void returnClient(MilvusClientV2 client, MilvusConfig milvusConfig) {
239-
String key = milvusConfig.url + milvusConfig.token;
242+
String key = getClientPoolKey(milvusConfig);
240243
if (clientPools.containsKey(key)) {
241244
clientPools.get(key).returnClient(key, client);
245+
} else {
246+
logger.warn("client pool {} is not found", key);
247+
client.close();
242248
}
243249
}
244250

245251
public static MilvusClientV2 getClient(MilvusConfig milvusConfig) {
246-
String key = milvusConfig.url + milvusConfig.token;
252+
String key = getClientPoolKey(milvusConfig);
247253
if (!clientPools.containsKey(key)) {
248254
openClientPool(key, milvusConfig);
249255
}
250256
return clientPools.get(key).getClient(key);
251257
}
252258

259+
private static String getClientPoolKey(MilvusConfig milvusConfig) {
260+
return milvusConfig.url + "|" + milvusConfig.token;
261+
}
262+
253263
private static synchronized void openClientPool(String key, MilvusConfig milvusConfig) {
254264
if (clientPools.containsKey(key)) {
255265
return;

0 commit comments

Comments
 (0)