Skip to content

Commit 3245475

Browse files
committed
optimize redis connector
1 parent 8aa5011 commit 3245475

4 files changed

Lines changed: 93 additions & 13 deletions

File tree

sqlrec-common/src/main/java/com/sqlrec/common/config/SqlRecConfigs.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,4 +164,12 @@ public class SqlRecConfigs {
164164
null,
165165
Long.class
166166
);
167+
168+
public static final ConfigOption<Integer> REDIS_POOL_SIZE = new ConfigOption<>(
169+
"REDIS_POOL_SIZE",
170+
16,
171+
"redis connection pool size",
172+
null,
173+
Integer.class
174+
);
167175
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package com.sqlrec.common.utils;
2+
3+
import java.util.concurrent.atomic.AtomicLong;
4+
import java.util.function.Supplier;
5+
6+
public class SharePool<T> {
7+
private final Object[] objects;
8+
private final Supplier<T> supplier;
9+
private final AtomicLong counter;
10+
private final int size;
11+
12+
public SharePool(int size, Supplier<T> supplier) {
13+
if (size <= 0) {
14+
throw new IllegalArgumentException("Pool size must be positive");
15+
}
16+
if (supplier == null) {
17+
throw new IllegalArgumentException("Supplier cannot be null");
18+
}
19+
this.size = size;
20+
this.objects = new Object[size];
21+
this.supplier = supplier;
22+
this.counter = new AtomicLong(0);
23+
}
24+
25+
@SuppressWarnings("unchecked")
26+
public T getObject() {
27+
long index = counter.getAndIncrement();
28+
int arrayIndex = (int) (index % size);
29+
30+
if (objects[arrayIndex] == null) {
31+
synchronized (this) {
32+
if (objects[arrayIndex] == null) {
33+
objects[arrayIndex] = supplier.get();
34+
}
35+
}
36+
}
37+
38+
return (T) objects[arrayIndex];
39+
}
40+
41+
public int getSize() {
42+
return size;
43+
}
44+
45+
public int getCreatedCount() {
46+
int count = 0;
47+
for (Object obj : objects) {
48+
if (obj != null) {
49+
count++;
50+
}
51+
}
52+
return count;
53+
}
54+
}

sqlrec-connectors/sqlrec-connector-redis/src/main/java/com/sqlrec/connectors/redis/client/RedisClusterWrapper.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.sqlrec.connectors.redis.client;
22

3+
import com.sqlrec.common.config.SqlRecConfigs;
4+
import com.sqlrec.common.utils.SharePool;
35
import io.lettuce.core.KeyValue;
46
import io.lettuce.core.RedisFuture;
57
import io.lettuce.core.cluster.RedisClusterClient;
@@ -13,7 +15,7 @@
1315

1416
public class RedisClusterWrapper implements AbstractRedisWrapper {
1517
private static Map<String, RedisClusterClient> redisClientMap = new ConcurrentHashMap<>();
16-
private static Map<String, StatefulRedisClusterConnection<byte[], byte[]>> connectionMap = new ConcurrentHashMap<>();
18+
private static Map<String, SharePool<StatefulRedisClusterConnection<byte[], byte[]>>> poolMap = new ConcurrentHashMap<>();
1719

1820
private String url;
1921

@@ -23,22 +25,28 @@ public void open(String url) {
2325
}
2426

2527
private static synchronized void openRedisClusterClient(String url) {
26-
if (connectionMap.containsKey(url)) {
28+
if (poolMap.containsKey(url)) {
2729
return;
2830
}
2931

3032
RedisClusterClient redisClient = RedisClusterClient.create(url);
31-
StatefulRedisClusterConnection<byte[], byte[]> connection = redisClient.connect(new ByteArrayCodec());
33+
34+
SharePool<StatefulRedisClusterConnection<byte[], byte[]>> pool = new SharePool<>(
35+
SqlRecConfigs.REDIS_POOL_SIZE.getValue(),
36+
() -> redisClient.connect(new ByteArrayCodec())
37+
);
3238

3339
redisClientMap.put(url, redisClient);
34-
connectionMap.put(url, connection);
40+
poolMap.put(url, pool);
3541
}
3642

3743
private RedisAdvancedClusterAsyncCommands<byte[], byte[]> getCommands() {
38-
if (!connectionMap.containsKey(url)) {
44+
if (!poolMap.containsKey(url)) {
3945
openRedisClusterClient(url);
4046
}
41-
return connectionMap.get(url).async();
47+
SharePool<StatefulRedisClusterConnection<byte[], byte[]>> pool = poolMap.get(url);
48+
StatefulRedisClusterConnection<byte[], byte[]> connection = pool.getObject();
49+
return connection.async();
4250
}
4351

4452
@Override

sqlrec-connectors/sqlrec-connector-redis/src/main/java/com/sqlrec/connectors/redis/client/RedisWrapper.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package com.sqlrec.connectors.redis.client;
22

3+
import com.sqlrec.common.config.SqlRecConfigs;
4+
import com.sqlrec.common.utils.SharePool;
35
import io.lettuce.core.KeyValue;
46
import io.lettuce.core.RedisClient;
57
import io.lettuce.core.RedisFuture;
8+
import io.lettuce.core.RedisURI;
69
import io.lettuce.core.api.StatefulRedisConnection;
710
import io.lettuce.core.api.async.RedisAsyncCommands;
811
import io.lettuce.core.codec.ByteArrayCodec;
@@ -13,7 +16,7 @@
1316

1417
public class RedisWrapper implements AbstractRedisWrapper {
1518
private static Map<String, RedisClient> redisClientMap = new ConcurrentHashMap<>();
16-
private static Map<String, StatefulRedisConnection<byte[], byte[]>> connectionMap = new ConcurrentHashMap<>();
19+
private static Map<String, SharePool<StatefulRedisConnection<byte[], byte[]>>> poolMap = new ConcurrentHashMap<>();
1720

1821
private String url;
1922

@@ -23,22 +26,29 @@ public void open(String url) {
2326
}
2427

2528
private static synchronized void openRedisClient(String url) {
26-
if (connectionMap.containsKey(url)) {
29+
if (poolMap.containsKey(url)) {
2730
return;
2831
}
2932

30-
RedisClient redisClient = RedisClient.create(url);
31-
StatefulRedisConnection<byte[], byte[]> connection = redisClient.connect(new ByteArrayCodec());
33+
RedisURI redisURI = RedisURI.create(url);
34+
RedisClient redisClient = RedisClient.create(redisURI);
35+
36+
SharePool<StatefulRedisConnection<byte[], byte[]>> pool = new SharePool<>(
37+
SqlRecConfigs.REDIS_POOL_SIZE.getValue(),
38+
() -> redisClient.connect(new ByteArrayCodec())
39+
);
3240

3341
redisClientMap.put(url, redisClient);
34-
connectionMap.put(url, connection);
42+
poolMap.put(url, pool);
3543
}
3644

3745
private RedisAsyncCommands<byte[], byte[]> getCommands() {
38-
if (!connectionMap.containsKey(url)) {
46+
if (!poolMap.containsKey(url)) {
3947
openRedisClient(url);
4048
}
41-
return connectionMap.get(url).async();
49+
SharePool<StatefulRedisConnection<byte[], byte[]>> pool = poolMap.get(url);
50+
StatefulRedisConnection<byte[], byte[]> connection = pool.getObject();
51+
return connection.async();
4252
}
4353

4454
@Override

0 commit comments

Comments
 (0)