KAFKA-8964: Rename tag client-id for thread-level metrics and below#7429
Conversation
|
Call for review: @guozhangwang @vvcephei @bbejeck |
| public final Map<String, String> tagMap(final String... tags) { | ||
| final Map<String, String> tagMap = new LinkedHashMap<>(); | ||
| tagMap.put("client-id", threadName); | ||
| if (tags != null) { |
There was a problem hiding this comment.
Nice refactoring to put this code in a separate method!
| tagMap.get(builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST) ? StreamsMetricsImpl.THREAD_ID_TAG | ||
| : StreamsMetricsImpl.THREAD_ID_TAG_0100_TO_23), |
There was a problem hiding this comment.
Maybe get the value of this expression on the line above. IMHO it will make it easier to see what the assertThat is doing.
| tagMap.get(builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST) ? StreamsMetricsImpl.THREAD_ID_TAG | ||
| : StreamsMetricsImpl.THREAD_ID_TAG_0100_TO_23), |
|
Java 11/2.12, Java 11/2.13 and Java 8 all failed. Test results already cleaned up. retest this please |
|
ping either of @guozhangwang or @vvcephei for a second review |
guozhangwang
left a comment
There was a problem hiding this comment.
Minor comments, otherwise LGTM.
| final Map<String, String> tagMap = metrics.tagMap("task-id", context.taskId().toString(), PROCESSOR_NODE_ID_TAG, processorNodeName); | ||
| final Map<String, String> allTagMap = metrics.tagMap("task-id", context.taskId().toString(), PROCESSOR_NODE_ID_TAG, "all"); | ||
| final Map<String, String> tagMap = metrics.nodeLevelTagMap(context.taskId().toString(), processorNodeName); | ||
| final Map<String, String> allTagMap = |
There was a problem hiding this comment.
This is still a node-level metrics technically speaking, but just with node-tag pointing to all right? In that can should we just call nodeLevelTagMap(context.taskId().toString(), ROLLUP_VALUE)?
There was a problem hiding this comment.
Good catch! Of course! I even did as you said for the commitOverTask sensor. I totally overlooked this one.
| return tagMap; | ||
| } | ||
|
|
||
| public Map<String, String> taskLevelTagMap(final String taskName, final String... tags) { |
There was a problem hiding this comment.
If the above comment is valid, then we do not need this overload func here.
| updatedTags[tags.length] = scopeName + "-id"; | ||
| updatedTags[tags.length + 1] = entityName; | ||
| return tagMap(updatedTags); | ||
| return threadLevelTagMap(updatedTags); |
There was a problem hiding this comment.
This makes me thinking: should we clarify in our javadoc that user customized metrics would be recorded at the thread-level specifically, with additional scope tag?
There was a problem hiding this comment.
That would be good idea. Do you want to update the javadocs in KIP-444?
| expiredRecordSensor, | ||
| "stream-" + metricScope + "-metrics", | ||
| metrics.tagMap("task-id", taskName, metricScope + "-id", name()), | ||
| metrics.storeLevelTagMap(taskName, metricScope, name()), |
There was a problem hiding this comment.
Also since the "incorrect" metrics name is out in our previous releases, but fixing it we are effectively breaking user's compatibility. I think this okay since we are fixing a bug, but would still worth clarifying it in the upgrade guide docs.
guozhangwang
left a comment
There was a problem hiding this comment.
One meta comment: in KIP-444 we are changing the type from stream-metrics to stream-thread-metrics as well, is it going to be included in future PRs? I did not see it in either of the open ones yet.
|
All failed, test results cleaned up already retest this please |
cadonna
left a comment
There was a problem hiding this comment.
@guozhangwang Yes, the type is changed from stream-metrics to stream-thread-metrics in the thread-level refactoring thread that I could not open yet since it is based on this one.
| final Map<String, String> tagMap = metrics.tagMap("task-id", context.taskId().toString(), PROCESSOR_NODE_ID_TAG, processorNodeName); | ||
| final Map<String, String> allTagMap = metrics.tagMap("task-id", context.taskId().toString(), PROCESSOR_NODE_ID_TAG, "all"); | ||
| final Map<String, String> tagMap = metrics.nodeLevelTagMap(context.taskId().toString(), processorNodeName); | ||
| final Map<String, String> allTagMap = |
There was a problem hiding this comment.
Good catch! Of course! I even did as you said for the commitOverTask sensor. I totally overlooked this one.
| return tagMap; | ||
| } | ||
|
|
||
| public Map<String, String> taskLevelTagMap(final String taskName, final String... tags) { |
| updatedTags[tags.length] = scopeName + "-id"; | ||
| updatedTags[tags.length + 1] = entityName; | ||
| return tagMap(updatedTags); | ||
| return threadLevelTagMap(updatedTags); |
There was a problem hiding this comment.
That would be good idea. Do you want to update the javadocs in KIP-444?
| expiredRecordSensor, | ||
| "stream-" + metricScope + "-metrics", | ||
| metrics.tagMap("task-id", taskName, metricScope + "-id", name()), | ||
| metrics.storeLevelTagMap(taskName, metricScope, name()), |
|
Java 11/2.12 failed with retest this please |
…s and below - Renamed tag client-id to thread-id for thread-level metrics and below - Corrected metrics tag keys for state store that had suffix "-id" instead of "state-id"
adbcf55 to
6acde48
Compare
|
In JDK 11/Scala 2.13 the following tests failed: Retest this, please |
|
Test failures on |
|
LGTM, merged to trunk (2.5). |
Revert "KAFKA-8964: Rename tag client-id for thread-level metrics and below (apache#7429)"
…t-for-generated-requests * apache-github/trunk: KAFKA-8932; Add tag for CreateTopicsResponse.TopicConfigErrorCode (KIP-525) (apache#7464) KAFKA-8944: Fixed KTable compiler warning. (apache#7393) KAFKA-8964: Rename tag client-id for thread-level metrics and below (apache#7429) MINOR: remove unused imports in Streams system tests (apache#7468) KAFKA-7190; Retain producer state until transactionalIdExpiration time passes (apache#7388) KAFKA-8983; AdminClient deleteRecords should not fail all partitions unnecessarily (apache#7449) MINOR: Modified Exception handling for KIP-470 (apache#7461) KAFKA-7245: Deprecate WindowStore#put(key, value) (apache#7105) KAFKA-8179: Part 7, cooperative rebalancing in Streams (apache#7386) KAFKA-8985; Add flexible version support to inter-broker APIs (apache#7453) MINOR: Bump version to 2.5.0-SNAPSHOT (apache#7455)
of "state-id"
Committer Checklist (excluded from commit message)