1616
1717package com .google .cloud .firestore ;
1818
19+ import com .google .api .core .CurrentMillisClock ;
1920import com .google .api .gax .grpc .GrpcStatusCode ;
21+ import com .google .api .gax .retrying .ExponentialRetryAlgorithm ;
22+ import com .google .api .gax .retrying .RetrySettings ;
23+ import com .google .api .gax .retrying .TimedAttemptSettings ;
2024import com .google .api .gax .rpc .ApiException ;
2125import com .google .api .gax .rpc .ApiStreamObserver ;
2226import com .google .cloud .firestore .DocumentChange .Type ;
4145import java .util .Map ;
4246import java .util .Map .Entry ;
4347import java .util .concurrent .Executor ;
48+ import java .util .concurrent .ScheduledExecutorService ;
49+ import java .util .concurrent .TimeUnit ;
4450import java .util .concurrent .atomic .AtomicBoolean ;
4551import javax .annotation .Nullable ;
52+ import org .threeten .bp .Duration ;
4653import org .threeten .bp .Instant ;
4754
4855/**
49- * Watch provides listen functionality and exposes snapshot listeners. It can be used with any
50- * valid Firestore Listen target.
56+ * Watch provides listen functionality and exposes snapshot listeners. It can be used with any valid
57+ * Firestore Listen target.
5158 *
52- * This class is thread-compatible when called through the methods defined in ApiStreamObserver.
59+ * <p> This class is thread-compatible when called through the methods defined in ApiStreamObserver.
5360 * It synchronizes on its own instance so it is advisable not to use this class for external
5461 * synchronization.
5562 */
@@ -60,10 +67,35 @@ class Watch implements ApiStreamObserver<ListenResponse> {
6067 */
6168 private static final int WATCH_TARGET_ID = 0xD0 ;
6269
70+ private static int RETRY_INITIAL_DELAY_MS = 1000 ;
71+
72+ /** The maximum backoff time in milliseconds. */
73+ private static int RETRY_MAX_DELAY_MS = 60 * 1000 ;
74+
75+ /** The factor to increase the backup by after each failed attempt. */
76+ private static double RETRY_BACKOFF_FACTOR = 1.5 ;
77+
78+ private static RetrySettings RETRY_SETTINGS =
79+ RetrySettings .newBuilder ()
80+ .
81+ // The initial backoff time in seconds after an error.
82+ // Set to 1s according to https://cloud.google.com/apis/design/errors.
83+ setInitialRetryDelay (Duration .ofSeconds (1 ))
84+ .
85+ // The maximum backoff time in minutes.
86+ setMaxRetryDelay (Duration .ofMinutes (1 ))
87+ .
88+ // The factor to increase the backup by after each failed attempt.
89+ setRetryDelayMultiplier (1.5 )
90+ .setJittered (true )
91+ .build ();
92+
6393 private final FirestoreImpl firestore ;
94+ private final ScheduledExecutorService firestoreExecutor ;
6495 private final Comparator <DocumentSnapshot > comparator ;
65- private final ExponentialBackoff backoff ;
96+ private final ExponentialRetryAlgorithm backoff ;
6697 private final Target target ;
98+ private TimedAttemptSettings nextAttempt ;
6799 private ApiStreamObserver <ListenRequest > stream ;
68100
69101 /** The sorted tree of DocumentSnapshots as sent in the last snapshot. */
@@ -94,8 +126,8 @@ class Watch implements ApiStreamObserver<ListenResponse> {
94126 private boolean hasPushed ;
95127
96128 /**
97- * Indicates whether we are interested in data from the stream. Set to false
98- * in the 'unsubscribe()' callback.
129+ * Indicates whether we are interested in data from the stream. Set to false in the
130+ * 'unsubscribe()' callback.
99131 */
100132 private AtomicBoolean isActive ;
101133
@@ -116,8 +148,11 @@ private Watch(FirestoreImpl firestore, Target target, Comparator<DocumentSnapsho
116148 this .firestore = firestore ;
117149 this .target = target ;
118150 this .comparator = comparator ;
119- this .backoff = new ExponentialBackoff (firestore .getClient ().getExecutor ());
151+ this .backoff =
152+ new ExponentialRetryAlgorithm (RETRY_SETTINGS , CurrentMillisClock .getDefaultClock ());
153+ this .firestoreExecutor = firestore .getClient ().getExecutor ();
120154 this .isActive = new AtomicBoolean ();
155+ this .nextAttempt = backoff .createFirstAttempt ();
121156 }
122157
123158 /**
@@ -197,12 +232,14 @@ public synchronized void onNext(ListenResponse listenResponse) {
197232 resetDocs ();
198233 break ;
199234 default :
200- closeStream (FirestoreException .invalidState ("Encountered invalid target change type: " + change .getTargetChangeType ()));
235+ closeStream (
236+ FirestoreException .invalidState (
237+ "Encountered invalid target change type: " + change .getTargetChangeType ()));
201238 }
202239
203240 if (change .getResumeToken () != null
204241 && affectsTarget (change .getTargetIdsList (), WATCH_TARGET_ID )) {
205- backoff .reset ();
242+ nextAttempt = backoff .createFirstAttempt ();
206243 }
207244
208245 break ;
@@ -348,7 +385,7 @@ public void run() {
348385 private void maybeReopenStream (Throwable throwable ) {
349386 if (isActive .get () && !isPermanentError (throwable )) {
350387 if (isResourceExhaustedError (throwable )) {
351- backoff .resetToMax ( );
388+ nextAttempt = backoff .createNextAttempt ( nextAttempt );
352389 }
353390
354391 changeMap .clear ();
@@ -370,7 +407,7 @@ private void resetStream() {
370407
371408 /** Initializes a new stream to the backend with backoff. */
372409 private void initStream () {
373- backoff . backoffAndRun (
410+ firestoreExecutor . schedule (
374411 new Runnable () {
375412 @ Override
376413 public void run () {
@@ -379,10 +416,15 @@ public void run() {
379416 }
380417
381418 synchronized (Watch .this ) {
419+ if (!isActive .get ()) {
420+ return ;
421+ }
422+
382423 Preconditions .checkState (stream == null );
383424
384425 current = false ;
385426 hasPushed = false ;
427+ nextAttempt = backoff .createNextAttempt (nextAttempt );
386428
387429 stream = firestore .streamRequest (Watch .this , firestore .getClient ().listenCallable ());
388430
@@ -396,7 +438,9 @@ public void run() {
396438 stream .onNext (request .build ());
397439 }
398440 }
399- });
441+ },
442+ nextAttempt .getRandomizedRetryDelay ().toMillis (),
443+ TimeUnit .MILLISECONDS );
400444 }
401445
402446 /**
0 commit comments