|
51 | 51 | import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt; |
52 | 52 | import com.google.cloud.storage.UnifiedOpts.Opts; |
53 | 53 | import com.google.cloud.storage.otel.OpenTelemetryTraceUtil; |
| 54 | +import com.google.cloud.storage.otel.OpenTelemetryTraceUtil.Scope; |
| 55 | +import com.google.cloud.storage.otel.OpenTelemetryTraceUtil.Span; |
54 | 56 | import com.google.cloud.storage.spi.v1.StorageRpc; |
55 | 57 | import com.google.cloud.storage.spi.v1.StorageRpc.RewriteRequest; |
56 | 58 | import com.google.common.base.CharMatcher; |
@@ -743,10 +745,19 @@ public StorageReadChannel reader(String bucket, String blob, BlobSourceOption... |
743 | 745 |
|
744 | 746 | @Override |
745 | 747 | public StorageReadChannel reader(BlobId blob, BlobSourceOption... options) { |
746 | | - Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob); |
747 | | - StorageObject storageObject = Conversions.json().blobId().encode(blob); |
748 | | - ImmutableMap<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions(); |
749 | | - return new BlobReadChannelV2(storageObject, optionsMap, BlobReadChannelContext.from(this)); |
| 748 | + Span otelSpan = openTelemetryTraceUtil.startSpan("reader", this.getClass().getName()); |
| 749 | + try (Scope unused = otelSpan.makeCurrent()) { |
| 750 | + Opts<ObjectSourceOpt> opts = Opts.unwrap(options).resolveFrom(blob); |
| 751 | + StorageObject storageObject = Conversions.json().blobId().encode(blob); |
| 752 | + ImmutableMap<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions(); |
| 753 | + return new BlobReadChannelV2(storageObject, optionsMap, BlobReadChannelContext.from(this)); |
| 754 | + } catch (Exception e) { |
| 755 | + otelSpan.recordException(e); |
| 756 | + otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); |
| 757 | + throw e; |
| 758 | + } finally { |
| 759 | + otelSpan.end(); |
| 760 | + } |
750 | 761 | } |
751 | 762 |
|
752 | 763 | @Override |
@@ -777,40 +788,58 @@ public void downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption. |
777 | 788 |
|
778 | 789 | @Override |
779 | 790 | public StorageWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) { |
780 | | - Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo); |
781 | | - final Map<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions(); |
782 | | - BlobInfo.Builder builder = blobInfo.toBuilder().setMd5(null).setCrc32c(null); |
783 | | - BlobInfo updated = opts.blobInfoMapper().apply(builder).build(); |
| 791 | + Span otelSpan = openTelemetryTraceUtil.startSpan("writer", this.getClass().getName()); |
| 792 | + try (Scope unused = otelSpan.makeCurrent()) { |
| 793 | + Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo); |
| 794 | + final Map<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions(); |
| 795 | + BlobInfo.Builder builder = blobInfo.toBuilder().setMd5(null).setCrc32c(null); |
| 796 | + BlobInfo updated = opts.blobInfoMapper().apply(builder).build(); |
784 | 797 |
|
785 | | - StorageObject encode = codecs.blobInfo().encode(updated); |
786 | | - // open the resumable session outside the write channel |
787 | | - // the exception behavior of open is different from #write(ByteBuffer) |
788 | | - Supplier<String> uploadIdSupplier = |
789 | | - ResumableMedia.startUploadForBlobInfo( |
790 | | - getOptions(), |
791 | | - updated, |
792 | | - optionsMap, |
793 | | - retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap)); |
794 | | - JsonResumableWrite jsonResumableWrite = |
795 | | - JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0); |
796 | | - return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite); |
| 798 | + StorageObject encode = codecs.blobInfo().encode(updated); |
| 799 | + // open the resumable session outside the write channel |
| 800 | + // the exception behavior of open is different from #write(ByteBuffer) |
| 801 | + Supplier<String> uploadIdSupplier = |
| 802 | + ResumableMedia.startUploadForBlobInfo( |
| 803 | + getOptions(), |
| 804 | + updated, |
| 805 | + optionsMap, |
| 806 | + retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap)); |
| 807 | + JsonResumableWrite jsonResumableWrite = |
| 808 | + JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0); |
| 809 | + return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite); |
| 810 | + } catch (Exception e) { |
| 811 | + otelSpan.recordException(e); |
| 812 | + otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); |
| 813 | + throw e; |
| 814 | + } finally { |
| 815 | + otelSpan.end(); |
| 816 | + } |
797 | 817 | } |
798 | 818 |
|
799 | 819 | @Override |
800 | 820 | public StorageWriteChannel writer(URL signedURL) { |
801 | | - // TODO: is it possible to know if a signed url is configured to have a constraint which makes |
802 | | - // it idempotent? |
803 | | - ResultRetryAlgorithm<?> forResumableUploadSessionCreate = |
804 | | - retryAlgorithmManager.getForResumableUploadSessionCreate(Collections.emptyMap()); |
805 | | - // open the resumable session outside the write channel |
806 | | - // the exception behavior of open is different from #write(ByteBuffer) |
807 | | - String signedUrlString = signedURL.toString(); |
808 | | - Supplier<String> uploadIdSupplier = |
809 | | - ResumableMedia.startUploadForSignedUrl( |
810 | | - getOptions(), signedURL, forResumableUploadSessionCreate); |
811 | | - JsonResumableWrite jsonResumableWrite = |
812 | | - JsonResumableWrite.of(signedUrlString, uploadIdSupplier.get(), 0); |
813 | | - return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite); |
| 821 | + Span otelSpan = openTelemetryTraceUtil.startSpan("writer", this.getClass().getName()); |
| 822 | + try (Scope unused = otelSpan.makeCurrent()) { |
| 823 | + // TODO: is it possible to know if a signed url is configured to have a constraint which makes |
| 824 | + // it idempotent? |
| 825 | + ResultRetryAlgorithm<?> forResumableUploadSessionCreate = |
| 826 | + retryAlgorithmManager.getForResumableUploadSessionCreate(Collections.emptyMap()); |
| 827 | + // open the resumable session outside the write channel |
| 828 | + // the exception behavior of open is different from #write(ByteBuffer) |
| 829 | + String signedUrlString = signedURL.toString(); |
| 830 | + Supplier<String> uploadIdSupplier = |
| 831 | + ResumableMedia.startUploadForSignedUrl( |
| 832 | + getOptions(), signedURL, forResumableUploadSessionCreate); |
| 833 | + JsonResumableWrite jsonResumableWrite = |
| 834 | + JsonResumableWrite.of(signedUrlString, uploadIdSupplier.get(), 0); |
| 835 | + return new BlobWriteChannelV2(BlobReadChannelContext.from(getOptions()), jsonResumableWrite); |
| 836 | + } catch (Exception e) { |
| 837 | + otelSpan.recordException(e); |
| 838 | + otelSpan.setStatus(io.opentelemetry.api.trace.StatusCode.ERROR, e.getClass().getSimpleName()); |
| 839 | + throw e; |
| 840 | + } finally { |
| 841 | + otelSpan.end(); |
| 842 | + } |
814 | 843 | } |
815 | 844 |
|
816 | 845 | @Override |
|
0 commit comments