Skip to content

Commit 255642c

Browse files
Updating client to use GAX's exponential backoff
1 parent 2d54cc0 commit 255642c

5 files changed

Lines changed: 61 additions & 126 deletions

File tree

google-cloud-firestore/src/main/java/com/google/cloud/firestore/DocumentSet.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,9 +119,7 @@ DocumentSet remove(ResourcePath key) {
119119
return new DocumentSet(newKeyIndex, newSortedSet);
120120
}
121121

122-
/**
123-
* Returns a copy of the documents in this set as array. This is O(n) in the size of the set.
124-
*/
122+
/** Returns a copy of the documents in this set as array. This is O(n) in the size of the set. */
125123
List<DocumentSnapshot> toList() {
126124
List<DocumentSnapshot> documents = new ArrayList<>(size());
127125
for (DocumentSnapshot document : this) {
@@ -134,6 +132,7 @@ List<DocumentSnapshot> toList() {
134132
public Iterator<DocumentSnapshot> iterator() {
135133
return sortedSet.iterator();
136134
}
135+
137136
@Override
138137
public boolean equals(Object other) {
139138
if (this == other) {

google-cloud-firestore/src/main/java/com/google/cloud/firestore/ExponentialBackoff.java

Lines changed: 0 additions & 107 deletions
This file was deleted.

google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreException.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616

1717
package com.google.cloud.firestore;
1818

19+
import com.google.api.core.InternalApi;
1920
import com.google.api.gax.rpc.ApiException;
2021
import com.google.cloud.grpc.BaseGrpcServiceException;
21-
import com.google.common.annotations.VisibleForTesting;
2222
import io.grpc.Status;
2323
import java.io.IOException;
2424
import javax.annotation.Nullable;
@@ -79,7 +79,7 @@ static FirestoreException apiException(ApiException exception) {
7979
return new FirestoreException(exception);
8080
}
8181

82-
@VisibleForTesting
82+
@InternalApi
8383
@Nullable
8484
Status getStatus() {
8585
return status;

google-cloud-firestore/src/main/java/com/google/cloud/firestore/QuerySnapshot.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,7 @@ public boolean equals(Object o) {
150150
return false;
151151
}
152152
QuerySnapshot that = (QuerySnapshot) o;
153-
return Objects.equals(query, that.query)
154-
&& Objects.equals(readTime, that.readTime);
153+
return Objects.equals(query, that.query) && Objects.equals(readTime, that.readTime);
155154
}
156155

157156
@Override

google-cloud-firestore/src/main/java/com/google/cloud/firestore/Watch.java

Lines changed: 56 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@
1616

1717
package com.google.cloud.firestore;
1818

19+
import com.google.api.core.CurrentMillisClock;
1920
import com.google.api.gax.grpc.GrpcStatusCode;
21+
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
22+
import com.google.api.gax.retrying.RetrySettings;
23+
import com.google.api.gax.retrying.TimedAttemptSettings;
2024
import com.google.api.gax.rpc.ApiException;
2125
import com.google.api.gax.rpc.ApiStreamObserver;
2226
import com.google.cloud.firestore.DocumentChange.Type;
@@ -41,15 +45,18 @@
4145
import java.util.Map;
4246
import java.util.Map.Entry;
4347
import java.util.concurrent.Executor;
48+
import java.util.concurrent.ScheduledExecutorService;
49+
import java.util.concurrent.TimeUnit;
4450
import java.util.concurrent.atomic.AtomicBoolean;
4551
import javax.annotation.Nullable;
52+
import org.threeten.bp.Duration;
4653
import org.threeten.bp.Instant;
4754

4855
/**
49-
* Watch provides listen functionality and exposes snapshot listeners. It can be used with any
50-
* valid Firestore Listen target.
56+
* Watch provides listen functionality and exposes snapshot listeners. It can be used with any valid
57+
* Firestore Listen target.
5158
*
52-
* This class is thread-compatible when called through the methods defined in ApiStreamObserver.
59+
* <p>This class is thread-compatible when called through the methods defined in ApiStreamObserver.
5360
* It synchronizes on its own instance so it is advisable not to use this class for external
5461
* synchronization.
5562
*/
@@ -60,10 +67,35 @@ class Watch implements ApiStreamObserver<ListenResponse> {
6067
*/
6168
private static final int WATCH_TARGET_ID = 0xD0;
6269

70+
private static int RETRY_INITIAL_DELAY_MS = 1000;
71+
72+
/** The maximum backoff time in milliseconds. */
73+
private static int RETRY_MAX_DELAY_MS = 60 * 1000;
74+
75+
/** The factor to increase the backup by after each failed attempt. */
76+
private static double RETRY_BACKOFF_FACTOR = 1.5;
77+
78+
private static RetrySettings RETRY_SETTINGS =
79+
RetrySettings.newBuilder()
80+
.
81+
// The initial backoff time in seconds after an error.
82+
// Set to 1s according to https://cloud.google.com/apis/design/errors.
83+
setInitialRetryDelay(Duration.ofSeconds(1))
84+
.
85+
// The maximum backoff time in minutes.
86+
setMaxRetryDelay(Duration.ofMinutes(1))
87+
.
88+
// The factor to increase the backup by after each failed attempt.
89+
setRetryDelayMultiplier(1.5)
90+
.setJittered(true)
91+
.build();
92+
6393
private final FirestoreImpl firestore;
94+
private final ScheduledExecutorService firestoreExecutor;
6495
private final Comparator<DocumentSnapshot> comparator;
65-
private final ExponentialBackoff backoff;
96+
private final ExponentialRetryAlgorithm backoff;
6697
private final Target target;
98+
private TimedAttemptSettings nextAttempt;
6799
private ApiStreamObserver<ListenRequest> stream;
68100

69101
/** The sorted tree of DocumentSnapshots as sent in the last snapshot. */
@@ -94,8 +126,8 @@ class Watch implements ApiStreamObserver<ListenResponse> {
94126
private boolean hasPushed;
95127

96128
/**
97-
* Indicates whether we are interested in data from the stream. Set to false
98-
* in the 'unsubscribe()' callback.
129+
* Indicates whether we are interested in data from the stream. Set to false in the
130+
* 'unsubscribe()' callback.
99131
*/
100132
private AtomicBoolean isActive;
101133

@@ -116,8 +148,11 @@ private Watch(FirestoreImpl firestore, Target target, Comparator<DocumentSnapsho
116148
this.firestore = firestore;
117149
this.target = target;
118150
this.comparator = comparator;
119-
this.backoff = new ExponentialBackoff(firestore.getClient().getExecutor());
151+
this.backoff =
152+
new ExponentialRetryAlgorithm(RETRY_SETTINGS, CurrentMillisClock.getDefaultClock());
153+
this.firestoreExecutor = firestore.getClient().getExecutor();
120154
this.isActive = new AtomicBoolean();
155+
this.nextAttempt = backoff.createFirstAttempt();
121156
}
122157

123158
/**
@@ -197,12 +232,14 @@ public synchronized void onNext(ListenResponse listenResponse) {
197232
resetDocs();
198233
break;
199234
default:
200-
closeStream(FirestoreException.invalidState("Encountered invalid target change type: " + change.getTargetChangeType()));
235+
closeStream(
236+
FirestoreException.invalidState(
237+
"Encountered invalid target change type: " + change.getTargetChangeType()));
201238
}
202239

203240
if (change.getResumeToken() != null
204241
&& affectsTarget(change.getTargetIdsList(), WATCH_TARGET_ID)) {
205-
backoff.reset();
242+
nextAttempt = backoff.createFirstAttempt();
206243
}
207244

208245
break;
@@ -348,7 +385,7 @@ public void run() {
348385
private void maybeReopenStream(Throwable throwable) {
349386
if (isActive.get() && !isPermanentError(throwable)) {
350387
if (isResourceExhaustedError(throwable)) {
351-
backoff.resetToMax();
388+
nextAttempt = backoff.createNextAttempt(nextAttempt);
352389
}
353390

354391
changeMap.clear();
@@ -370,7 +407,7 @@ private void resetStream() {
370407

371408
/** Initializes a new stream to the backend with backoff. */
372409
private void initStream() {
373-
backoff.backoffAndRun(
410+
firestoreExecutor.schedule(
374411
new Runnable() {
375412
@Override
376413
public void run() {
@@ -379,10 +416,15 @@ public void run() {
379416
}
380417

381418
synchronized (Watch.this) {
419+
if (!isActive.get()) {
420+
return;
421+
}
422+
382423
Preconditions.checkState(stream == null);
383424

384425
current = false;
385426
hasPushed = false;
427+
nextAttempt = backoff.createNextAttempt(nextAttempt);
386428

387429
stream = firestore.streamRequest(Watch.this, firestore.getClient().listenCallable());
388430

@@ -396,7 +438,9 @@ public void run() {
396438
stream.onNext(request.build());
397439
}
398440
}
399-
});
441+
},
442+
nextAttempt.getRandomizedRetryDelay().toMillis(),
443+
TimeUnit.MILLISECONDS);
400444
}
401445

402446
/**

0 commit comments

Comments
 (0)