Table of Contents generated with DocToc (opens new window)
- Hello World----简单模式
- Work queues----工作队列,任务队列
- Publish/Subscribe----发布订阅模式
- Routing----路由模式
- Topics----主题模式
- Publisher Confirms----发布确认模式
# Hello World----简单模式
# 依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.biu</groupId>
<artifactId>rabbitmq-hello</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.2</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.11.0</version>
</dependency>
</dependencies>
</project>
# 消息生产者
package com.atguigu.rabbiitmq.one;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @Author longbiu
* @Class_name Producer
* @Description 生产者发消息
* @Date 2022/4/22 01:53
**/
public class Producer {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
// 创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip_address");
// AMQP的端口是5672,需要处理一下防火墙
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("password");
// 启动报错释放,连接超时的问题
factory.setHandshakeTimeout(60000);
try (
Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
/**
* 生成一个队列
* 1. 队列名称
* 2. 队列中的消息是否持久化,默认把消息存储在内存中
* 3. 该队列是否只供一个消费者进行消费,是否进行共享
* 4. 是否自动删除
* 5. 其他参数
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "hello world";
/**
* 发送一个消息
* 1. 发送至哪个交换机
* 2. 路由的key
* 3. 其他的参数消息
* 4. 发送消息的消息体
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" 消息发送完毕");
}
}
}
# 消息的消费者
package com.atguigu.rabbiitmq.one;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Class_name Consumer
* @Description 消费者接收消息
* @Author longbiu
* @Date 2022/4/22 11:46
**/
public class Consumer {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip_address");
// AMQP的端口是5672,需要处理一下防火墙
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("password");
// 启动报错释放,连接超时的问题
factory.setHandshakeTimeout(60000);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println(" 等待接收消息……");
// 推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback = (consumerTag, deliivery) -> {
String message = new String(deliivery.getBody());
System.out.println(message);
};
// 取消费的一个回调接口,比如消费的时候,队列被删除了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断");
};
/**
* 消费者消费消息
* 1. 消费哪个队列
* 2. 消费成功后,是否要自动应答
* 3. 消费未成功后的回调
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
# Work queues----工作队列,任务队列
主要思想是避免立即执行资源密集型任务。
把任务封装为消息,并将其发送到队列。
一个消息只能被处理一次,不可以处理多次
# 抽取工具类
package com.atguigu.rabbitmq.rabbitmq_02.utils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @Class_name RabbitMqUtils
* @Description 工具类
* @Author longbiu
* @Date 2022/4/22 14:09
**/
public class RabbitMqUtils {
public static Channel getChannel() throws Exception {
// 创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("ip_address");
// AMQP的端口是5672,需要处理一下防火墙
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("password");
// 启动报错释放,连接超时的问题
factory.setHandshakeTimeout(60000);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
# 工作线程代码
package com.atguigu.rabbitmq.rabbitmq_02;
import com.atguigu.rabbitmq.rabbitmq_02.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
/**
* @Class_name Work01
* @Description 工作线程
* @Author longbiu
* @Date 2022/4/22 14:25
**/
public class Work01 {
// 队列名称
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println("接收到的消息:" + new String(message.getBody()));
};
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println(consumerTag + "消息被消费者取消消费接口回调逻辑");
};
// 消息的接收
System.out.println("C3正在等待接收消息……");
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
# 生产者代码
package com.atguigu.rabbitmq.rabbitmq_02;
import com.atguigu.rabbitmq.rabbitmq_02.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.util.Scanner;
/**
* @Class_name Task01
* @Description 生产者,启动一个发送线程
* @Author longbiu
* @Date 2022/4/22 19:01
**/
public class Task01 {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("发送消息完成" + message);
}
}
}
# 消息应答
为了保证消息在发送过程中不丢失,rabbitmq引入消息应答机制
消费者在收到消息并且处理该消息之后,告诉rabbitmq,它已经处理了,rabbitmq可以把该消息删除了。
# 自动应答
没有对传递的消息进行限制,可能会使得消费者接收过多的,来不及处理的消息,导致消息积压,使得内存耗尽,最终,这些消费者线程被操作系统杀死。
所以,自动应答模式不太靠谱,它需要消费者能高效稳定的消费。
# 消息应答的方法
- Channel.basicAck ---- 用于肯定确认,rabbitmq已经知道该消息成功处理,可以丢弃
- Channel.basicNack ---- 用于否定确认
- Channel.basicReject ---- 用于拒绝,直接丢弃该消息
# Multiple的使用
channel.basicAck(deliveryTag,true);
channel上的,该消息之前入队但还没处理的消息,都会被确认收到消息应答(true)
可以减少网络拥堵,减少网络流量
# 消息重新入队
第一个消费者突然中断,此时消息自动去空闲的消费者那里消费,达到了消息不丢失的目的。