|
52 | 52 | import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; |
53 | 53 | import com.google.cloud.storage.UnifiedOpts.Opts; |
54 | 54 | import com.google.cloud.storage.otel.OpenTelemetryTraceUtil; |
| 55 | +import com.google.cloud.storage.otel.OpenTelemetryTraceUtil.Scope; |
| 56 | +import com.google.cloud.storage.otel.OpenTelemetryTraceUtil.Span; |
55 | 57 | import com.google.cloud.storage.spi.v1.StorageRpc; |
56 | 58 | import com.google.cloud.storage.spi.v1.StorageRpc.RewriteRequest; |
57 | 59 | import com.google.common.base.CharMatcher; |
@@ -744,10 +746,19 @@ public StorageReadChannel reader(String bucket, String blob, BlobSourceOption... |
744 | 746 |
|
745 | 747 | @Override |
746 | 748 | public StorageReadChannel reader(BlobId blob, BlobSourceOption... options) { |
747 | | - Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob); |
748 | | - StorageObject storageObject = Conversions.json().blobId().encode(blob); |
749 | | - ImmutableMap<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions(); |
750 | | - return new BlobReadChannelV2(storageObject, optionsMap, BlobReadChannelContext.from(this)); |
| 749 | + Span otelSpan = openTelemetryTraceUtil.startSpan("reader", this.getClass().getName()); |
| 750 | + try (Scope unused = otelSpan.makeCurrent()) { |
| 751 | + Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob); |
| 752 | + StorageObject storageObject = Conversions.json().blobId().encode(blob); |
| 753 | + ImmutableMap<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions(); |
| 754 | + return new BlobReadChannelV2(storageObject, optionsMap, BlobReadChannelContext.from(this)); |
| 755 | + } catch (Exception e) { |
| 756 | + otelSpan.recordException(e); |
| 757 | + otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); |
| 758 | + throw e; |
| 759 | + } finally { |
| 760 | + otelSpan.end(); |
| 761 | + } |
751 | 762 | } |
752 | 763 |
|
753 | 764 | @Override |
@@ -778,40 +789,58 @@ public void downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption. |
778 | 789 |
|
779 | 790 | @Override |
780 | 791 | public StorageWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) { |
781 | | - Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo); |
782 | | - final Map<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions(); |
783 | | - BlobInfo.Builder builder = blobInfo.toBuilder().setMd5(null).setCrc32c(null); |
784 | | - BlobInfo updated = opts.blobInfoMapper().apply(builder).build(); |
| 792 | + Span otelSpan = openTelemetryTraceUtil.startSpan("writer", this.getClass().getName()); |
| 793 | + try (Scope unused = otelSpan.makeCurrent()) { |
| 794 | + Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo); |
| 795 | + final Map<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions(); |
| 796 | + BlobInfo.Builder builder = blobInfo.toBuilder().setMd5(null).setCrc32c(null); |
| 797 | + BlobInfo updated = opts.blobInfoMapper().apply(builder).build(); |
785 | 798 |
|
786 | | - StorageObject encode = codecs.blobInfo().encode(updated); |
787 | | - // open the resumable session outside the write channel |
788 | | - // the exception behavior of open is different from #write(ByteBuffer) |
789 | | - Supplier<String> uploadIdSupplier = |
790 | | - ResumableMedia.startUploadForBlobInfo( |
791 | | - getOptions(), |
792 | | - updated, |
793 | | - optionsMap, |
794 | | - retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap)); |
795 | | - JsonResumableWrite jsonResumableWrite = |
796 | | - JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0); |
797 | | - return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite); |
| 799 | + StorageObject encode = codecs.blobInfo().encode(updated); |
| 800 | + // open the resumable session outside the write channel |
| 801 | + // the exception behavior of open is different from #write(ByteBuffer) |
| 802 | + Supplier<String> uploadIdSupplier = |
| 803 | + ResumableMedia.startUploadForBlobInfo( |
| 804 | + getOptions(), |
| 805 | + updated, |
| 806 | + optionsMap, |
| 807 | + retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap)); |
| 808 | + JsonResumableWrite jsonResumableWrite = |
| 809 | + JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0); |
| 810 | + return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite); |
| 811 | + } catch (Exception e) { |
| 812 | + otelSpan.recordException(e); |
| 813 | + otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); |
| 814 | + throw e; |
| 815 | + } finally { |
| 816 | + otelSpan.end(); |
| 817 | + } |
798 | 818 | } |
799 | 819 |
|
800 | 820 | @Override |
801 | 821 | public StorageWriteChannel writer(URL signedURL) { |
802 | | - // TODO: is it possible to know if a signed url is configured to have a constraint which makes |
803 | | - // it idempotent? |
804 | | - ResultRetryAlgorithm<?> forResumableUploadSessionCreate = |
805 | | - retryAlgorithmManager.getForResumableUploadSessionCreate(Collections.emptyMap()); |
806 | | - // open the resumable session outside the write channel |
807 | | - // the exception behavior of open is different from #write(ByteBuffer) |
808 | | - String signedUrlString = signedURL.toString(); |
809 | | - Supplier<String> uploadIdSupplier = |
810 | | - ResumableMedia.startUploadForSignedUrl( |
811 | | - getOptions(), signedURL, forResumableUploadSessionCreate); |
812 | | - JsonResumableWrite jsonResumableWrite = |
813 | | - JsonResumableWrite.of(signedUrlString, uploadIdSupplier.get(), 0); |
814 | | - return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite); |
| 822 | + Span otelSpan = openTelemetryTraceUtil.startSpan("writer", this.getClass().getName()); |
| 823 | + try (Scope unused = otelSpan.makeCurrent()) { |
| 824 | + // TODO: is it possible to know if a signed url is configured to have a constraint which makes |
| 825 | + // it idempotent? |
| 826 | + ResultRetryAlgorithm<?> forResumableUploadSessionCreate = |
| 827 | + retryAlgorithmManager.getForResumableUploadSessionCreate(Collections.emptyMap()); |
| 828 | + // open the resumable session outside the write channel |
| 829 | + // the exception behavior of open is different from #write(ByteBuffer) |
| 830 | + String signedUrlString = signedURL.toString(); |
| 831 | + Supplier<String> uploadIdSupplier = |
| 832 | + ResumableMedia.startUploadForSignedUrl( |
| 833 | + getOptions(), signedURL, forResumableUploadSessionCreate); |
| 834 | + JsonResumableWrite jsonResumableWrite = |
| 835 | + JsonResumableWrite.of(signedUrlString, uploadIdSupplier.get(), 0); |
| 836 | + return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite); |
| 837 | + } catch (Exception e) { |
| 838 | + otelSpan.recordException(e); |
| 839 | + otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); |
| 840 | + throw e; |
| 841 | + } finally { |
| 842 | + otelSpan.end(); |
| 843 | + } |
815 | 844 | } |
816 | 845 |
|
817 | 846 | @Override |
|
0 commit comments