Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion buildSrc/src/main/kotlin/Versions.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
object Release {
const val Group = "com.tencent.devops"
const val Version = "0.0.6-SNAPSHOT"
const val Version = "0.0.7-SNAPSHOT"
}

object Versions {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
description = "DevOps Boot Pulsar"

dependencies {
api("org.springframework.cloud:spring-cloud-stream:3.0.11.RELEASE")
api("org.springframework.boot:spring-boot-actuator")
api("org.springframework.boot:spring-boot-actuator-autoconfigure")
api("org.apache.pulsar:pulsar-client:2.8.1")
api("com.google.protobuf:protobuf-java:3.19.4")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
*
* Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
*
* A copy of the MIT License is included in this file.
*
*
* Terms of the MIT License:
* ---------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
* LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package com.tencent.devops.stream.binder.pulsar

import com.tencent.devops.stream.binder.pulsar.integration.inbound.PulsarInboundChannelAdapter
import com.tencent.devops.stream.binder.pulsar.integration.outbound.PulsarProducerMessageHandler
import com.tencent.devops.stream.binder.pulsar.properties.PulsarBinderConfigurationProperties
import com.tencent.devops.stream.binder.pulsar.properties.PulsarConsumerProperties
import com.tencent.devops.stream.binder.pulsar.properties.PulsarExtendedBindingProperties
import com.tencent.devops.stream.binder.pulsar.properties.PulsarProducerProperties
import com.tencent.devops.stream.binder.pulsar.provisioning.PulsarMessageQueueProvisioner
import org.apache.pulsar.client.api.PulsarClient
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties
import org.springframework.cloud.stream.binder.ExtendedProducerProperties
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder
import org.springframework.cloud.stream.provisioning.ConsumerDestination
import org.springframework.cloud.stream.provisioning.ProducerDestination
import org.springframework.integration.core.MessageProducer
import org.springframework.messaging.MessageChannel
import org.springframework.messaging.MessageHandler

class PulsarMessageChannelBinder(
private val pulsarClient: PulsarClient,
messageBinderProvisioner: PulsarMessageQueueProvisioner,
private val extendedBindingProperties: PulsarExtendedBindingProperties,
private val pulsarProperties: PulsarBinderConfigurationProperties
) : AbstractMessageChannelBinder<
ExtendedConsumerProperties<PulsarConsumerProperties>,
ExtendedProducerProperties<PulsarProducerProperties>, PulsarMessageQueueProvisioner
>(
arrayOf(),
messageBinderProvisioner
),
ExtendedPropertiesBinder<MessageChannel, PulsarConsumerProperties, PulsarProducerProperties> {

override fun createProducerMessageHandler(
destination: ProducerDestination?,
producerProperties: ExtendedProducerProperties<PulsarProducerProperties>?,
errorChannel: MessageChannel?
): MessageHandler {
throw IllegalStateException(
"The abstract binder should not call this method"
)
}

override fun createProducerMessageHandler(
destination: ProducerDestination,
producerProperties: ExtendedProducerProperties<PulsarProducerProperties>,
channel: MessageChannel,
errorChannel: MessageChannel?
): MessageHandler {
val messageHandler = PulsarProducerMessageHandler(
destination = destination,
pulsarClient = pulsarClient,
producerProperties = producerProperties.extension,
pulsarProperties = pulsarProperties
)
messageHandler.setApplicationContext(this.applicationContext)
if (errorChannel != null) {
// TODO 需要处理
}
// val partitioningInterceptor = (channel as AbstractMessageChannel)
// .interceptors.stream()
// .filter { channelInterceptor: ChannelInterceptor? -> channelInterceptor is PartitioningInterceptor }
// .map { channelInterceptor: ChannelInterceptor? -> channelInterceptor as PartitioningInterceptor? }
// .findFirst().orElse(null)
// TODO 分区处理
// messageHandler.partitioningInterceptor = partitioningInterceptor
messageHandler.setBeanFactory(applicationContext.beanFactory)
// TODO 错误消息策略
// messageHandler.setErrorMessageStrategy(this.errorMessageStrategy)
return messageHandler
}

override fun createConsumerEndpoint(
destination: ConsumerDestination?,
group: String?,
properties: ExtendedConsumerProperties<PulsarConsumerProperties>
): MessageProducer {

val inboundChannelAdapter = PulsarInboundChannelAdapter(
destination = destination!!.name,
pulsarClient = pulsarClient,
extendedConsumerProperties = properties,
pulsarProperties = pulsarProperties,
group = group
)
val errorInfrastructure = registerErrorInfrastructure(
destination,
group, properties
)
if (properties.maxAttempts > 1) {
inboundChannelAdapter.retryTemplate = buildRetryTemplate(properties)
inboundChannelAdapter.recoveryCallback = errorInfrastructure.recoverer
} else {
inboundChannelAdapter.errorChannel = errorInfrastructure.errorChannel
}
return inboundChannelAdapter
}

// TODO Polled Consumer 定时拉取处理

override fun getExtendedConsumerProperties(channelName: String?): PulsarConsumerProperties {
return extendedBindingProperties.getExtendedConsumerProperties(channelName)
}

override fun getExtendedProducerProperties(channelName: String?): PulsarProducerProperties {
return extendedBindingProperties.getExtendedProducerProperties(channelName)
}

override fun getDefaultsPrefix(): String {
return this.extendedBindingProperties.defaultsPrefix
}

override fun getExtendedPropertiesEntryClass(): Class<out BinderSpecificPropertiesProvider> {
return extendedBindingProperties.extendedPropertiesEntryClass
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
*
* Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
*
* A copy of the MIT License is included in this file.
*
*
* Terms of the MIT License:
* ---------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
* LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package com.tencent.devops.stream.binder.pulsar.actuator

import com.tencent.devops.stream.binder.pulsar.metrics.Instrumentation
import com.tencent.devops.stream.binder.pulsar.metrics.InstrumentationManager
import org.springframework.boot.actuate.health.AbstractHealthIndicator
import org.springframework.boot.actuate.health.Health

class PulsarBinderHealthIndicator : AbstractHealthIndicator() {
@Throws(Exception::class)
override fun doHealthCheck(builder: Health.Builder) {
if (InstrumentationManager.getHealthInstrumentations().stream()
.allMatch(Instrumentation::isUp)
) {
builder.up()
return
}
if (InstrumentationManager.getHealthInstrumentations().stream()
.allMatch(Instrumentation::isOutOfService)
) {
builder.outOfService()
return
}
builder.down()
InstrumentationManager.getHealthInstrumentations().stream()
.filter { instrumentation -> !instrumentation.isStarted() }
.forEach {
builder
.withException(it.failedException)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available.
*
* Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-CI 蓝鲸持续集成平台 is licensed under the MIT license.
*
* A copy of the MIT License is included in this file.
*
*
* Terms of the MIT License:
* ---------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
* LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
* WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package com.tencent.devops.stream.binder.pulsar.autoconfigurate

import com.tencent.devops.stream.binder.pulsar.convert.PulsarMessageConverter
import com.tencent.devops.stream.binder.pulsar.custom.PulsarConfigBeanPostProcessor
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.context.properties.source.ConfigurationPropertyName
import org.springframework.cloud.stream.config.BindingHandlerAdvise.MappingsProvider
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.messaging.converter.CompositeMessageConverter

@Configuration
class ExtendedBindingHandlerMappingsProviderConfiguration {
@Bean
fun pulsarExtendedPropertiesDefaultMappingsProvider(): MappingsProvider {
return MappingsProvider {
val mappings: MutableMap<ConfigurationPropertyName, ConfigurationPropertyName> = HashMap()
mappings[ConfigurationPropertyName.of("spring.cloud.stream.pulsar.bindings")] =
ConfigurationPropertyName.of("spring.cloud.stream.pulsar.default")
mappings
}
}

@Bean
fun pulsarConfigBeanPostProcessor(): PulsarConfigBeanPostProcessor {
return PulsarConfigBeanPostProcessor()
}

/**
* if you want to customize a bean, please use this BeanName `PulsarMessageConverter.DEFAULT_NAME`.
*/
@Bean(PulsarMessageConverter.DEFAULT_NAME)
@ConditionalOnMissingBean(name = [PulsarMessageConverter.DEFAULT_NAME])
fun pulsarMessageConverter(): CompositeMessageConverter {
return PulsarMessageConverter().getMessageConverter()
}
}
Loading