Skip to content

Commit 6e17eeb

Browse files
chore: implement leastlatency LB with Peak EWMA (#2869)
* chore: implement leastlatency LB with Peak EWMA Other changes - Hook up subset size settings - Increase PeakEwma decay - AFE latency is very noisy so we need a long duration to discern which AFEs genuinely perform better Change-Id: I138501487d4dec53aa80e18785023bcbfaa06807 * fix: handling default handling where subsetSize = 0 means the entire pool Change-Id: Ifb6c293e355d337d457d3b76efe4e21924f80f81 --------- Co-authored-by: Neil <[email protected]>
1 parent d5565b5 commit 6e17eeb

File tree

6 files changed

+120
-35
lines changed

6 files changed

+120
-35
lines changed

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/DynamicPicker.java

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,12 @@ class DynamicPicker extends Picker {
3131
private final SessionList sessions;
3232

3333
private volatile Picker delegate;
34-
private LoadBalancingOptions.LoadBalancingStrategyCase currentStrategy;
34+
private volatile LoadBalancingOptions currentOptions;
3535

36-
public DynamicPicker(
37-
SessionList sessions, LoadBalancingOptions.LoadBalancingStrategyCase initialStrategy) {
36+
public DynamicPicker(SessionList sessions, LoadBalancingOptions initialOptions) {
3837
this.sessions = sessions;
39-
this.currentStrategy = initialStrategy;
40-
this.delegate = createPicker(initialStrategy);
38+
this.currentOptions = initialOptions;
39+
this.delegate = createPicker(initialOptions);
4140
}
4241

4342
@Override
@@ -46,25 +45,28 @@ public Optional<SessionHandle> pickSession() {
4645
}
4746

4847
public void updateConfig(SessionClientConfiguration.SessionPoolConfiguration config) {
49-
LoadBalancingOptions.LoadBalancingStrategyCase newStrategy =
50-
config.getLoadBalancingOptions().getLoadBalancingStrategyCase();
51-
if (newStrategy != currentStrategy) {
52-
delegate = createPicker(newStrategy);
53-
currentStrategy = newStrategy;
48+
LoadBalancingOptions newOptions = config.getLoadBalancingOptions();
49+
if (!newOptions.equals(currentOptions)) {
50+
delegate = createPicker(newOptions);
51+
currentOptions = newOptions;
5452
}
5553
}
5654

57-
private Picker createPicker(LoadBalancingOptions.LoadBalancingStrategyCase strategy) {
58-
switch (strategy) {
55+
private Picker createPicker(LoadBalancingOptions options) {
56+
switch (options.getLoadBalancingStrategyCase()) {
5957
case RANDOM:
60-
return new SimplePicker(sessions);
58+
return new SimplePicker(sessions, options.getRandom());
6159
case LEAST_IN_FLIGHT:
62-
return new LeastInFlightPicker(sessions);
60+
return new LeastInFlightPicker(sessions, options.getLeastInFlight());
61+
case PEAK_EWMA:
62+
return new LeastLatencyPicker(sessions, options.getPeakEwma());
6363
default:
6464
LOGGER.log(
65-
Level.FINE, "got load balancing strategy {0} which was not implemented", strategy);
66-
// TODO: implement PeakEwma
67-
return new LeastInFlightPicker(sessions);
65+
Level.FINE,
66+
"got load balancing strategy {0} which was not implemented",
67+
options.getLoadBalancingStrategyCase());
68+
return new LeastInFlightPicker(
69+
sessions, LoadBalancingOptions.LeastInFlight.getDefaultInstance());
6870
}
6971
}
7072
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/LeastInFlightPicker.java

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,40 +16,52 @@
1616

1717
package com.google.cloud.bigtable.data.v2.internal.session;
1818

19+
import com.google.bigtable.v2.LoadBalancingOptions;
1920
import com.google.cloud.bigtable.data.v2.internal.session.SessionList.AfeHandle;
2021
import com.google.cloud.bigtable.data.v2.internal.session.SessionList.SessionHandle;
22+
import java.util.ArrayList;
23+
import java.util.Collections;
2124
import java.util.List;
2225
import java.util.Optional;
2326
import java.util.concurrent.ThreadLocalRandom;
2427

25-
/** Pick the AFE with the fewest in-flight requests. Experimental for now. */
28+
/** Pick the AFE with the fewest in-flight requests. */
2629
class LeastInFlightPicker extends Picker {
2730
private final SessionList sessionList;
31+
private final LoadBalancingOptions.LeastInFlight options;
2832

29-
public LeastInFlightPicker(SessionList sessionList) {
33+
public LeastInFlightPicker(SessionList sessionList, LoadBalancingOptions.LeastInFlight options) {
3034
this.sessionList = sessionList;
35+
this.options = options;
3136
}
3237

3338
@Override
3439
Optional<SessionHandle> pickSession() {
3540
List<AfeHandle> readyAfes = sessionList.getAfesWithReadySessions();
36-
int size = readyAfes.size();
37-
38-
if (size == 0) {
41+
if (readyAfes.isEmpty()) {
3942
return Optional.empty();
4043
}
4144

42-
ThreadLocalRandom random = ThreadLocalRandom.current();
43-
AfeHandle selected = readyAfes.get(random.nextInt(size));
45+
ThreadLocalRandom rng = ThreadLocalRandom.current();
46+
List<AfeHandle> candidates = new ArrayList<>(readyAfes);
47+
int bestCost = Integer.MAX_VALUE;
48+
AfeHandle bestAfe = null;
49+
long iterations = readyAfes.size();
50+
if (options.getRandomSubsetSize() > 0) {
51+
iterations = Math.min(options.getRandomSubsetSize(), iterations);
52+
}
4453

45-
// If we have options, pick a second candidate and keep the better one
46-
if (size > 1) {
47-
AfeHandle candidate2 = readyAfes.get(random.nextInt(size));
48-
if (candidate2.getNumOutstanding() < selected.getNumOutstanding()) {
49-
selected = candidate2;
54+
// Partial Fisher-Yates shuffle.
55+
for (int i = 0; i < iterations; i++) {
56+
int randomIndex = i + rng.nextInt(candidates.size() - i);
57+
AfeHandle picked = candidates.get(randomIndex);
58+
if (picked.getNumOutstanding() < bestCost) {
59+
bestCost = picked.getNumOutstanding();
60+
bestAfe = picked;
5061
}
62+
// Move candidate to the `i`th entry so that it's not picked again.
63+
Collections.swap(candidates, i, randomIndex);
5164
}
52-
53-
return sessionList.checkoutSession(selected);
65+
return sessionList.checkoutSession(bestAfe);
5466
}
5567
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.bigtable.data.v2.internal.session;
18+
19+
import com.google.bigtable.v2.LoadBalancingOptions;
20+
import com.google.cloud.bigtable.data.v2.internal.session.SessionList.AfeHandle;
21+
import com.google.cloud.bigtable.data.v2.internal.session.SessionList.SessionHandle;
22+
import java.util.ArrayList;
23+
import java.util.Collections;
24+
import java.util.List;
25+
import java.util.Optional;
26+
import java.util.concurrent.ThreadLocalRandom;
27+
28+
/** Pick the AFE with the least latency. Experimental for now. */
29+
class LeastLatencyPicker extends Picker {
30+
private final SessionList sessionList;
31+
private final LoadBalancingOptions.PeakEwma options;
32+
33+
public LeastLatencyPicker(SessionList sessionList, LoadBalancingOptions.PeakEwma options) {
34+
this.sessionList = sessionList;
35+
this.options = options;
36+
}
37+
38+
@Override
39+
Optional<SessionHandle> pickSession() {
40+
List<AfeHandle> readyAfes = sessionList.getAfesWithReadySessions();
41+
if (readyAfes.isEmpty()) {
42+
return Optional.empty();
43+
}
44+
45+
ThreadLocalRandom rng = ThreadLocalRandom.current();
46+
List<AfeHandle> candidates = new ArrayList<>(readyAfes);
47+
double bestCost = Double.MAX_VALUE;
48+
AfeHandle bestAfe = null;
49+
long iterations = readyAfes.size();
50+
51+
if (options.getRandomSubsetSize() > 0) {
52+
iterations = Math.min(options.getRandomSubsetSize(), iterations);
53+
}
54+
55+
// Partial Fisher-Yates shuffle.
56+
for (int i = 0; i < iterations; i++) {
57+
int randomIndex = i + rng.nextInt(candidates.size() - i);
58+
AfeHandle picked = candidates.get(randomIndex);
59+
if (picked.getE2eCost() < bestCost) {
60+
bestCost = picked.getE2eCost();
61+
bestAfe = picked;
62+
}
63+
// Move candidate to the `i`th entry so that it's not picked again.
64+
Collections.swap(candidates, i, randomIndex);
65+
}
66+
return sessionList.checkoutSession(bestAfe);
67+
}
68+
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionList.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -413,8 +413,8 @@ int getNumOutstanding() {
413413
}
414414

415415
static class PeakEwma {
416-
// Use the last 100ms as a look back window
417-
private final double decayNs = TimeUnit.MILLISECONDS.toNanos(100);
416+
// Use the last 10s as a look back window
417+
private final double decayNs = TimeUnit.SECONDS.toNanos(10);
418418
private long timestamp = System.nanoTime();
419419
private double cost;
420420

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SessionPoolImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ public SessionPoolImpl(
192192
.getSessionConfiguration()
193193
.getSessionPoolConfiguration()
194194
.getLoadBalancingOptions();
195-
picker = new DynamicPicker(sessions, lbOptions.getLoadBalancingStrategyCase());
195+
picker = new DynamicPicker(sessions, lbOptions);
196196
poolSizer =
197197
new PoolSizer(
198198
sessions.getStats(),

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/internal/session/SimplePicker.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.google.cloud.bigtable.data.v2.internal.session;
1818

19+
import com.google.bigtable.v2.LoadBalancingOptions;
1920
import com.google.cloud.bigtable.data.v2.internal.session.SessionList.AfeHandle;
2021
import com.google.cloud.bigtable.data.v2.internal.session.SessionList.SessionHandle;
2122
import java.util.List;
@@ -24,10 +25,12 @@
2425

2526
class SimplePicker extends Picker {
2627
private final SessionList sessionList;
28+
private final LoadBalancingOptions.Random options;
2729
private final Random random = new Random();
2830

29-
public SimplePicker(SessionList sessionList) {
31+
public SimplePicker(SessionList sessionList, LoadBalancingOptions.Random options) {
3032
this.sessionList = sessionList;
33+
this.options = options;
3134
}
3235

3336
@Override

0 commit comments

Comments
 (0)