File tree Expand file tree Collapse file tree
main/java/io/weaviate/client6/v1/api/collections/batch
test/java/io/weaviate/client6/v1/api/collections/batch Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -136,6 +136,13 @@ public final class BatchContext<PropertiesT> implements Closeable {
136136 private final CollectionDescriptor <PropertiesT > collectionDescriptor ;
137137 private final CollectionHandleDefaults collectionHandleDefaults ;
138138
139+ /**
140+ * Tally of the failed items. This value is only written to from
141+ * {@link #retryService} thread, which processes the incoming
142+ * {@link Event.Results}; making it {@code volatile} is sufficient.
143+ */
144+ private volatile int numberOfErrors ;
145+
139146 /**
140147 * Internal execution service. Its lifecycle is bound to that of the
141148 * BatchContext: it's started when the context is initialized
@@ -314,12 +321,25 @@ private TaskHandle add(final TaskHandle taskHandle) throws InterruptedException
314321 // Remove the task from the WIP list as soon as it completes,
315322 // successfully or otherwise. Note, that TaskHandle::done future
316323 // only completes exceptionally after all retries have been exhausted.
317- taskHandle .done ().whenComplete ((__ , t ) -> wip .remove (taskHandle .id ()));
324+ taskHandle .done ().whenComplete ((__ , t ) -> {
325+ if (t != null ) {
326+ numberOfErrors ++;
327+ }
328+ wip .remove (taskHandle .id ());
329+ });
318330
319331 queue .put (taskHandle );
320332 return taskHandle ;
321333 }
322334
335+ /**
336+ * Get the current tally of failed tasks.
337+ * An object is only considered failed if it can no longer be retried.
338+ */
339+ public int numberOfErrors () {
340+ return numberOfErrors ;
341+ }
342+
323343 void start () {
324344 if (closed ) {
325345 throw new IllegalStateException ("context is closed" );
Original file line number Diff line number Diff line change @@ -503,9 +503,13 @@ public void test_retryPolicy() throws Exception {
503503 .noneMatch (CompletableFuture ::isCompletedExceptionally );
504504
505505 Assertions .assertThat (tasks .subList (BATCH_SIZE - 1 , BATCH_SIZE ))
506- .as ("last %d tasks succeeded " , BATCH_SIZE )
506+ .as ("last %d tasks failed " , BATCH_SIZE )
507507 .extracting (TaskHandle ::done )
508508 .allMatch (CompletableFuture ::isCompletedExceptionally );
509+
510+ Assertions .assertThat (context .numberOfErrors ())
511+ .as ("number of errors" )
512+ .isEqualTo (BATCH_SIZE );
509513 }
510514
511515 @ Test
@@ -577,7 +581,6 @@ static String getBeacon(WeaviateProtoBatch.BatchReference reference) {
577581 private static final class OutboundStream {
578582 private final StreamObserver <Event > stream ;
579583 private final Executor eventThread ;
580- private final List <Event > pendingEvents = new ArrayList <>();
581584
582585 OutboundStream (StreamObserver <Event > stream , Executor eventThread ) {
583586 this .stream = stream ;
You can’t perform that action at this time.
0 commit comments