Skip to content

Commit 08b7f61

Browse files
authored
Merge pull request #551 from weaviate/feat/count_errors
Batch: Publish the total number of failed tasks
2 parents 73637b6 + 6d3e4fe commit 08b7f61

2 files changed

Lines changed: 26 additions & 3 deletions

File tree

src/main/java/io/weaviate/client6/v1/api/collections/batch/BatchContext.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,13 @@ public final class BatchContext<PropertiesT> implements Closeable {
136136
private final CollectionDescriptor<PropertiesT> collectionDescriptor;
137137
private final CollectionHandleDefaults collectionHandleDefaults;
138138

139+
/**
140+
* Tally of the failed items. This value is only written to from
141+
* {@link #retryService} thread, which processes the incoming
142+
* {@link Event.Results}; making it {@code volatile} is sufficient.
143+
*/
144+
private volatile int numberOfErrors;
145+
139146
/**
140147
* Internal execution service. Its lifecycle is bound to that of the
141148
* BatchContext: it's started when the context is initialized
@@ -314,12 +321,25 @@ private TaskHandle add(final TaskHandle taskHandle) throws InterruptedException
314321
// Remove the task from the WIP list as soon as it completes,
315322
// successfully or otherwise. Note, that TaskHandle::done future
316323
// only completes exceptionally after all retries have been exhausted.
317-
taskHandle.done().whenComplete((__, t) -> wip.remove(taskHandle.id()));
324+
taskHandle.done().whenComplete((__, t) -> {
325+
if (t != null) {
326+
numberOfErrors++;
327+
}
328+
wip.remove(taskHandle.id());
329+
});
318330

319331
queue.put(taskHandle);
320332
return taskHandle;
321333
}
322334

335+
/**
336+
* Get the current tally of failed tasks.
337+
* An object is only considered failed if it can no longer be retried.
338+
*/
339+
public int numberOfErrors() {
340+
return numberOfErrors;
341+
}
342+
323343
void start() {
324344
if (closed) {
325345
throw new IllegalStateException("context is closed");

src/test/java/io/weaviate/client6/v1/api/collections/batch/BatchContextTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -503,9 +503,13 @@ public void test_retryPolicy() throws Exception {
503503
.noneMatch(CompletableFuture::isCompletedExceptionally);
504504

505505
Assertions.assertThat(tasks.subList(BATCH_SIZE - 1, BATCH_SIZE))
506-
.as("last %d tasks succeeded", BATCH_SIZE)
506+
.as("last %d tasks failed", BATCH_SIZE)
507507
.extracting(TaskHandle::done)
508508
.allMatch(CompletableFuture::isCompletedExceptionally);
509+
510+
Assertions.assertThat(context.numberOfErrors())
511+
.as("number of errors")
512+
.isEqualTo(BATCH_SIZE);
509513
}
510514

511515
@Test
@@ -577,7 +581,6 @@ static String getBeacon(WeaviateProtoBatch.BatchReference reference) {
577581
private static final class OutboundStream {
578582
private final StreamObserver<Event> stream;
579583
private final Executor eventThread;
580-
private final List<Event> pendingEvents = new ArrayList<>();
581584

582585
OutboundStream(StreamObserver<Event> stream, Executor eventThread) {
583586
this.stream = stream;

0 commit comments

Comments
 (0)