RabbitMQ
四大核心概念:
生产者:产生数据发送消息的程序
交换机:交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息 推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推 送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定 。
队列:队列是RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存 储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可 以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式 。
消费者:消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费 者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
RabbitMQ中的名词介绍:
Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类 似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务 时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等
Connection:publisher/consumer 和 broker 之间的 TCP 连接
Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接, 如果应用程序支持多线程,通常每个thread创建单独的channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。 Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
Exchange:交换机,message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发 消息到queue中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
Queue:消息最终被送到这里等待 consumer 取走
Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据
文件配置
rabbitmq默认的配置文件位于/etc/rabbitmq/rabbitmq.conf,可以通过环境变量覆盖默认的配置文件路径
#环境变量实现
export RABBITMQ_CONFIG_FILE=/custom/path/rabbitmq
#不加扩展名
#实际读取的是 /custom/path/rabbitmq.conf
#修改rabbitmq-env.conf变量配置文件实现
vim /etc/rabbitmq/rabbitmq-env.conf
NODENAME=rabbit@rabbitmq-server1 #节点名称(必须唯一)
CONFIG_FILE=/custom/path/my-rabbitmq.conf #配置文件路径
MNESIA_BASE=/data/rabbitmq/mnesia #数据存储目录
LOG_BASE=/data/logs/rabbitmq #日志存储路径RabbitMQ 3.7.0+ 开始使用 INI-like 格式(Key = Value),也支持旧版本的 Erlang 格式(advanced.config),但推荐使用新版格式。新版本有些配置项无法配置,只能通过旧版本配置进行配置,两个配置文件同时存在也不会出错,会合并生效。rabbitmq.conf配置优先级高于advanced.config
配置实例
# 查看RabbitMQ当前配置项
rabbitmqctl environment
rabbitmq-diagnostics environment
# 查看某项配置
rabbitmqctl environment | grep channel_max
vim /etc/rabbitmq/rabbitmq.conf
#网络与监听设置
# RabbitMQ监听端口(默认5672,AMQP协议)
listeners.tcp.default = 5672
# 绑定的IP地址(默认监听所有 0.0.0.0)
listeners.tcp.1 = 0.0.0.0:5672
# 启用TLS/SSL
listeners.ssl.default = 5671
ssl_options.cacertfile = /etc/rabbitmq/ca_certificate.pem
ssl_options.certfile = /etc/rabbitmq/server_certificate.pem
ssl_options.keyfile = /etc/rabbitmq/server_key.pem
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
#认证和权限
# 默认用户(管理用)
default_user = guest
default_pass = guest
# 允许远程访问 guest 用户(默认false)
loopback_users.guest = false
#消息队列参数(调整性能)
# 单连接最大并发channel数
channel_max = 2048
# 单消息最大字节数(默认128MB)
frame_max = 131072
# 心跳检测时间(秒)
heartbeat = 60
# 每个队列最大长度(条数)
# 注意:这通常通过策略(policy)而非配置文件设置
#持久化与存储
# 数据存储路径
# 默认:/var/lib/rabbitmq/mnesia/
# 使用磁盘节点时重要
node_data_dir = /var/lib/rabbitmq/mnesia/rabbit@hostname
#日志配置
# 日志文件路径
log.dir = /var/log/rabbitmq
log.file = /var/log/rabbitmq/rabbit.log
# 日志级别(debug, info, warning, error)
log.level = info
#插件配置
# 启用management插件相关配置
management.tcp.port = 15672
management.load_definitions = /etc/rabbitmq/definitions.json日志管理
syslog
#日志服务端配置
vim /etc/rsyslog.conf
module(load="imtcp") # needs to be done just once
input(type="imtcp" port="514")
#定义模板,按来源 IP 分目录存储
$template RemoteLogTemplate, "/var/log/remote/%FROMHOST-IP%/syslog.log"
#所有非本机日志套用该模板
:fromhost-ip, !isequal, "127.0.0.1" ?RemoteLogTemplate
#重启使其生效
systemctl restart rsyslog.service
如果不做模板,定义IP分目录存储,那么客户端和本地的日志都会混合存储在/var/log/messages中
#TCP/UDP
vim /etc/rabbitmq/rabbitmq.conf
log.syslog = true #启用syslog输出
log.syslog.transport = tcp #使用TCP协议进行传输,或者UDP(UDP速度更快,TCP安全)
log.syslog.protocol = rfc5424 #使用 RFC 5424 标准的 Syslog 协议格式(可以不写)
log.syslog.host = my.syslog-server.local #远程日志服务器地址
log.syslog.port = 1514 #远程日志服务器监听端口
#TLS加密
log.syslog = true
log.syslog.transport = tls
log.syslog.protocol = rfc5424
log.syslog.host = my.syslog-server.local
log.syslog.port = 1514
log.syslog.ssl_options.cacertfile = /path/to/ca_certificate.pem
log.syslog.ssl_options.certfile = /path/to/client_certificate.pem
log.syslog.ssl_options.keyfile = /path/to/client_key.pem
# 只发送 warning 及以上级别的日志到 syslog
log.syslog.level = warning
# 格式化为 JSON 便于日志平台处理(如 ELK)
log.syslog.formatter = json
# 自定义在 syslog 中的标识
log.syslog.identity = my_rabbitmq
log.syslog.facility = user
#rabbitmq配置
vim /etc/rabbitmq/rabbitmq.conf
log.syslog = true
log.syslog.transport = tcp
log.syslog.host = 192.168.135.40
log.syslog.port = 514
#重启生效
systemctl restar rabbitmq-server.servicelogrotate
RabbitMQ 官方推荐使用系统的 logrotate 工具进行日志切割。这是最稳定、最通用的方式。
vim /etc/logrotate.d/rabbitmq-server #配置参数不需要记住,用到的时候去查就行了
/var/log/rabbitmq/*.log {
weekly
missingok
rotate 8
compress
delaycompress
notifempty
create 640 rabbitmq adm
postrotate
/usr/bin/rabbitmqctl reopen_logs > /dev/null
endscript
}
#如果使用syslog进行日志管理,就得在syslog服务端对日志进行logrotate轮转管理MQ内置
从 RabbitMQ 3.8 开始,支持内置的日志切割功能,无需依赖 logrotate。
vim /etc/rabbitmq/rabbitmq.conf
# 启用基于大小的切割
log.file.rotation.size = 10485760 # 10MB
# 启用基于时间的切割(每天)
log.file.rotation.date = $D0
# 保留文件数量
log.file.rotation.count = 10
#示例
# 方案 A:仅按文件大小轮转 (当日志达到10MB时切割,保留5个历史文件)
log.file.rotation.size = 10485760
log.file.rotation.count = 5
# 方案 B:同时使用大小和日期轮转 (接近10MB或到每天零点都会切割,保留7个历史文件)
log.file.rotation.size = 10485760
log.file.rotation.date = $D0
log.file.rotation.count = 7仲裁队列
生产仲裁队列的代码
#生产者,仲裁队列
[root@rabbitmq-server1 ~]# vim quorum-Producer.java
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import java.util.HashMap;
import java.util.Map;
class Producer {
// 消息队列名称
private final static String QUEUE_NAME = "kkk";
public void send() throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.135.100");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("zgs666");
factory.setVirtualHost("/admin");
// 创建连接和通道
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
Map<String, Object> queueArgs = new HashMap<>();
queueArgs.put("x-queue-type", "quorum"); // 设置队列类型为仲裁队列
// 声明队列(持久化 + 仲裁队列)
channel.queueDeclare(QUEUE_NAME, true, false, false, queueArgs);
// 发送10000条消息
for (int i = 0; i < 10000; i++) {
String message = "Hello World RabbitMQ count: " + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
}
}
public static void main(String[] args) {
try {
new Producer().send();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
#消费者,仲裁队列
[root@rabbitmq-server1 ~]# vim quorum-Consumer.java
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import java.util.HashMap;
import java.util.Map;
class Consumer {
// 消息队列名称(与生产者保持一致)
private final static String QUEUE_NAME = "kkk";
public void receive() throws IOException, TimeoutException {
// 创建连接工厂(配置与生产者一致)
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.135.132");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123456");
factory.setVirtualHost("/admin");
// 创建连接和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 设置仲裁队列参数(与生产者一致)
Map<String, Object> queueArgs = new HashMap<>();
queueArgs.put("x-queue-type", "quorum"); // 设置队列类型为仲裁队列
// 声明队列(持久化 + 仲裁队列)
channel.queueDeclare(QUEUE_NAME, true, false, false, queueArgs);
System.out.println(" [*] 等待仲裁队列消息。退出请按 CTRL+C");
// 创建消息处理回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] 收到: '" + message + "'");
};
// 开始消费消息(自动确认模式)
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
public static void main(String[] args) {
try {
new Consumer().receive();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
#使用方式
tar -xf jdk-8u171-linux-x64.tar.gz -C /usr/local/
vim /etc/profile
export JAVA_HOME=/usr/local/jdk1.8.0_171
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=$JAVA_HOME/jre/lib/ext:$JAVA_HOME/lib/tools.jar
#下载依赖到当前目录
wget https://repo1.maven.org/maven2/com/rabbitmq/amqp-client/5.16.0/amqp-client-5.16.0.jar
wget https://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.7.36/slf4j-api-1.7.36.jar
编译并运行
#编译
javac -cp .:amqp-client-5.16.0.jar:slf4j-api-1.7.36.jar quorum-Producer.java
#运行(生产消息)
java -cp .:amqp-client-5.16.0.jar:slf4j-api-1.7.36.jar quorum-Producer
#消费消息也一样,编译消费者代码就可以了主节点故障模拟
模拟集群仲裁队列主节点意外故障,重启失败的案例
通过命令查看队列成员信息,确定哪个节点是主节点,也可以在web页面查看
从节点故障模拟
模拟集群仲裁队列从节点意外故障,重启失败的案例
模拟故障
#将其中一个从节点关闭
[root@rabbitmq-server1 ~]# systemctl stop rabbitmq-server.service
#然后重启报错
[root@rabbitmq-server1 ~]# systemctl restart rabbitmq-server.service
Job for rabbitmq-server.service failed because the control process exited with error code.
See "systemctl status rabbitmq-server.service" and "journalctl -xeu rabbitmq-server.service" for details.
#下面时报错信息
[root@rabbitmq-server1 rabbit@rabbitmq-server1]# journalctl -xeu rabbitmq-server.service
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: server_recovery_strategy =>
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: {rabbit_quorum_queue,system_recover,[]},
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: wal_compute_checksums => true,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: wal_min_bin_vheap_size => 2971008,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: wal_data_dir =>
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: "/var/lib/rabbitmq/mnesia/rabbit@rabbitmq-server1/quorum/rabbit@rabbitmq-server1",
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: segment_max_pending => 1024,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: wal_sync_method => datasync,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: wal_garbage_collect => false,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: wal_pre_allocate => false,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: wal_min_heap_size => 233,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: server_min_heap_size => 233,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: default_max_pipeline_count => 4096,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: receive_snapshot_timeout => 30000,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: low_priority_commands_flush_size => 16,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: low_priority_commands_in_memory_size => 16,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: machine_upgrade_strategy => all}]},
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: permanent,false,infinity,supervisor,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: [ra_system_sup]}}}
5月 11 16:47:12 rabbitmq-server1 rabbitmq-server[57955]: syslog_logger_h - failed to process log event #{meta => #{error_logger => #{tag => info_report,type => std_info,report_cb => fun application_controller:>
5月 11 16:47:12 rabbitmq-server1 rabbitmq-server[57955]: syslog_logger_h - removing handler syslog_logger_h from logger
5月 11 16:47:13 rabbitmq-server1 rabbitmq-server[57955]: Kernel pid terminated (application_controller) ("{application_start_failure,rabbit,{{{shutdown,{failed_to_start_child,ra_log_sup,{shutdown,{failed_to_st>
5月 11 16:47:13 rabbitmq-server1 rabbitmq-server[57955]:
5月 11 16:47:13 rabbitmq-server1 rabbitmq-server[57955]: Crash dump is being written to: /var/log/rabbitmq/erl_crash.dump...done
5月 11 16:47:13 rabbitmq-server1 systemd[1]: rabbitmq-server.service: Main process exited, code=exited, status=1/FAILURE
░░ Subject: Unit process exited
░░ Defined-By: systemd
░░ Support: https://wiki.rockylinux.org/rocky/support
░░
░░ An ExecStart= process belonging to unit rabbitmq-server.service has exited.
░░
░░ The process' exit code is 'exited' and its exit status is 1.
5月 11 16:47:13 rabbitmq-server1 systemd[1]: rabbitmq-server.service: Failed with result 'exit-code'.
░░ Subject: Unit failed
░░ Defined-By: systemd
░░ Support: https://wiki.rockylinux.org/rocky/support
░░
░░ The unit rabbitmq-server.service has entered the 'failed' state with result 'exit-code'.
5月 11 16:47:13 rabbitmq-server1 systemd[1]: Failed to start Open source RabbitMQ server.
░░ Subject: rabbitmq-server.service 单元已失败
░░ Defined-By: systemd
░░ Support: https://wiki.rockylinux.org/rocky/support
░░
░░ rabbitmq-server.service 单元已失败。
░░
░░ 结果为“failed”。
5月 11 16:47:13 rabbitmq-server1 systemd[1]: rabbitmq-server.service: Consumed 10.121s CPU time.
░░ Subject: Resources consumed by unit runtime
░░ Defined-By: systemd
░░ Support: https://wiki.rockylinux.org/rocky/support
░░
░░ The unit rabbitmq-server.service completed and consumed the indicated resources.
...skipping...
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: server_recovery_strategy =>
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: {rabbit_quorum_queue,system_recover,[]},
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: wal_compute_checksums => true,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: wal_min_bin_vheap_size => 2971008,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: wal_data_dir =>
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: "/var/lib/rabbitmq/mnesia/rabbit@rabbitmq-server1/quorum/rabbit@rabbitmq-server1",
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: segment_max_pending => 1024,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: wal_sync_method => datasync,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: wal_garbage_collect => false,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: wal_pre_allocate => false,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: wal_min_heap_size => 233,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: server_min_heap_size => 233,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: default_max_pipeline_count => 4096,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: receive_snapshot_timeout => 30000,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: low_priority_commands_flush_size => 16,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: low_priority_commands_in_memory_size => 16,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: machine_upgrade_strategy => all}]},
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: permanent,false,infinity,supervisor,
5月 11 16:47:11 rabbitmq-server1 rabbitmq-server[57955]: [ra_system_sup]}}}
5月 11 16:47:12 rabbitmq-server1 rabbitmq-server[57955]: syslog_logger_h - failed to process log event #{meta => #{error_logger => #{tag => info_report,type => std_info,report_cb => fun application_controller:>
5月 11 16:47:12 rabbitmq-server1 rabbitmq-server[57955]: syslog_logger_h - removing handler syslog_logger_h from logger
5月 11 16:47:13 rabbitmq-server1 rabbitmq-server[57955]: Kernel pid terminated (application_controller) ("{application_start_failure,rabbit,{{{shutdown,{failed_to_start_child,ra_log_sup,{shutdown,{failed_to_st>
5月 11 16:47:13 rabbitmq-server1 rabbitmq-server[57955]:
5月 11 16:47:13 rabbitmq-server1 rabbitmq-server[57955]: Crash dump is being written to: /var/log/rabbitmq/erl_crash.dump...done
5月 11 16:47:13 rabbitmq-server1 systemd[1]: rabbitmq-server.service: Main process exited, code=exited, status=1/FAILURE
░░ Subject: Unit process exited
░░ Defined-By: systemd
░░ Support: https://wiki.rockylinux.org/rocky/support
░░
░░ An ExecStart= process belonging to unit rabbitmq-server.service has exited.
░░
░░ The process' exit code is 'exited' and its exit status is 1.
5月 11 16:47:13 rabbitmq-server1 systemd[1]: rabbitmq-server.service: Failed with result 'exit-code'.
░░ Subject: Unit failed
░░ Defined-By: systemd
░░ Support: https://wiki.rockylinux.org/rocky/support
░░
░░ The unit rabbitmq-server.service has entered the 'failed' state with result 'exit-code'.
5月 11 16:47:13 rabbitmq-server1 systemd[1]: Failed to start Open source RabbitMQ server.
░░ Subject: rabbitmq-server.service 单元已失败
░░ Defined-By: systemd
░░ Support: https://wiki.rockylinux.org/rocky/support
░░
░░ rabbitmq-server.service 单元已失败。
░░
░░ 结果为“failed”。
5月 11 16:47:13 rabbitmq-server1 systemd[1]: rabbitmq-server.service: Consumed 10.121s CPU time.
░░ Subject: Resources consumed by unit runtime
░░ Defined-By: systemd
░░ Support: https://wiki.rockylinux.org/rocky/support
░░
░░ The unit rabbitmq-server.service completed and consumed the indicated resources.
#解决方法
#在健康的节点上将故障节点踢出集群
rabbitmqctl forget_cluster_node rabbit@rabbitmq-server1
#清理server1上的数据目录
rm -rf /var/lib/rabbitmq/mnesia/rabbit@rabbitmq-server1
#重新启动server1服务
systemctl start rabbitmq-server
#将server1重新加入集群
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@rabbitmq-server2
rabbitmqctl start_app
#验证集群状态和队列成员
rabbitmqctl cluster_status
rabbitmq-queues -p /admin quorum_status kkk #此时server1不会加入,需要手动加入
#将server1重新加回仲裁队列的成员列表(在建康节点执行)
rabbitmq-queues add_member -p /admin kkk rabbit@rabbitmq-server1
#等待一两分钟,Raft协议会自动向server1同步全量数据
rabbitmq-queues -p /admin quorum_status kkk