11``` text
2- Kafka是一个分布式发布-订阅消息系统和一个强大的队列,可以处理大量的数据,
3- 并使您能够将消息从一个端点传递到另一个端点;
4- 1.Kafka的几个基本术语:
5- Topics(主题): 属于特定类别的消息流称为主题;数据存储在主题中;
6- Partition(分区): 每个主题可能有多个分区;
7- Partition offset(分区偏移): 分区上每条记录的唯一序列标识;
8- Replicas of partition(分区备份): 分区的备份,从不读取或写入数据;
9- Brokers(经纪人): kafka集群中每个服务称为broker;
10- Producers(生产者): 发送给一个或多个Kafka主题的消息的发布者;
11- Consumers(消费者): 订阅一个或多个主题,并从broker提取已发布的消息来使用;
12- Leader(领导者): 负责给定分区的所有读取和写入的节点;每个分区都有一个服务器充当Leader;
13- Follower(追随者): 同步Leader的partition消息;
14- Consumer Group(消费者组): Topic消息分配到消费者组,再由消费者组分配到具体消费实例;
15- (每个分区最多只能绑定一个消费者,每个消费者可以消费多个分区)
16- 2.Kafka安装使用:
17- Kafka下载地址:http://kafka.apache.org/downloads,选择Binary downloads下载,然后解压即可;
18- Kafka的配置文件位于config目录下(包含kafka和Zookeeper的配置文件),打开server.properties,
19- 将broker.id的值修改为1,每个broker的id都必须设置为Integer类型,且不能重复;
20- [1]启动Zookeeper:
21- (1)Windows下,在cmd中切换到Kafka根目录,执行启动脚本:
2+ Kafka是一个分布式发布-订阅消息系统和一个强大的队列,可以处理大量的数据,
3+ 并使您能够将消息从一个端点传递到另一个端点;
4+ 1.Kafka的几个基本术语:
5+ Topics(主题): 属于特定类别的消息流称为主题;数据存储在主题中;
6+ Partition(分区): 每个主题可能有多个分区;
7+ Partition offset(分区偏移): 分区上每条记录的唯一序列标识;
8+ Replicas of partition(分区备份): 分区的备份,从不读取或写入数据;
9+ Brokers(经纪人): kafka集群中每个服务称为broker;
10+ Producers(生产者): 发送给一个或多个Kafka主题的消息的发布者;
11+ Consumers(消费者): 订阅一个或多个主题,并从broker提取已发布的消息来使用;
12+ Leader(领导者): 负责给定分区的所有读取和写入的节点;每个分区都有一个服务器充当Leader;
13+ Follower(追随者): 同步Leader的partition消息;
14+ Consumer Group(消费者组): Topic消息分配到消费者组,再由消费者组分配到具体消费实例;
15+ (每个分区最多只能绑定一个消费者,每个消费者可以消费多个分区)
16+ 2.Kafka安装使用:
17+ Kafka下载地址:http://kafka.apache.org/downloads,选择Binary downloads下载,然后解压即可;
18+ Kafka的配置文件位于config目录下(包含kafka和Zookeeper的配置文件),打开server.properties,
19+ 将broker.id的值修改为1,每个broker的id都必须设置为Integer类型,且不能重复;
20+ [1]启动Zookeeper:
21+ (1)Windows下,在cmd中切换到Kafka根目录,执行启动脚本:
2222 bin\windows\zookeeper-server-start.bat config\zookeeper.properties
23- (2)Linux下,在终端命令行切换到Kafka根目录,以后台进程的方式执行启动脚本:
23+ (2)Linux下,在终端命令行切换到Kafka根目录,以后台进程的方式执行启动脚本:
2424 bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
25- [2]启动Kafka:
26- (1)Windows下,在cmd中切换到Kafka根目录,执行启动脚本:
25+ [2]启动Kafka:
26+ (1)Windows下,在cmd中切换到Kafka根目录,执行启动脚本:
2727 bin\windows\kafka-server-start.bat config\server.properties
28- (2)Linux下,在终端命令行切换到Kafka根目录,执行启动脚本:
28+ (2)Linux下,在终端命令行切换到Kafka根目录,执行启动脚本:
2929 bin/kafka-server-start.sh config/server.properties
30- 当看到命令行打印started等信息,说明启动完毕;
31- [3]创建Topic:
32- (1)Windows下,在cmd中切换到Kafka根目录,执行创建Topic脚本:
30+ 当看到命令行打印started等信息,说明启动完毕;
31+ [3]创建Topic:
32+ (1)Windows下,在cmd中切换到Kafka根目录,执行创建Topic脚本:
3333 bin\windows\kafka-topics.bat --create --zookeeper localhost:2181
3434 --replication-factor 1 --partitions 1 --topic test
35- (创建一个Topic到ZK(指定ZK的地址),副本个数为1,分区数为1,Topic的名称为test)
36- (2)Linux下,在终端命令行切换到Kafka根目录,执行创建Topic脚本:
35+ (创建一个Topic到ZK(指定ZK的地址),副本个数为1,分区数为1,Topic的名称为test)
36+ (2)Linux下,在终端命令行切换到Kafka根目录,执行创建Topic脚本:
3737 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1
3838 --partitions 1 --topic test
39- [4]查看Kafka里的Topic列表:
40- (1)Windows下,在cmd中切换到Kafka根目录,执行Topic脚本:
39+ [4]查看Kafka里的Topic列表:
40+ (1)Windows下,在cmd中切换到Kafka根目录,执行Topic脚本:
4141 bin\windows\kafka-topics.bat --list --zookeeper localhost:2181
42- (2)Linux下,在终端命令行切换到Kafka根目录,执行Topic脚本:
42+ (2)Linux下,在终端命令行切换到Kafka根目录,执行Topic脚本:
4343 bin/kafka-topics.sh --list --zookeeper localhost:2181
44- [5]查看某个Topic的具体信息: (如:test)
45- (1)Windows下,在cmd中切换到Kafka根目录,执行Topic脚本:
44+ [5]查看某个Topic的具体信息: (如:test)
45+ (1)Windows下,在cmd中切换到Kafka根目录,执行Topic脚本:
4646 bin\windows\kafka-topics.bat --describe --zookeeper localhost:2181 --topic test
47- (2)Linux下,在终端命令行切换到Kafka根目录,执行Topic脚本:
47+ (2)Linux下,在终端命令行切换到Kafka根目录,执行Topic脚本:
4848 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
49- [6]启动Producers:
50- (1)Windows下,在cmd中切换到Kafka根目录,执行producer脚本:
49+ [6]启动Producers:
50+ (1)Windows下,在cmd中切换到Kafka根目录,执行producer脚本:
5151 bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
52- (9092为生产者的默认端口号,启动生产者后,可往test Topic里发送数据)
53- (2)Linux下,在终端命令行切换到Kafka根目录,执行producer脚本:
52+ (9092为生产者的默认端口号,启动生产者后,可往test Topic里发送数据)
53+ (2)Linux下,在终端命令行切换到Kafka根目录,执行producer脚本:
5454 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
55- [7]启动Consumers:
56- (1)Windows下,在cmd中切换到Kafka根目录,执行consumer脚本:
55+ [7]启动Consumers:
56+ (1)Windows下,在cmd中切换到Kafka根目录,执行consumer脚本:
5757 bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092
5858 --topic test --from-beginning
59- (from-beginning表示从头开始读取数据)
60- (2)Linux下,在终端命令行切换到Kafka根目录,执行consumer脚本:
59+ (from-beginning表示从头开始读取数据)
60+ (2)Linux下,在终端命令行切换到Kafka根目录,执行consumer脚本:
6161 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092
6262 --topic test --from-beginning
63- 3.Spring Boot整合Kafaka:
64- [1]引入web依赖和kafka依赖:
63+ 3.Spring Boot整合Kafaka:
64+ [1]引入web依赖和kafka依赖:
6565 <dependency>
6666 <groupId>org.springframework.boot</groupId>
6767 <artifactId>spring-boot-starter-web</artifactId>
7070 <groupId>org.springframework.kafka</groupId>
7171 <artifactId>spring-kafka</artifactId>
7272 </dependency>
73- [2]配置生产者:
74- (1)通过配置类,配置生产者工厂及kafka模板:
73+ [2]配置生产者:
74+ (1)通过配置类,配置生产者工厂及kafka模板:
7575 @Configuration
7676 public class KafkaProducerConfig {
7777 @Value("${spring.kafka.bootstrap-servers}")
@@ -83,37 +83,37 @@ Kafka
8383 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
8484 bootstrapServers);
8585 configProps.put(
86- //key的序列化策略,String类型
86+ //key的序列化策略,String类型
8787 ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
8888 StringSerializer.class);
8989 configProps.put(
90- //value的序列化策略,String类型
90+ //value的序列化策略,String类型
9191 ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
9292 StringSerializer.class);
9393 return new DefaultKafkaProducerFactory<>(configProps);
9494 }
95- //其包含了发送消息的便捷方法
95+ //其包含了发送消息的便捷方法
9696 @Bean
9797 public KafkaTemplate<String, String> kafkaTemplate() {
9898 return new KafkaTemplate<>(producerFactory());
9999 }
100100 }
101- (2)配置文件application.yml中配置生产者的地址:
101+ (2)配置文件application.yml中配置生产者的地址:
102102 spring:
103103 kafka:
104104 bootstrap-servers: localhost:9092
105- [3]编写发送消息的controller:
105+ [3]编写发送消息的controller:
106106 @RestController
107107 public class SendMessageController {
108108 @Autowired
109109 private KafkaTemplate<String, String> kafkaTemplate;
110110 @GetMapping("send/{message}")
111111 public void send(@PathVariable String message) {
112- // test为Topic的名称,message为要发送的消息
112+ // test为Topic的名称,message为要发送的消息
113113 this.kafkaTemplate.send("test", message);
114114 }
115115 }
116- send方法是异步方法,可通过回调的方式来确定消息是否发送成功,改造controller:
116+ send方法是异步方法,可通过回调的方式来确定消息是否发送成功,改造controller:
117117 @RestController
118118 public class SendMessageController {
119119 private Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -126,19 +126,19 @@ Kafka
126126 future.addCallback(new ListenableFutureCallback<SendResult<String,String>>(){
127127 @Override
128128 public void onSuccess(SendResult<String, String> result) {
129- logger.info("成功发送消息:{},offset=[{}]", message,
129+ logger.info("成功发送消息:{},offset=[{}]", message,
130130 result.getRecordMetadata().offset());
131131 }
132132 @Override
133133 public void onFailure(Throwable ex) {
134- logger.error("消息:{} 发送失败,原因:{}", message, ex.getMessage());
134+ logger.error("消息:{} 发送失败,原因:{}", message, ex.getMessage());
135135 }
136136 });
137137 }
138138 }
139- [4]配置消费者:
140- (1)通过配置类,配置消费者工厂和监听容器工厂:
141- //配置类上需要@EnableKafka注释才能在Spring托管Bean上检测@KafkaListener注解
139+ [4]配置消费者:
140+ (1)通过配置类,配置消费者工厂和监听容器工厂:
141+ //配置类上需要@EnableKafka注释才能在Spring托管Bean上检测@KafkaListener注解
142142 @EnableKafka
143143 @Configuration
144144 public class KafkaConsumerConfig {
@@ -177,40 +177,40 @@ Kafka
177177 return factory;
178178 }
179179 }
180- (2)在application.yml里配置消费者组ID和消息读取策略:
180+ (2)在application.yml里配置消费者组ID和消息读取策略:
181181 spring:
182182 kafka:
183183 consumer:
184184 group-id: test-consumer
185185 auto-offset-reset: latest
186186
187- 消息读取策略,包含四个可选值:
188- earliest:当各分区下有已提交的offset时,从提交的offset开始消费;
189- 无提交的offset时,从头开始消费;
190- latest:当各分区下有已提交的offset时,从提交的offset开始消费;
191- 无提交的offset时,消费新产生的该分区下的数据;
192- none:topic各分区都存在已提交的offset时,从offset后开始消费;
193- 只要有一个分区不存在已提交的offset,则抛出异常;
194- exception:直接抛出异常;
195- [5]编写消息监听器类:
187+ 消息读取策略,包含四个可选值:
188+ earliest:当各分区下有已提交的offset时,从提交的offset开始消费;
189+ 无提交的offset时,从头开始消费;
190+ latest:当各分区下有已提交的offset时,从提交的offset开始消费;
191+ 无提交的offset时,消费新产生的该分区下的数据;
192+ none:topic各分区都存在已提交的offset时,从offset后开始消费;
193+ 只要有一个分区不存在已提交的offset,则抛出异常;
194+ exception:直接抛出异常;
195+ [5]编写消息监听器类:
196196 @Component
197197 public class KafkaMessageListener {
198198 private Logger logger = LoggerFactory.getLogger(this.getClass());
199- // 指定监听的主题和消费者组
199+ // 指定监听的主题和消费者组
200200 @KafkaListener(topics = "test", groupId = "test-consumer")
201201 public void listen(String message) {
202- logger.info("接收消息: {}", message);
202+ logger.info("接收消息: {}", message);
203203 }
204204 }
205- 4.@KafkaListener详解:
206- [1]同时监听来自多个Topic的消息: @KafkaListener(topics = "topic1, topic2")
207- [2]@Header注解获取当前消息来自哪个分区:
205+ 4.@KafkaListener详解:
206+ [1]同时监听来自多个Topic的消息: @KafkaListener(topics = "topic1, topic2")
207+ [2]@Header注解获取当前消息来自哪个分区:
208208 @KafkaListener(topics = "test", groupId = "test-consumer")
209209 public void listen(@Payload String message,
210210 @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
211- logger.info("接收消息: {},partition:{}", message, partition);
211+ logger.info("接收消息: {},partition:{}", message, partition);
212212 }
213- [3]指定只接收来自特定分区的消息:
213+ [3]指定只接收来自特定分区的消息:
214214 @KafkaListener(
215215 groupId = "test-consumer",
216216 topicPartitions = @TopicPartition(
@@ -222,28 +222,28 @@ Kafka
222222 )
223223 public void listen(@Payload String message,
224224 @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
225- logger.info("接收消息: {},partition:{}", message, partition);
225+ logger.info("接收消息: {},partition:{}", message, partition);
226226 }
227- 如果不需要指定initialOffset,上面代码可以简化为:
227+ 如果不需要指定initialOffset,上面代码可以简化为:
228228 @KafkaListener(groupId = "test-consumer",
229229 topicPartitions = @TopicPartition(topic = "test", partitions = { "0", "1" }))
230- 5.为消息监听添加消息过滤器: setRecordFilterStrategy(RecordFilterStrategy<K, V> strategy)
230+ 5.为消息监听添加消息过滤器: setRecordFilterStrategy(RecordFilterStrategy<K, V> strategy)
231231 @Bean
232232 public ConcurrentKafkaListenerContainerFactory<String, String>
233233 kafkaListenerContainerFactory() {
234234 ConcurrentKafkaListenerContainerFactory<String, String> factory
235235 = new ConcurrentKafkaListenerContainerFactory<>();
236236 factory.setConsumerFactory(consumerFactory());
237- // 添加过滤配置
237+ // 添加过滤配置
238238 factory.setRecordFilterStrategy( r -> r.value().contains("fuck"));
239239 return factory;
240240 }
241- // RecordFilterStrategy接口: (是函数式接口)
241+ // RecordFilterStrategy接口: (是函数式接口)
242242 public interface RecordFilterStrategy<K, V> {
243243 boolean filter(ConsumerRecord<K, V> var1);
244244 }
245- 6.发送复杂的消息: (通过自定义消息转换器来发送复杂的消息)
246- [1]定义消息实体:
245+ 6.发送复杂的消息: (通过自定义消息转换器来发送复杂的消息)
246+ [1]定义消息实体:
247247 public class Message implements Serializable {
248248 private static final long serialVersionUID = 6678420965611108427L;
249249 private String from;
@@ -260,14 +260,14 @@ Kafka
260260 ", message='" + message + '\'' +
261261 '}';
262262 }
263- // get set 略
263+ // get set 略
264264 }
265- [2]改造消息生产者配置:
265+ [2]改造消息生产者配置:
266266 @Configuration
267267 public class KafkaProducerConfig {
268268 @Value("${spring.kafka.bootstrap-servers}")
269269 private String bootstrapServers;
270- // 返回类型为ProducerFactory<String,Message>
270+ // 返回类型为ProducerFactory<String,Message>
271271 @Bean
272272 public ProducerFactory<String, Message> producerFactory() {
273273 Map<String, Object> configProps = new HashMap<>();
@@ -278,18 +278,18 @@ Kafka
278278 ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
279279 StringSerializer.class);
280280 configProps.put(
281- //将value序列化策略指定为了Kafka提供的JsonSerializer
281+ //将value序列化策略指定为了Kafka提供的JsonSerializer
282282 ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
283283 JsonSerializer.class);
284284 return new DefaultKafkaProducerFactory<>(configProps);
285285 }
286- // 返回类型为KafkaTemplate<String, Message>
286+ // 返回类型为KafkaTemplate<String, Message>
287287 @Bean
288288 public KafkaTemplate<String, Message> kafkaTemplate() {
289289 return new KafkaTemplate<>(producerFactory());
290290 }
291291 }
292- [3]在controller中发送复杂消息:
292+ [3]在controller中发送复杂消息:
293293 @RestController
294294 public class SendMessageController {
295295 private Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -300,7 +300,7 @@ Kafka
300300 this.kafkaTemplate.send("test", new Message("kimi", message));
301301 }
302302 }
303- [4]修改消费者配置:
303+ [4]修改消费者配置:
304304 @EnableKafka
305305 @Configuration
306306 public class KafkaConsumerConfig {
@@ -336,17 +336,17 @@ Kafka
336336 return factory;
337337 }
338338 }
339- [5]修改消息监听:
339+ [5]修改消息监听:
340340 @Component
341341 public class KafkaMessageListener {
342342 private Logger logger = LoggerFactory.getLogger(this.getClass());
343- // 指定监听的主题和消费者组
343+ // 指定监听的主题和消费者组
344344 @KafkaListener(topics = "test", groupId = "test-consumer")
345345 public void listen(Message message) {
346- logger.info("接收消息: {}", message);
346+ logger.info("接收消息: {}", message);
347347 }
348348 }
349- 7.更多配置:
349+ 7.更多配置:
350350(https://docs.spring.io/spring-boot/docs/2.1.1.RELEASE/reference/htmlsingle/#common-application-properties)
351351 # APACHE KAFKA (KafkaProperties)
352352 spring.kafka.admin.client-id=
@@ -443,4 +443,4 @@ Kafka
443443 spring.kafka.streams.ssl.trust-store-type=
444444 spring.kafka.streams.state-dir=
445445 spring.kafka.template.default-topic=
446- ```
446+ ```
0 commit comments