Skip to content

Commit 3e41d6f

Browse files
committed
Instrument Reader and Writer methods
1 parent bd2fd5e commit 3e41d6f

5 files changed

Lines changed: 336 additions & 65 deletions

File tree

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java

Lines changed: 48 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -800,17 +800,26 @@ public GrpcBlobReadChannel reader(String bucket, String blob, BlobSourceOption..
800800

801801
@Override
802802
public GrpcBlobReadChannel reader(BlobId blob, BlobSourceOption... options) {
803-
Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob).prepend(defaultOpts);
804-
ReadObjectRequest request = getReadObjectRequest(blob, opts);
805-
GrpcCallContext grpcCallContext = Retrying.newCallContext();
803+
Span otelSpan = openTelemetryTraceUtil.startSpan("reader", this.getClass().getName());
804+
try (Scope unused = otelSpan.makeCurrent()) {
805+
Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob).prepend(defaultOpts);
806+
ReadObjectRequest request = getReadObjectRequest(blob, opts);
807+
GrpcCallContext grpcCallContext = Retrying.newCallContext();
806808

807-
return new GrpcBlobReadChannel(
808-
storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext),
809-
getOptions(),
810-
retryAlgorithmManager.getFor(request),
811-
responseContentLifecycleManager,
812-
request,
813-
!opts.autoGzipDecompression());
809+
return new GrpcBlobReadChannel(
810+
storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext),
811+
getOptions(),
812+
retryAlgorithmManager.getFor(request),
813+
responseContentLifecycleManager,
814+
request,
815+
!opts.autoGzipDecompression());
816+
} catch (Exception e) {
817+
otelSpan.recordException(e);
818+
otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName());
819+
throw StorageException.coalesce(e);
820+
} finally {
821+
otelSpan.end();
822+
}
814823
}
815824

816825
@Override
@@ -853,25 +862,35 @@ public void downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption.
853862

854863
@Override
855864
public GrpcBlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) {
856-
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts);
857-
GrpcCallContext grpcCallContext =
858-
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
859-
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);
860-
Hasher hasher = Hasher.noop();
861-
// in JSON, the starting of the resumable session happens before the invocation of write can
862-
// happen. Emulate the same thing here.
863-
// 1. create the future
864-
ApiFuture<ResumableWrite> startResumableWrite = startResumableWrite(grpcCallContext, req, opts);
865-
// 2. await the result of the future
866-
ResumableWrite resumableWrite = ApiFutureUtils.await(startResumableWrite);
867-
// 3. wrap the result in another future container before constructing the BlobWriteChannel
868-
ApiFuture<ResumableWrite> wrapped = ApiFutures.immediateFuture(resumableWrite);
869-
return new GrpcBlobWriteChannel(
870-
storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext),
871-
getOptions(),
872-
retryAlgorithmManager.idempotent(),
873-
() -> wrapped,
874-
hasher);
865+
Span otelSpan = openTelemetryTraceUtil.startSpan("writer", this.getClass().getName());
866+
try (Scope unused = otelSpan.makeCurrent()) {
867+
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts);
868+
GrpcCallContext grpcCallContext =
869+
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
870+
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);
871+
Hasher hasher = Hasher.noop();
872+
// in JSON, the starting of the resumable session happens before the invocation of write can
873+
// happen. Emulate the same thing here.
874+
// 1. create the future
875+
ApiFuture<ResumableWrite> startResumableWrite =
876+
startResumableWrite(grpcCallContext, req, opts);
877+
// 2. await the result of the future
878+
ResumableWrite resumableWrite = ApiFutureUtils.await(startResumableWrite);
879+
// 3. wrap the result in another future container before constructing the BlobWriteChannel
880+
ApiFuture<ResumableWrite> wrapped = ApiFutures.immediateFuture(resumableWrite);
881+
return new GrpcBlobWriteChannel(
882+
storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext),
883+
getOptions(),
884+
retryAlgorithmManager.idempotent(),
885+
() -> wrapped,
886+
hasher);
887+
} catch (Exception e) {
888+
otelSpan.recordException(e);
889+
otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName());
890+
throw StorageException.coalesce(e);
891+
} finally {
892+
otelSpan.end();
893+
}
875894
}
876895

877896
@Override

google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java

Lines changed: 62 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
5252
import com.google.cloud.storage.UnifiedOpts.Opts;
5353
import com.google.cloud.storage.otel.OpenTelemetryTraceUtil;
54+
import com.google.cloud.storage.otel.OpenTelemetryTraceUtil.Scope;
55+
import com.google.cloud.storage.otel.OpenTelemetryTraceUtil.Span;
5456
import com.google.cloud.storage.spi.v1.StorageRpc;
5557
import com.google.cloud.storage.spi.v1.StorageRpc.RewriteRequest;
5658
import com.google.common.base.CharMatcher;
@@ -743,10 +745,19 @@ public StorageReadChannel reader(String bucket, String blob, BlobSourceOption...
743745

744746
@Override
745747
public StorageReadChannel reader(BlobId blob, BlobSourceOption... options) {
746-
Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob);
747-
StorageObject storageObject = Conversions.json().blobId().encode(blob);
748-
ImmutableMap<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();
749-
return new BlobReadChannelV2(storageObject, optionsMap, BlobReadChannelContext.from(this));
748+
Span otelSpan = openTelemetryTraceUtil.startSpan("reader", this.getClass().getName());
749+
try (Scope unused = otelSpan.makeCurrent()) {
750+
Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob);
751+
StorageObject storageObject = Conversions.json().blobId().encode(blob);
752+
ImmutableMap<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();
753+
return new BlobReadChannelV2(storageObject, optionsMap, BlobReadChannelContext.from(this));
754+
} catch (Exception e) {
755+
otelSpan.recordException(e);
756+
otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName());
757+
throw e;
758+
} finally {
759+
otelSpan.end();
760+
}
750761
}
751762

752763
@Override
@@ -777,40 +788,58 @@ public void downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption.
777788

778789
@Override
779790
public StorageWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) {
780-
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo);
781-
final Map<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();
782-
BlobInfo.Builder builder = blobInfo.toBuilder().setMd5(null).setCrc32c(null);
783-
BlobInfo updated = opts.blobInfoMapper().apply(builder).build();
791+
Span otelSpan = openTelemetryTraceUtil.startSpan("writer", this.getClass().getName());
792+
try (Scope unused = otelSpan.makeCurrent()) {
793+
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo);
794+
final Map<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();
795+
BlobInfo.Builder builder = blobInfo.toBuilder().setMd5(null).setCrc32c(null);
796+
BlobInfo updated = opts.blobInfoMapper().apply(builder).build();
784797

785-
StorageObject encode = codecs.blobInfo().encode(updated);
786-
// open the resumable session outside the write channel
787-
// the exception behavior of open is different from #write(ByteBuffer)
788-
Supplier<String> uploadIdSupplier =
789-
ResumableMedia.startUploadForBlobInfo(
790-
getOptions(),
791-
updated,
792-
optionsMap,
793-
retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap));
794-
JsonResumableWrite jsonResumableWrite =
795-
JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0);
796-
return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite);
798+
StorageObject encode = codecs.blobInfo().encode(updated);
799+
// open the resumable session outside the write channel
800+
// the exception behavior of open is different from #write(ByteBuffer)
801+
Supplier<String> uploadIdSupplier =
802+
ResumableMedia.startUploadForBlobInfo(
803+
getOptions(),
804+
updated,
805+
optionsMap,
806+
retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap));
807+
JsonResumableWrite jsonResumableWrite =
808+
JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0);
809+
return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite);
810+
} catch (Exception e) {
811+
otelSpan.recordException(e);
812+
otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName());
813+
throw e;
814+
} finally {
815+
otelSpan.end();
816+
}
797817
}
798818

799819
@Override
800820
public StorageWriteChannel writer(URL signedURL) {
801-
// TODO: is it possible to know if a signed url is configured to have a constraint which makes
802-
// it idempotent?
803-
ResultRetryAlgorithm<?> forResumableUploadSessionCreate =
804-
retryAlgorithmManager.getForResumableUploadSessionCreate(Collections.emptyMap());
805-
// open the resumable session outside the write channel
806-
// the exception behavior of open is different from #write(ByteBuffer)
807-
String signedUrlString = signedURL.toString();
808-
Supplier<String> uploadIdSupplier =
809-
ResumableMedia.startUploadForSignedUrl(
810-
getOptions(), signedURL, forResumableUploadSessionCreate);
811-
JsonResumableWrite jsonResumableWrite =
812-
JsonResumableWrite.of(signedUrlString, uploadIdSupplier.get(), 0);
813-
return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite);
821+
Span otelSpan = openTelemetryTraceUtil.startSpan("writer", this.getClass().getName());
822+
try (Scope unused = otelSpan.makeCurrent()) {
823+
// TODO: is it possible to know if a signed url is configured to have a constraint which makes
824+
// it idempotent?
825+
ResultRetryAlgorithm<?> forResumableUploadSessionCreate =
826+
retryAlgorithmManager.getForResumableUploadSessionCreate(Collections.emptyMap());
827+
// open the resumable session outside the write channel
828+
// the exception behavior of open is different from #write(ByteBuffer)
829+
String signedUrlString = signedURL.toString();
830+
Supplier<String> uploadIdSupplier =
831+
ResumableMedia.startUploadForSignedUrl(
832+
getOptions(), signedURL, forResumableUploadSessionCreate);
833+
JsonResumableWrite jsonResumableWrite =
834+
JsonResumableWrite.of(signedUrlString, uploadIdSupplier.get(), 0);
835+
return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite);
836+
} catch (Exception e) {
837+
otelSpan.recordException(e);
838+
otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName());
839+
throw e;
840+
} finally {
841+
otelSpan.end();
842+
}
814843
}
815844

816845
@Override

google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcOpenTelemetryTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import static java.nio.charset.StandardCharsets.UTF_8;
2020

2121
import com.google.cloud.NoCredentials;
22+
import com.google.cloud.ReadChannel;
23+
import com.google.cloud.WriteChannel;
2224
import com.google.cloud.storage.Storage.BlobSourceOption;
2325
import com.google.cloud.storage.Storage.BlobTargetOption;
2426
import com.google.cloud.storage.Storage.CopyRequest;
@@ -170,6 +172,31 @@ public void runCopy() {
170172
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("copy")));
171173
}
172174

175+
@Test
176+
public void runWriter() throws IOException {
177+
BlobInfo info = BlobInfo.newBuilder(testBucket, generator.randomObjectName()).build();
178+
try (WriteChannel writer = storage.writer(info)) {
179+
// Do nothing
180+
}
181+
TestExporter testExported = (TestExporter) exporter;
182+
List<SpanData> spanData = testExported.getExportedSpans();
183+
checkCommonAttributes(spanData);
184+
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("writer")));
185+
}
186+
187+
@Test
188+
public void runReader() throws IOException {
189+
BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
190+
storage.create(blobInfo, helloWorldTextBytes);
191+
try (ReadChannel reader = storage.reader(blobId)) {
192+
// Do nothing
193+
}
194+
TestExporter testExported = (TestExporter) exporter;
195+
List<SpanData> spanData = testExported.getExportedSpans();
196+
checkCommonAttributes(spanData);
197+
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("reader")));
198+
}
199+
173200
private void checkCommonAttributes(List<SpanData> spanData) {
174201
for (SpanData span : spanData) {
175202
Assert.assertEquals("Storage", getAttributeValue(span, "gcp.client.service"));

google-cloud-storage/src/test/java/com/google/cloud/storage/ITHttpOpenTelemetryTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import static java.nio.charset.StandardCharsets.UTF_8;
2020

2121
import com.google.cloud.NoCredentials;
22+
import com.google.cloud.ReadChannel;
23+
import com.google.cloud.WriteChannel;
2224
import com.google.cloud.storage.Storage.BlobSourceOption;
2325
import com.google.cloud.storage.Storage.BlobTargetOption;
2426
import com.google.cloud.storage.Storage.CopyRequest;
@@ -174,6 +176,31 @@ public void runCreateFromInputStream() throws IOException {
174176
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("createFrom")));
175177
}
176178

179+
@Test
180+
public void runWriter() throws IOException {
181+
BlobInfo info = BlobInfo.newBuilder(testBucket, generator.randomObjectName()).build();
182+
try (WriteChannel writer = storage.writer(info)) {
183+
// Do nothing
184+
}
185+
TestExporter testExported = (TestExporter) exporter;
186+
List<SpanData> spanData = testExported.getExportedSpans();
187+
checkCommonAttributes(spanData);
188+
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("writer")));
189+
}
190+
191+
@Test
192+
public void runReader() throws IOException {
193+
BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
194+
storage.create(blobInfo, helloWorldTextBytes);
195+
try (ReadChannel reader = storage.reader(blobId)) {
196+
// Do nothing
197+
}
198+
TestExporter testExported = (TestExporter) exporter;
199+
List<SpanData> spanData = testExported.getExportedSpans();
200+
checkCommonAttributes(spanData);
201+
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("reader")));
202+
}
203+
177204
private void checkCommonAttributes(List<SpanData> spanData) {
178205
for (SpanData span : spanData) {
179206
Assert.assertEquals("Storage", getAttributeValue(span, "gcp.client.service"));

0 commit comments

Comments
 (0)