Skip to content

Commit 96fd52a

Browse files
committed
rabbitmq 发送端和消费端实例代码
1 parent 5b9b23e commit 96fd52a

4 files changed

Lines changed: 92 additions & 2 deletions

File tree

pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,14 @@
3838

3939
<dependencies>
4040

41+
<!-- rabbit mq -->
42+
<dependency>
43+
<groupId>com.rabbitmq</groupId>
44+
<artifactId>amqp-client</artifactId>
45+
<version>5.1.2</version>
46+
</dependency>
47+
<!-- rabbit mq end -->
48+
4149
<dependency>
4250
<groupId>org.jsoup</groupId>
4351
<artifactId>jsoup</artifactId>

src/main/java/com/code/repository/study/http/HscodeGatherer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ public class HscodeGatherer implements Callable<List<Map<String,String>>> {
3838
this.hscode4 = hscode4;
3939
}
4040

41-
private static int corePoolSize = 20; // 主流线程个数
42-
private static int maximumPoolSize = 50; // 线程最大个数
41+
private static int corePoolSize = 50; // 主流线程个数
42+
private static int maximumPoolSize = 100; // 线程最大个数
4343
private static long keepAliveTime = 1000L; // 大于主流线程个数的线程空闲的过期时间 wait for new tasks before terminating
4444
private static TimeUnit unit = TimeUnit.MILLISECONDS; // 时间单元
4545
private static BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>(); // 工作队列,有三种类SynchronousQueue、LinkedBlockingQueue(在所有 corePoolSize 线程都忙时新任务在队列中等待,maximumPoolSiz失效)、ArrayBlockingQueue,分别对应同步队列、无界队列、有界队列。
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package com.code.repository.study.rabbitmq;
2+
3+
import com.rabbitmq.client.*;
4+
5+
import java.io.IOException;
6+
import java.util.concurrent.TimeoutException;
7+
8+
/**
9+
* rabbit MQ 接收端
10+
*/
11+
public class ReciverMQ {
12+
13+
private final static String QUEUE_NAME = "Hello";
14+
15+
public static void main(String[] args) throws IOException, TimeoutException {
16+
17+
ConnectionFactory factory = new ConnectionFactory();
18+
factory.setHost("localhost");
19+
factory.setPort(5672);
20+
factory.setUsername("guest");
21+
factory.setPassword("guest");
22+
Connection connection = factory.newConnection();
23+
Channel channel = connection.createChannel();
24+
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
25+
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
26+
Consumer consumer = new DefaultConsumer(channel) {
27+
@Override
28+
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
29+
String message = new String(body, "UTF-8");
30+
System.out.println(" [x] Received '" + message + "'");
31+
}
32+
};
33+
channel.basicConsume(QUEUE_NAME, true, consumer);
34+
35+
}
36+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package com.code.repository.study.rabbitmq;
2+
3+
import com.rabbitmq.client.Channel;
4+
import com.rabbitmq.client.Connection;
5+
import com.rabbitmq.client.ConnectionFactory;
6+
7+
import java.io.IOException;
8+
import java.util.Scanner;
9+
import java.util.concurrent.TimeoutException;
10+
11+
/**
12+
* rabbit MQ 发送端
13+
*/
14+
public class SenderMQ {
15+
16+
private final static String QUEUE_NAME = "Hello";
17+
18+
public static void main(String[] args) throws IOException, TimeoutException {
19+
// connection是socket连接的抽象,并且为我们管理协议版本协商(protocol version negotiation),
20+
// 认证(authentication )等等事情。这里我们要连接的消息代理在本地,因此我们将host设为“localhost”。
21+
// 如果我们想连接其他机器上的代理,只需要将这里改为特定的主机名或IP地址。
22+
ConnectionFactory factory = new ConnectionFactory();
23+
factory.setHost("localhost");
24+
factory.setPort(5672); //默认端口号
25+
factory.setUsername("guest");//默认用户名
26+
factory.setPassword("guest");//默认密码
27+
Connection connection = factory.newConnection();
28+
Channel channel = connection.createChannel();
29+
// 接下来,我们创建一个channel,绝大部分API方法需要通过调用它来完成。
30+
// 发送之前,我们必须声明消息要发往哪个队列,然后我们可以向队列发一条消息:
31+
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
32+
33+
Scanner input=new Scanner(System.in);
34+
35+
while(input.hasNextLine()){
36+
String message = input.nextLine();
37+
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
38+
System.out.println(" [x] Sent '" + message + "'");
39+
}
40+
41+
System.out.println("send is end!");
42+
43+
channel.close();
44+
connection.close();
45+
}
46+
}

0 commit comments

Comments
 (0)