Skip to content

Commit a6790cb

Browse files
authored
chore: attempt to drain gRPC ReadObject stream iterator (#2700)
Related #2696
1 parent 4e84c8e commit a6790cb

2 files changed

Lines changed: 25 additions & 2 deletions

File tree

java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ public boolean hasNext() {
182182
if (!result.isDone()) {
183183
result.setException(StorageException.coalesce(e));
184184
}
185+
reset();
185186
throw e;
186187
}
187188
}
@@ -194,15 +195,31 @@ public ReadObjectResponse next() {
194195
if (!result.isDone()) {
195196
result.setException(StorageException.coalesce(e));
196197
}
198+
reset();
197199
throw e;
198200
}
199201
}
200202

201203
@Override
202204
public void close() {
203205
if (serverStream != null) {
204-
// todo: do we need to "drain" anything?
205206
serverStream.cancel();
207+
if (responseIterator != null) {
208+
IOException ioException = null;
209+
while (responseIterator.hasNext()) {
210+
try {
211+
ReadObjectResponse next = responseIterator.next();
212+
ResponseContentLifecycleHandle handle = rclm.get(next);
213+
handle.close();
214+
} catch (IOException e) {
215+
if (ioException == null) {
216+
ioException = e;
217+
} else if (ioException != e) {
218+
ioException.addSuppressed(e);
219+
}
220+
}
221+
}
222+
}
206223
}
207224
}
208225

@@ -223,5 +240,11 @@ private Iterator<ReadObjectResponse> ensureResponseIteratorOpen() {
223240
}
224241
}
225242
}
243+
244+
private void reset() {
245+
serverStream = null;
246+
responseIterator = null;
247+
streamInitialized = false;
248+
}
226249
}
227250
}

java-storage/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1098,7 +1098,7 @@ static void closeAllStreams(Iterable<InputStream> inputStreams) throws IOExcepti
10981098
} catch (IOException e) {
10991099
if (ioException == null) {
11001100
ioException = e;
1101-
} else {
1101+
} else if (ioException != e) {
11021102
ioException.addSuppressed(e);
11031103
}
11041104
}

0 commit comments

Comments
 (0)