Skip to content

Commit 25d3d89

Browse files
committed
optimize code
1 parent d213e4a commit 25d3d89

6 files changed

Lines changed: 54 additions & 16 deletions

File tree

sqlrec-core/src/main/java/com/sqlrec/utils/DbUtils.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,13 @@ private static SqlSessionFactory createSqlSessionFactory() {
3535
dataSource.setUrl(SqlRecConfigs.DB_URL.getValue());
3636
dataSource.setUsername(SqlRecConfigs.DB_USER.getValue());
3737
dataSource.setPassword(SqlRecConfigs.DB_PASSWORD.getValue());
38+
dataSource.setPoolMaximumActiveConnections(20);
39+
dataSource.setPoolMaximumIdleConnections(10);
40+
dataSource.setPoolMaximumCheckoutTime(20000);
41+
dataSource.setPoolTimeToWait(20000);
42+
dataSource.setPoolPingEnabled(true);
43+
dataSource.setPoolPingQuery("SELECT 1");
44+
dataSource.setPoolPingConnectionsNotUsedFor(3600000);
3845

3946
TransactionFactory transactionFactory = new JdbcTransactionFactory();
4047
Environment environment = new Environment(

sqlrec-core/src/main/java/com/sqlrec/utils/HadoopUtils.java

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,31 +7,34 @@
77
import java.io.BufferedReader;
88
import java.io.InputStreamReader;
99
import java.util.Map;
10+
import java.util.concurrent.TimeUnit;
1011

1112
public class HadoopUtils {
1213
private static final Logger log = LoggerFactory.getLogger(HadoopUtils.class);
14+
private static final long DEFAULT_TIMEOUT_SECONDS = 300;
1315

1416
public static boolean pathExists(String hdfsPath) {
15-
if (hdfsPath == null || StringUtils.containsWhitespace(hdfsPath)) {
16-
throw new IllegalArgumentException("hdfsPath cannot be blank");
17-
}
17+
validatePath(hdfsPath);
1818

1919
try {
2020
ProcessBuilder pb = new ProcessBuilder("hadoop", "fs", "-test", "-e", hdfsPath);
2121
pb.redirectErrorStream(true);
2222
clearJavaToolOptions(pb.environment());
2323
Process process = pb.start();
24-
int exitCode = process.waitFor();
24+
boolean finished = process.waitFor(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
25+
if (!finished) {
26+
process.destroyForcibly();
27+
throw new RuntimeException("Check hdfs path exists timeout: path=" + hdfsPath);
28+
}
29+
int exitCode = process.exitValue();
2530
return exitCode == 0;
2631
} catch (Exception e) {
2732
throw new RuntimeException("Check hdfs path exists failed: path=" + hdfsPath, e);
2833
}
2934
}
3035

3136
public static void deletePath(String hdfsPath) {
32-
if (StringUtils.isEmpty(hdfsPath) || StringUtils.containsWhitespace(hdfsPath)) {
33-
throw new IllegalArgumentException("hdfsPath cannot be blank");
34-
}
37+
validatePath(hdfsPath);
3538

3639
try {
3740
ProcessBuilder pb = new ProcessBuilder("hadoop", "fs", "-rm", "-r", "-f", hdfsPath);
@@ -47,7 +50,12 @@ public static void deletePath(String hdfsPath) {
4750
}
4851
}
4952

50-
int exitCode = process.waitFor();
53+
boolean finished = process.waitFor(DEFAULT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
54+
if (!finished) {
55+
process.destroyForcibly();
56+
throw new RuntimeException("Delete hdfs path timeout: path=" + hdfsPath);
57+
}
58+
int exitCode = process.exitValue();
5159
if (exitCode != 0) {
5260
log.warn("Delete hdfs path failed: path={}, exitCode={}, output={}", hdfsPath, exitCode, output);
5361
throw new RuntimeException("Delete hdfs path failed: path=" + hdfsPath + " output=" + output);
@@ -58,6 +66,18 @@ public static void deletePath(String hdfsPath) {
5866
}
5967
}
6068

69+
private static void validatePath(String hdfsPath) {
70+
if (hdfsPath == null || StringUtils.containsWhitespace(hdfsPath)) {
71+
throw new IllegalArgumentException("hdfsPath cannot be blank");
72+
}
73+
74+
if (hdfsPath.contains(";") || hdfsPath.contains("|") || hdfsPath.contains("&") ||
75+
hdfsPath.contains("`") || hdfsPath.contains("$") || hdfsPath.contains("(") ||
76+
hdfsPath.contains(")") || hdfsPath.contains("<") || hdfsPath.contains(">")) {
77+
throw new IllegalArgumentException("hdfsPath contains invalid characters: " + hdfsPath);
78+
}
79+
}
80+
6181
private static void clearJavaToolOptions(Map<String, String> environment) {
6282
environment.remove("JAVA_TOOL_OPTIONS");
6383
}

sqlrec-core/src/main/java/com/sqlrec/utils/JavaFunctionUtils.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,7 @@ public static void registerTableFunction(String db, String funName, Class<?> cla
9191

9292
public static long getFunctionUpdateTime(String db, String funName) {
9393
String mapKey = getMapKey(db, funName);
94-
if (functionUpdateTime.containsKey(mapKey)) {
95-
return functionUpdateTime.get(mapKey);
96-
} else {
97-
return 0;
98-
}
94+
return functionUpdateTime.getOrDefault(mapKey, 0L);
9995
}
10096

10197
private static String getMapKey(String db, String funName) {

sqlrec-core/src/main/java/com/sqlrec/utils/KvJoinUtils.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ private static Enumerable joinByPrimaryKey(
5555
Set<Object> joinKeys = new HashSet<>();
5656
for (Object[] leftValue : leftValues) {
5757
Object leftJoinKey = leftValue[leftJoinKeyColIndex];
58+
if (leftJoinKey == null) {
59+
continue;
60+
}
5861
joinKeys.add(leftJoinKey);
5962
}
6063

@@ -67,6 +70,9 @@ private static Enumerable joinByPrimaryKey(
6770
List<List<Object[]>> rowList = new ArrayList<>();
6871
for (Object[] leftValue : leftValues) {
6972
Object leftJoinKey = leftValue[leftJoinKeyColIndex];
73+
if (leftJoinKey == null) {
74+
continue;
75+
}
7076
List<Object[]> rightValues = stringKeyMap.getOrDefault(leftJoinKey.toString(), new ArrayList<>());
7177
if (rightValues.isEmpty()) {
7278
if (joinType == JoinRelType.LEFT) {

sqlrec-core/src/main/java/com/sqlrec/utils/SchemaUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ public static String removeQuotes(String value) {
4646
if (value == null) {
4747
return null;
4848
}
49-
if (value.startsWith("'") && value.endsWith("'")) {
49+
if ((value.startsWith("'") && value.endsWith("'")) ||
50+
(value.startsWith("\"") && value.endsWith("\""))) {
5051
return value.substring(1, value.length() - 1);
5152
}
5253
return value;

sqlrec-core/src/main/java/com/sqlrec/utils/VectorJoinUtils.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,14 @@ private static Enumerable doVectorJoin(
9393

9494
List<Object[]> result = new ArrayList<>();
9595
for (Object[] leftValue : leftValues) {
96-
Object leftEmbedding = leftValue[leftEmbeddingColIndex];
96+
if (leftValue == null) {
97+
continue;
98+
}
99+
Object leftEmbedding = leftEmbeddingColIndex >= 0 && leftEmbeddingColIndex < leftValue.length ?
100+
leftValue[leftEmbeddingColIndex] : null;
101+
if (leftEmbedding == null) {
102+
continue;
103+
}
97104
List<Float> embedding = DataTransformUtils.convertToFloatVec(leftEmbedding);
98105
List<Object[]> rightValues = rightTable.searchByEmbeddingWithScore(
99106
leftValue,
@@ -137,7 +144,8 @@ public static VectorJoinConfig extractVectorJoinConfig(
137144
VectorJoinConfig config = new VectorJoinConfig();
138145

139146
if (sort != null && sort.fetch != null && sort.fetch instanceof RexLiteral) {
140-
config.limit = ((RexLiteral) sort.fetch).getValueAs(Integer.class);
147+
Integer limitValue = ((RexLiteral) sort.fetch).getValueAs(Integer.class);
148+
config.limit = limitValue != null ? limitValue : 0;
141149
}
142150

143151
RelDataType leftRowType = join.getLeft().getRowType();

0 commit comments

Comments
 (0)