Skip to content

Commit 125fc4e

Browse files
authored
feat: Instrument Reader and Writer methods (#2829)
* feat: Instrument Reader and Writer methods
1 parent 6a2b3ca commit 125fc4e

File tree

6 files changed

+341
-70
lines changed

6 files changed

+341
-70
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java

Lines changed: 49 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -801,17 +801,26 @@ public GrpcBlobReadChannel reader(String bucket, String blob, BlobSourceOption..
801801

802802
@Override
803803
public GrpcBlobReadChannel reader(BlobId blob, BlobSourceOption... options) {
804-
Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob).prepend(defaultOpts);
805-
ReadObjectRequest request = getReadObjectRequest(blob, opts);
806-
GrpcCallContext grpcCallContext = Retrying.newCallContext();
807-
808-
return new GrpcBlobReadChannel(
809-
storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext),
810-
getOptions(),
811-
retryAlgorithmManager.getFor(request),
812-
responseContentLifecycleManager,
813-
request,
814-
!opts.autoGzipDecompression());
804+
Span otelSpan = openTelemetryTraceUtil.startSpan("reader", this.getClass().getName());
805+
try (Scope unused = otelSpan.makeCurrent()) {
806+
Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob).prepend(defaultOpts);
807+
ReadObjectRequest request = getReadObjectRequest(blob, opts);
808+
GrpcCallContext grpcCallContext = Retrying.newCallContext();
809+
810+
return new GrpcBlobReadChannel(
811+
storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext),
812+
getOptions(),
813+
retryAlgorithmManager.getFor(request),
814+
responseContentLifecycleManager,
815+
request,
816+
!opts.autoGzipDecompression());
817+
} catch (Exception e) {
818+
otelSpan.recordException(e);
819+
otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName());
820+
throw StorageException.coalesce(e);
821+
} finally {
822+
otelSpan.end();
823+
}
815824
}
816825

817826
@Override
@@ -854,25 +863,35 @@ public void downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption.
854863

855864
@Override
856865
public GrpcBlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) {
857-
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts);
858-
GrpcCallContext grpcCallContext =
859-
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
860-
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);
861-
Hasher hasher = Hasher.noop();
862-
// in JSON, the starting of the resumable session happens before the invocation of write can
863-
// happen. Emulate the same thing here.
864-
// 1. create the future
865-
ApiFuture<ResumableWrite> startResumableWrite = startResumableWrite(grpcCallContext, req, opts);
866-
// 2. await the result of the future
867-
ResumableWrite resumableWrite = ApiFutureUtils.await(startResumableWrite);
868-
// 3. wrap the result in another future container before constructing the BlobWriteChannel
869-
ApiFuture<ResumableWrite> wrapped = ApiFutures.immediateFuture(resumableWrite);
870-
return new GrpcBlobWriteChannel(
871-
storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext),
872-
getOptions(),
873-
retryAlgorithmManager.idempotent(),
874-
() -> wrapped,
875-
hasher);
866+
Span otelSpan = openTelemetryTraceUtil.startSpan("writer", this.getClass().getName());
867+
try (Scope unused = otelSpan.makeCurrent()) {
868+
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo).prepend(defaultOpts);
869+
GrpcCallContext grpcCallContext =
870+
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
871+
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);
872+
Hasher hasher = Hasher.noop();
873+
// in JSON, the starting of the resumable session happens before the invocation of write can
874+
// happen. Emulate the same thing here.
875+
// 1. create the future
876+
ApiFuture<ResumableWrite> startResumableWrite =
877+
startResumableWrite(grpcCallContext, req, opts);
878+
// 2. await the result of the future
879+
ResumableWrite resumableWrite = ApiFutureUtils.await(startResumableWrite);
880+
// 3. wrap the result in another future container before constructing the BlobWriteChannel
881+
ApiFuture<ResumableWrite> wrapped = ApiFutures.immediateFuture(resumableWrite);
882+
return new GrpcBlobWriteChannel(
883+
storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext),
884+
getOptions(),
885+
retryAlgorithmManager.idempotent(),
886+
() -> wrapped,
887+
hasher);
888+
} catch (Exception e) {
889+
otelSpan.recordException(e);
890+
otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName());
891+
throw StorageException.coalesce(e);
892+
} finally {
893+
otelSpan.end();
894+
}
876895
}
877896

878897
@Override

google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java

Lines changed: 62 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
5353
import com.google.cloud.storage.UnifiedOpts.Opts;
5454
import com.google.cloud.storage.otel.OpenTelemetryTraceUtil;
55+
import com.google.cloud.storage.otel.OpenTelemetryTraceUtil.Scope;
56+
import com.google.cloud.storage.otel.OpenTelemetryTraceUtil.Span;
5557
import com.google.cloud.storage.spi.v1.StorageRpc;
5658
import com.google.cloud.storage.spi.v1.StorageRpc.RewriteRequest;
5759
import com.google.common.base.CharMatcher;
@@ -744,10 +746,19 @@ public StorageReadChannel reader(String bucket, String blob, BlobSourceOption...
744746

745747
@Override
746748
public StorageReadChannel reader(BlobId blob, BlobSourceOption... options) {
747-
Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob);
748-
StorageObject storageObject = Conversions.json().blobId().encode(blob);
749-
ImmutableMap<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();
750-
return new BlobReadChannelV2(storageObject, optionsMap, BlobReadChannelContext.from(this));
749+
Span otelSpan = openTelemetryTraceUtil.startSpan("reader", this.getClass().getName());
750+
try (Scope unused = otelSpan.makeCurrent()) {
751+
Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob);
752+
StorageObject storageObject = Conversions.json().blobId().encode(blob);
753+
ImmutableMap<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();
754+
return new BlobReadChannelV2(storageObject, optionsMap, BlobReadChannelContext.from(this));
755+
} catch (Exception e) {
756+
otelSpan.recordException(e);
757+
otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName());
758+
throw e;
759+
} finally {
760+
otelSpan.end();
761+
}
751762
}
752763

753764
@Override
@@ -778,40 +789,58 @@ public void downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption.
778789

779790
@Override
780791
public StorageWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) {
781-
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo);
782-
final Map<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();
783-
BlobInfo.Builder builder = blobInfo.toBuilder().setMd5(null).setCrc32c(null);
784-
BlobInfo updated = opts.blobInfoMapper().apply(builder).build();
792+
Span otelSpan = openTelemetryTraceUtil.startSpan("writer", this.getClass().getName());
793+
try (Scope unused = otelSpan.makeCurrent()) {
794+
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo);
795+
final Map<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();
796+
BlobInfo.Builder builder = blobInfo.toBuilder().setMd5(null).setCrc32c(null);
797+
BlobInfo updated = opts.blobInfoMapper().apply(builder).build();
785798

786-
StorageObject encode = codecs.blobInfo().encode(updated);
787-
// open the resumable session outside the write channel
788-
// the exception behavior of open is different from #write(ByteBuffer)
789-
Supplier<String> uploadIdSupplier =
790-
ResumableMedia.startUploadForBlobInfo(
791-
getOptions(),
792-
updated,
793-
optionsMap,
794-
retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap));
795-
JsonResumableWrite jsonResumableWrite =
796-
JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0);
797-
return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite);
799+
StorageObject encode = codecs.blobInfo().encode(updated);
800+
// open the resumable session outside the write channel
801+
// the exception behavior of open is different from #write(ByteBuffer)
802+
Supplier<String> uploadIdSupplier =
803+
ResumableMedia.startUploadForBlobInfo(
804+
getOptions(),
805+
updated,
806+
optionsMap,
807+
retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap));
808+
JsonResumableWrite jsonResumableWrite =
809+
JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0);
810+
return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite);
811+
} catch (Exception e) {
812+
otelSpan.recordException(e);
813+
otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName());
814+
throw e;
815+
} finally {
816+
otelSpan.end();
817+
}
798818
}
799819

800820
@Override
801821
public StorageWriteChannel writer(URL signedURL) {
802-
// TODO: is it possible to know if a signed url is configured to have a constraint which makes
803-
// it idempotent?
804-
ResultRetryAlgorithm<?> forResumableUploadSessionCreate =
805-
retryAlgorithmManager.getForResumableUploadSessionCreate(Collections.emptyMap());
806-
// open the resumable session outside the write channel
807-
// the exception behavior of open is different from #write(ByteBuffer)
808-
String signedUrlString = signedURL.toString();
809-
Supplier<String> uploadIdSupplier =
810-
ResumableMedia.startUploadForSignedUrl(
811-
getOptions(), signedURL, forResumableUploadSessionCreate);
812-
JsonResumableWrite jsonResumableWrite =
813-
JsonResumableWrite.of(signedUrlString, uploadIdSupplier.get(), 0);
814-
return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite);
822+
Span otelSpan = openTelemetryTraceUtil.startSpan("writer", this.getClass().getName());
823+
try (Scope unused = otelSpan.makeCurrent()) {
824+
// TODO: is it possible to know if a signed url is configured to have a constraint which makes
825+
// it idempotent?
826+
ResultRetryAlgorithm<?> forResumableUploadSessionCreate =
827+
retryAlgorithmManager.getForResumableUploadSessionCreate(Collections.emptyMap());
828+
// open the resumable session outside the write channel
829+
// the exception behavior of open is different from #write(ByteBuffer)
830+
String signedUrlString = signedURL.toString();
831+
Supplier<String> uploadIdSupplier =
832+
ResumableMedia.startUploadForSignedUrl(
833+
getOptions(), signedURL, forResumableUploadSessionCreate);
834+
JsonResumableWrite jsonResumableWrite =
835+
JsonResumableWrite.of(signedUrlString, uploadIdSupplier.get(), 0);
836+
return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite);
837+
} catch (Exception e) {
838+
otelSpan.recordException(e);
839+
otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName());
840+
throw e;
841+
} finally {
842+
otelSpan.end();
843+
}
815844
}
816845

817846
@Override

google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcOpenTelemetryTest.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import static java.nio.charset.StandardCharsets.UTF_8;
2020

2121
import com.google.cloud.NoCredentials;
22+
import com.google.cloud.ReadChannel;
23+
import com.google.cloud.WriteChannel;
2224
import com.google.cloud.storage.Storage.BlobSourceOption;
2325
import com.google.cloud.storage.Storage.BlobTargetOption;
2426
import com.google.cloud.storage.Storage.CopyRequest;
@@ -170,12 +172,37 @@ public void runCopy() {
170172
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("copy")));
171173
}
172174

175+
@Test
176+
public void runWriter() throws IOException {
177+
BlobInfo info = BlobInfo.newBuilder(testBucket, generator.randomObjectName()).build();
178+
try (WriteChannel writer = storage.writer(info)) {
179+
// Do nothing
180+
}
181+
TestExporter testExported = (TestExporter) exporter;
182+
List<SpanData> spanData = testExported.getExportedSpans();
183+
checkCommonAttributes(spanData);
184+
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("writer")));
185+
}
186+
187+
@Test
188+
public void runReader() throws IOException {
189+
BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
190+
storage.create(blobInfo, helloWorldTextBytes);
191+
try (ReadChannel reader = storage.reader(blobId)) {
192+
// Do nothing
193+
}
194+
TestExporter testExported = (TestExporter) exporter;
195+
List<SpanData> spanData = testExported.getExportedSpans();
196+
checkCommonAttributes(spanData);
197+
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("reader")));
198+
}
199+
173200
private void checkCommonAttributes(List<SpanData> spanData) {
174201
for (SpanData span : spanData) {
175202
Assert.assertEquals("Storage", getAttributeValue(span, "gcp.client.service"));
176203
Assert.assertEquals("googleapis/java-storage", getAttributeValue(span, "gcp.client.repo"));
177204
Assert.assertEquals(
178-
"com.google.cloud.google-cloud-storage", getAttributeValue(span, "gcp.client.artifact"));
205+
"com.google.cloud:google-cloud-storage", getAttributeValue(span, "gcp.client.artifact"));
179206
Assert.assertEquals("grpc", getAttributeValue(span, "rpc.system"));
180207
}
181208
}

google-cloud-storage/src/test/java/com/google/cloud/storage/ITHttpOpenTelemetryTest.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import static java.nio.charset.StandardCharsets.UTF_8;
2020

2121
import com.google.cloud.NoCredentials;
22+
import com.google.cloud.ReadChannel;
23+
import com.google.cloud.WriteChannel;
2224
import com.google.cloud.storage.Storage.BlobSourceOption;
2325
import com.google.cloud.storage.Storage.BlobTargetOption;
2426
import com.google.cloud.storage.Storage.CopyRequest;
@@ -174,12 +176,37 @@ public void runCreateFromInputStream() throws IOException {
174176
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("createFrom")));
175177
}
176178

179+
@Test
180+
public void runWriter() throws IOException {
181+
BlobInfo info = BlobInfo.newBuilder(testBucket, generator.randomObjectName()).build();
182+
try (WriteChannel writer = storage.writer(info)) {
183+
// Do nothing
184+
}
185+
TestExporter testExported = (TestExporter) exporter;
186+
List<SpanData> spanData = testExported.getExportedSpans();
187+
checkCommonAttributes(spanData);
188+
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("writer")));
189+
}
190+
191+
@Test
192+
public void runReader() throws IOException {
193+
BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
194+
storage.create(blobInfo, helloWorldTextBytes);
195+
try (ReadChannel reader = storage.reader(blobId)) {
196+
// Do nothing
197+
}
198+
TestExporter testExported = (TestExporter) exporter;
199+
List<SpanData> spanData = testExported.getExportedSpans();
200+
checkCommonAttributes(spanData);
201+
Assert.assertTrue(spanData.stream().anyMatch(x -> x.getName().contains("reader")));
202+
}
203+
177204
private void checkCommonAttributes(List<SpanData> spanData) {
178205
for (SpanData span : spanData) {
179206
Assert.assertEquals("Storage", getAttributeValue(span, "gcp.client.service"));
180207
Assert.assertEquals("googleapis/java-storage", getAttributeValue(span, "gcp.client.repo"));
181208
Assert.assertEquals(
182-
"com.google.cloud.google-cloud-storage", getAttributeValue(span, "gcp.client.artifact"));
209+
"com.google.cloud:google-cloud-storage", getAttributeValue(span, "gcp.client.artifact"));
183210
Assert.assertEquals("http", getAttributeValue(span, "rpc.system"));
184211
}
185212
}

google-cloud-storage/src/test/java/com/google/cloud/storage/ITOpenTelemetryTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public void checkInstrumentation() {
5555
Assert.assertEquals("Storage", getAttributeValue(spanData, "gcp.client.service"));
5656
Assert.assertEquals("googleapis/java-storage", getAttributeValue(spanData, "gcp.client.repo"));
5757
Assert.assertEquals(
58-
"com.google.cloud.google-cloud-storage",
58+
"com.google.cloud:google-cloud-storage",
5959
getAttributeValue(spanData, "gcp.client.artifact"));
6060
Assert.assertEquals("http", getAttributeValue(spanData, "rpc.system"));
6161

@@ -84,7 +84,7 @@ public void checkInstrumentationGrpc() {
8484
Assert.assertEquals("Storage", getAttributeValue(spanData, "gcp.client.service"));
8585
Assert.assertEquals("googleapis/java-storage", getAttributeValue(spanData, "gcp.client.repo"));
8686
Assert.assertEquals(
87-
"com.google.cloud.google-cloud-storage",
87+
"com.google.cloud:google-cloud-storage",
8888
getAttributeValue(spanData, "gcp.client.artifact"));
8989
Assert.assertEquals("grpc", getAttributeValue(spanData, "rpc.system"));
9090

0 commit comments

Comments
 (0)