Skip to content

Commit 9c1fe94

Browse files
mq
1 parent 3060973 commit 9c1fe94

13 files changed

Lines changed: 282 additions & 40 deletions

File tree

42.4 KB
Loading
22.6 KB
Loading
34.4 KB
Loading

文档/RocketMQ/RocketMQ笔记.md

Lines changed: 53 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -557,21 +557,22 @@ rocketmq.config.namesrvAddr=192.168.2.128:9876;192.168.2.129:9876
557557
异步
558558

559559
```java
560-
//2发送异步消息
561-
/*
562-
发送异步消息之后:有2个线程:a.Main线程,发送完毕 立刻执行以后的程序 ;
563-
b.处理消息的线程 ,并在处理完毕后 触发回调函数onSuccess()\onException()
564-
565-
producer.send(message, new SendCallback() {
566-
@Override
567-
public void onSuccess(SendResult sendResult) {
568-
System.out.println("发送成功:"+sendResult);
569-
}
570-
@Override
571-
public void onException(Throwable throwable) {
572-
System.out.println("发送失败,异常:"+ throwable.getMessage());
573-
}
574-
}); */
560+
//2发送异步消息
561+
/*
562+
发送异步消息之后:有2个线程:a.Main线程,发送完毕 立刻执行以后的程序 ;
563+
b.处理消息的线程 ,并在处理完毕后 触发回调函数
564+
onSuccess()\onException()
565+
*/
566+
producer.send(message, new SendCallback() {
567+
@Override
568+
public void onSuccess(SendResult sendResult) {
569+
System.out.println("发送成功:"+sendResult);
570+
}
571+
@Override
572+
public void onException(Throwable throwable) {
573+
System.out.println("发送失败,异常:"+ throwable.getMessage());
574+
}
575+
});
575576
```
576577

577578

@@ -593,25 +594,60 @@ rocketmq.config.namesrvAddr=192.168.2.128:9876;192.168.2.129:9876
593594

594595

595596

596-
## 消费模式
597+
## Push Consumer消费模式
597598

598-
默认集群模式:
599+
### 默认集群模式:
599600

600601
```
601602
consumer.setMessageModel(MessageModel.CLUSTERING);
602603
```
603604

604605
搭建消费者集群:只需要将groupName设置相同即可
605606

607+
![1568959735585](RocketMQ笔记.assets/1568959735585.png)
608+
609+
延迟:
610+
611+
不支持任意的时间精度,只支持几下几种
606612

613+
默认配置,/usr/rocketmq/conf/broker.conf(默认配置)
607614

608615
messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
609616

617+
定义:生产者立刻将消息发送到队列, 在队列中停留一些时间后 发送给消费者
618+
619+
![1568961292364](RocketMQ笔记.assets/1568961292364.png)
620+
621+
```java
622+
message.setDelayTimeLevel(3);
623+
```
624+
625+
### 广播模式
610626

627+
特点:最大的不同,将全部的消息内容,给每个消费者各一份(每个消费者 拥有一套完整的 消息数据)
611628

629+
不需groupName相同
630+
631+
设置成广播模式:消费者
632+
633+
```java
634+
//设置成广播模式
635+
consumer.setMessageModel(MessageModel.BROADCASTING) ;
636+
```
637+
638+
639+
640+
设置订阅标签
641+
642+
```java
643+
consumer.subscribe("mytopic1","tag1");
644+
consumer.subscribe("mytopic1","*");
645+
consumer.subscribe("mytopic1","tag1||tag2");
646+
```
612647

613648

614649

650+
###
615651

616652

617653

-10.6 KB
Loading
-17.5 KB
Binary file not shown.

源码/MyRocketMQ/src/main/java/com/yanqun/comsumer/MyConsumer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,11 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeCo
3939
String tags = messageExt.getTags();
4040
String keys = messageExt.getKeys();
4141
try {
42+
// if(tags.equals("mytag11")){
43+
if("mytag11".equals(tags)){
4244
String body = new String( messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
43-
System.out.println("消费:"+topic+ ",\t"+tags+ ",\t"+keys+ ",\t"+body);
45+
System.out.println("消费:"+topic+ ",\t"+tags+ ",\t"+keys+ ",\t"+body );
46+
}
4447
} catch (UnsupportedEncodingException e) {
4548
e.printStackTrace();
4649
//获取重试次数

源码/MyRocketMQ/src/main/java/com/yanqun/comsumer/MyConsumer2.java

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public static void main(String[] args) {
2121

2222
consumer.setConsumeFromWhere( ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET );
2323
try {
24-
consumer.subscribe("mytopic1","*");
24+
consumer.subscribe("mytopic1","mytag11||mytag12");
2525
// consumer.setConsumeMessageBatchMaxSize(100 ); 消费者一次性消费多少 消息数量。可以防止 消费者因为自身性能不足 造成的宕机的情况,按需从MQ中拉取数据
2626
// consumer.setConsumeThreadMax(4); 设置消费者 并发的线程数量
2727
// consumer.setConsumeThreadMin(2);
@@ -34,31 +34,15 @@ public static void main(String[] args) {
3434
consumer.registerMessageListener(new MessageListenerConcurrently() {
3535
@Override
3636
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
37-
MessageExt messageExt = list.get(0) ;
38-
String topic = messageExt.getTopic();
39-
String tags = messageExt.getTags();
40-
String keys = messageExt.getKeys();
41-
try {
42-
String body = new String( messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
43-
System.out.println("消费:"+topic+ ",\t"+tags+ ",\t"+keys+ ",\t"+body);
44-
} catch (UnsupportedEncodingException e) {
45-
e.printStackTrace();
46-
//获取重试次数
47-
int times = messageExt.getReconsumeTimes() ;
48-
if(times == 3){
49-
//补偿处理: 记录日志 Log....
50-
System.out.println("消息发送失败..."+messageExt);
51-
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS ;//放行:让消费者 消费下一个message
52-
53-
}
54-
return ConsumeConcurrentlyStatus.RECONSUME_LATER ;//过一段时间后 重试
37+
for (MessageExt messageExt : list) {
38+
// System.out.println(messageExt.getMsgId());
39+
System.out.println("消费:"+messageExt.getTopic()+ ",\t"+messageExt.getTags()+ ",\t"+messageExt.getKeys()+ ",\t"+messageExt.getBody() );
5540

5641
}
5742

58-
59-
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//此条消息消费成功,继续下一个...
43+
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
6044
}
61-
});
45+
});
6246

6347
consumer.start();
6448
System.out.println("consumer2..start...");
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package com.yanqun.comsumer;
2+
3+
import com.yanqun.api.CONST;
4+
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
5+
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
6+
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
7+
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
8+
import org.apache.rocketmq.client.exception.MQClientException;
9+
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
10+
import org.apache.rocketmq.common.message.MessageExt;
11+
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
12+
import org.apache.rocketmq.remoting.common.RemotingHelper;
13+
14+
import java.io.UnsupportedEncodingException;
15+
import java.util.List;
16+
17+
public class MyConsumerBroadCast1 {
18+
public static void main(String[] args) {
19+
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
20+
consumer.setNamesrvAddr(CONST.NAMESERVER_ADDR);
21+
//设置成广播模式
22+
consumer.setMessageModel(MessageModel.BROADCASTING) ;
23+
consumer.setConsumeFromWhere( ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET );
24+
try {
25+
consumer.subscribe("mytopic1","*");
26+
// consumer.setConsumeMessageBatchMaxSize(100 ); 消费者一次性消费多少 消息数量。可以防止 消费者因为自身性能不足 造成的宕机的情况,按需从MQ中拉取数据
27+
// consumer.setConsumeThreadMax(4); 设置消费者 并发的线程数量
28+
// consumer.setConsumeThreadMin(2);
29+
30+
// consumer.setConsumeTimeout();//设置消费的超时时间
31+
//如果消费失败(超时等),可以设置重试次数,并且同时修改代码(补偿处理)
32+
consumer.setMaxReconsumeTimes(3);
33+
34+
//设置监听器:当生产者生产数据时,将数据推送给 消费者
35+
consumer.registerMessageListener(new MessageListenerConcurrently() {
36+
@Override
37+
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
38+
MessageExt messageExt = list.get(0) ;
39+
String topic = messageExt.getTopic();
40+
String tags = messageExt.getTags();
41+
String keys = messageExt.getKeys();
42+
try {
43+
String body = new String( messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
44+
System.out.println("消费:"+topic+ ",\t"+tags+ ",\t"+keys+ ",\t"+body );
45+
} catch (UnsupportedEncodingException e) {
46+
e.printStackTrace();
47+
//获取重试次数
48+
int times = messageExt.getReconsumeTimes() ;
49+
if(times == 3){
50+
//补偿处理: 记录日志 Log....
51+
System.out.println("消息发送失败..."+messageExt);
52+
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS ;//放行:让消费者 消费下一个message
53+
54+
}
55+
return ConsumeConcurrentlyStatus.RECONSUME_LATER ;//过一段时间后 重试
56+
57+
}
58+
59+
60+
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//此条消息消费成功,继续下一个...
61+
}
62+
});
63+
64+
consumer.start();
65+
System.out.println("consumer1..start...");
66+
67+
} catch (MQClientException e) {
68+
e.printStackTrace();
69+
}
70+
71+
}
72+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package com.yanqun.comsumer;
2+
3+
import com.yanqun.api.CONST;
4+
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
5+
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
6+
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
7+
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
8+
import org.apache.rocketmq.client.exception.MQClientException;
9+
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
10+
import org.apache.rocketmq.common.message.MessageExt;
11+
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
12+
import org.apache.rocketmq.remoting.common.RemotingHelper;
13+
14+
import java.io.UnsupportedEncodingException;
15+
import java.util.List;
16+
17+
public class MyConsumerBroadCast2 {
18+
public static void main(String[] args) {
19+
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group2");
20+
consumer.setNamesrvAddr(CONST.NAMESERVER_ADDR);
21+
//设置成广播模式
22+
consumer.setMessageModel(MessageModel.BROADCASTING) ;
23+
consumer.setConsumeFromWhere( ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET );
24+
try {
25+
consumer.subscribe("mytopic1","*");
26+
// consumer.setConsumeMessageBatchMaxSize(100 ); 消费者一次性消费多少 消息数量。可以防止 消费者因为自身性能不足 造成的宕机的情况,按需从MQ中拉取数据
27+
// consumer.setConsumeThreadMax(4); 设置消费者 并发的线程数量
28+
// consumer.setConsumeThreadMin(2);
29+
30+
// consumer.setConsumeTimeout();//设置消费的超时时间
31+
//如果消费失败(超时等),可以设置重试次数,并且同时修改代码(补偿处理)
32+
consumer.setMaxReconsumeTimes(3);
33+
34+
//设置监听器:当生产者生产数据时,将数据推送给 消费者
35+
consumer.registerMessageListener(new MessageListenerConcurrently() {
36+
@Override
37+
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
38+
MessageExt messageExt = list.get(0) ;
39+
String topic = messageExt.getTopic();
40+
String tags = messageExt.getTags();
41+
String keys = messageExt.getKeys();
42+
try {
43+
String body = new String( messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
44+
System.out.println("消费:"+topic+ ",\t"+tags+ ",\t"+keys+ ",\t"+body );
45+
} catch (UnsupportedEncodingException e) {
46+
e.printStackTrace();
47+
//获取重试次数
48+
int times = messageExt.getReconsumeTimes() ;
49+
if(times == 3){
50+
//补偿处理: 记录日志 Log....
51+
System.out.println("消息发送失败..."+messageExt);
52+
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS ;//放行:让消费者 消费下一个message
53+
54+
}
55+
return ConsumeConcurrentlyStatus.RECONSUME_LATER ;//过一段时间后 重试
56+
57+
}
58+
59+
60+
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//此条消息消费成功,继续下一个...
61+
}
62+
});
63+
64+
consumer.start();
65+
System.out.println("consumer2..start...");
66+
67+
} catch (MQClientException e) {
68+
e.printStackTrace();
69+
}
70+
71+
}
72+
}

0 commit comments

Comments
 (0)