如何持久化?如何可靠性?如何保证消息不丢失?本文将探讨。
对rbmq不甚了解的,请移步rbmq小白开局
由于rbmq是异步的,所以就要求可靠性,当然是没有完全100%的可靠性,我们要做的,是在99%后面再增加几个9而已。
所谓MQ的可靠性,就是消息应该至少被消费者处理1次。那么如何确保MQ的可靠性呢?发送失败了,或者宕机了,怎么处理呢?
一般来说,会有根据生产者、MQ服务、消费者三方面,有如下一些问题会导致消息的丢失:
发送消息时丢失
- 生产者发送消息时,链接MQ失败
- 生产者发送消息到达MQ后未找到交换机
- 生产者发送消息到达MQ的交换机后,未找到合适的队列
- 消息到达MQ后,处理消息的进程发生异常
MQ导致消息丢失
- 消息到达MQ、保存到队列后,还没消费就宕机了
消费者处理消息异常
- 消息接收后,还没处理就宕机
- 消息接收后,处理过程中错误
所以,我们应该怎么办呢?客观,我知道你很急,但你先别急,接着看。
生产者
生产者是源头,它如果出问题那整个流程都进行不下去了,所以必须保证。
生产者连接MQ失败
发消息连接MQ的时候,出现了网络波动连接失败,所以我们在获取连接的时候,应当加入重试机制,多次重试。 如下代码:
func connectWithRetry() (*amqp.Connection, error) {
var conn *amqp.Connection
var err error
reconnectAttempts := 0
for {
conn, err := amqp.Dial("amqp://global:123456@124.223.47.250:5672/global")
if err == nil {
log.Println("Connected to RabbitMQ")
return conn, nil
}
reconnectAttempts++
if reconnectAttempts > 5 {
return nil, fmt.Errorf("Max reconnection attempts reached: %v", err)
}
log.Printf("Failed to connect to RabbitMQ: %v. Retrying in %v...", err, reconnectDelay)
time.Sleep(2 * time.Second)
}
}
在网络不稳的时候,可以重试有效提高成功率,但是重试是阻塞式的,如果对业务性能有要求,不要重试,直接根据连接后的err来做你的事情吧,比如增加通知、告警等等。也要合理配置等待时长和重试次数咯。
生产者的确认
连接顺畅了,一般不会容易出现丢失的情况,如果是没找到交换机,那么发送后的err一定不是nil,但是如果是交换机没找到合适的队列,比如队列被删除了,那err就是nil了,那生产者如何确定消息一定发送成功了呢?
虽然,rbmq给我们提供了一个 confirm 模式以确保消息的可靠传递,如:
err := channel.Confirm(false) // 关闭批量模式
if err != nil {
// 处理错误
}
select {
case confirmed := <-channel.NotifyPublish(make(chan amqp.Confirmation, 1)):
if confirmed.Ack {
// 消息成功发布
} else {
// 消息发布失败
}
}
每一个成功或者失败,都会有回调,一旦入了交换机就会。但是如果交换机如果找不到合适的queue,也是会消息发布成功的,此时可以检查队列中是否有消息积累。这种情况的产生,概率极其低,一般正常的情况下我们都遇不到。
// 获取队列消息数量
messageCount, err := ch.QueueInspect(queueName)
if err != nil {
log.Fatalf("Failed to inspect the queue: %v", err)
}
数据持久性
消息到达MQ以后,如果MQ不能及时保存,也会导致消息丢失,所以MQ的可靠性也非常重要。
为了提升性能,默认下MQ的数据都是内存存储的临时数据,重启后就会消息,所以如果注重数据安全性,必须配置数据持久化,其中数据持久化包括:
- 交换机持久化
- 队列持久化
- 消息持久化
我们以go代码为例:
//声明创建一个fanout类型的交换机
err = ch.ExchangeDeclare(
"direct3", // exchange name
"direct", // exchange type
false, // 是否持久化交换机?true为持久化
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
// 声明一个队列
q, err := ch.QueueDeclare(
"queue3", // name
true, // durable,是否持久化
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
// 发送消息的时候指定
amqp.Publishing{
ContentType: "text/plain",
Body: []byte("hello world"),
DeliveryMode:amqp.Persistent, // 2表示持久化
})
场景1 | 场景2 | 场景3 | 场景3 | |
---|---|---|---|---|
交换机 | true | true | true | false |
队列 | true | false | true | true |
消息 | true | true | false | true |
结果 | 消息不丢失 | 消息丢失,消费者连接报错 | 消息丢失 | 消费者可以消费队列消息,生产者报错 |
为了消息的安全性,如果要持久化,建议全部开启持久化,否则开启交换机、队列持久化。
说明:在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。 不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。
LazyQueue
默认情况下,rbmq会将消息保存在内存之中以降低消息收发的延迟,但是如果说消费方性能较低,会造成一些消息的积压忙,或者消费方宕机了、阻塞了等等。
一旦出现消息堆积问题,rbmq的内存占用就会越来越高,直到触发内存预警上线。此时rbmq就会将内存的消息刷到磁盘中,这个行为成为pageout,会耗费一段时间,并且是阻塞队列进程,所以这个过程中rbmq不会再处理新的消息,生产者所有的请求都会被阻塞。
为了解决这个问题,在3.6之后,增加了lazy queue模式,惰性队列,它:
- 接收到的消息直接存入磁盘而非内存。
- 消费者要消费消息才会从磁盘中读取并加载到内存,也就是懒加载。
- 支持数百万条的消息存储。 所以在rbmq的3.12版本之后,这个模式已经成为所有队列的默认格式了,所以官方都推荐升级到3.12版本,如果你实在怕升级出现的版本问题,那么就将你的队列设置为lazyqueue模式吧。如果你的版本还在3.6之前的版本,赶紧升级吧。
配置
- 在控制台配置,直接在arguments中配置。
- go代码
args := make(amqp.Table)
args["x-queue-mode"] = "lazy"
q, err := ch.QueueDeclare(
"queue4", // name
true, // durable,是否持久化
false, // delete when unused
false, // exclusive
false, // no-wait
args, // arguments
)
但是,在 RabbitMQ 中,一旦创建了队列,它的持久性和存储模式是不可更改的。因此,如果您已经创建了一个队列,并希望将其更改为 "lazy" 模式,您通常需要采取以下步骤:
- 1:创建一个新的 "lazy" 模式队列。
- 2:将现有队列中的消息重新发布到新的 "lazy" 模式队列。
- 3:删除或停用原始队列。
持久化总结:
- 首先,要让交换机、队列、消息都配置为持久化。
- 使用lazyqueue模式进行消息持久化
- 开启持久化和生产者确认时,rbmq只有在消息持久化完成后才会给生产者返回ACK回执。
消费者的可靠性
好了,我们说了生产者可靠性和MQ的可靠性,前面都可靠了,但是如果消费者不可靠,那前面都白忙活了。那如何要确认消费者的可靠性呢?
一般我们从以下几个方面入手:
消费者确认
当消费者处理消息结束后,应该想rbmq发送一个回执,告诉rbmq自己消息处理状态。状态包含三种:ack/nack/reject。
go代码如下:
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
time.Sleep(1 * time.Second)
// 手动确认消息处理成功消费
//false的时候确认当前一条消息被成功消费,true的时候确认之前所有未确认的消息全部被确认消费
if err := d.Ack(false); err != nil {
log.Printf("Failed to acknowledge the message: %v", err)
}
// 确认消息处理失败,拒绝该条消息
// 第一参数表示拒绝多条::true,从当前消息起拒绝所有未确认的消息,false:拒绝当前消息
// 第二参数表示是否要重新排队:true表示继续排序,重新进入队列,false:消息丢弃
/*if err := d.Nack(false, false); err != nil {
log.Printf("Failed to nacknowledge the message: %v", err)
}*/
// 如果为true,该消息继续进入其他队列交付给其他的队列,false则直接删除
/*if err := d.Reject(false); err != nil {
log.Printf("Failed to rejectnowledge the message: %v", err)
}*/
}
}()
至于ack和nack以及reject的具体含义以及使用,可看代码注释。
当然,我们在消费的时候,也可以设置是否为自动确认发送回执。 ,我们使用自动即可,不用整那么多七七八八的代码了。
// 监听固定队列的名称 q.Name
msgs, err := ch.Consume(
"queue4", // queue
"", // consumer
true, // auto-ack 为false表示手动确认,true表示自动确认。
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
限制消费者数目
限制消费者的数目,让每一个消费者都需要一定的计算和内存来处理消息,不会超过当前实例的承受范围,也避免了过载,多大能力穿多大裤衩,一个个事情来做,不要一下子全部把活揽过来,这样你做不完,别人又闲着。
通过设置 Qos 方法来限制每个消费者的消息处理数量。Qos 方法的第一个参数是 prefetch count,表示每个消费者可以一次处理的消息数量。通过设置 Qos,我们可以确保每个消费者一次只会处理指定数量的消息,以达到限制消费者数目的目的。
// 设置 prefetch count,限制每个消费者一次处理的消息数量
err = ch.Qos(numConsumers, 0, false)
if err != nil {
log.Fatalf("Failed to set QoS: %v", err)
}
业务幂等
不单单只是这个场景咯,我们在做任何设计的时候,都需要考虑业务的幂等性,同一个业务,执行一次或者多次对业务状态的影响是一致的,比如我们根据一个id查询、删除数据、或者新增数据,不可能我这次查的和下一次查的是不一的数据。
在实际业务场景中,经常出现业务被重复执行的场景,我们MQ也是会重复投递的。那我们如何设计、处理业务的幂等呢? 一般处理的方法是:
唯一消息ID
这个思路非常简单,也是容易实现的,就是每一条消息生成唯一的id,一起给消费者,消费者收到后处理完自己的业务,把id存起来。每次处理消息之前,把id去库、缓存里面查一下看看是否处理过。
业务状态判断
根据当前业务,来判断状态,比如删除id,当时这个id都不存在,还删啥。
乐观锁
如果多个消费者可能同时处理相同的消息,使用乐观锁机制确保只有一个消费者能够成功处理。
主动查询
刚刚说了,mq无论如何都不可能百分百保证,只能后面多几个九,那有一些场景,比如电商中的支付,mq通知不一定发送交易服务上去,那交易服务一定在那干等着吗? 不一定吧。 你可以去主动查呀,开个定时任务,每隔多久时间(10s),就去查询一下,看看所有的订单支付状态,如果发现某个订单已经支付了,则立刻变更状态为已支付。但是要记住幂等哦。
延迟消息
指的是生产者发送消息指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。比如新用户注册成功了,过一个小时,等你体验了这个软件了,再给你发送一封体验邮件反馈问卷之类的,或者是支付成功之后,半个小时内未付款需要取消订单等等。
之前我们的思路是什么?redis设置时间?定时去数据库里面轮询? 现在有更加方便的思路来了:
就是支付成功后或者注册成功后,我立马就往MQ中发送一条消息,然后等一定的时间,再往队列里面发,这样消费者完全不用关心这条消息的类型。该干嘛干嘛。
在RabbitMQ中实现延迟消息也有两种方案:
- 死信交换机+TTL
- 延迟消息插件
死信交换机
当一个队列中的消息满足下列情况之一时,就会成为死信:
- 消费者使用了主动回复,并且使用了reject或者nack声明消费失败,且将requeue参数设置为false,也就是立马删除了。
- 消息是一个过期消息,达到了队列或者消息本身设置了过期时间,超时了无人消费。
- 投递的队列消息堆积满了,最早的消息可能成为死信。
如果队列通过dead-letter-exchange属性指定了一个交换机,那么这个队列中的死信就会投递到这个交换机中,这个交换机就是死信交换机。简称DLX。 而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。
如下图:
- simple.queue和simple.direct交换机进行绑定了,但是并未绑定消费者。
- 另外一组交换机为dlx.direct,队列为dlx.queue,绑定了一个consumer。
- simple.queue设置了x-dead-letter-exchange为dlx.direct.
- 此时生产一条消息,设置TTL为30s,发送到了simple.direct,自动转为simple.queue。
- 等到30s后,还是没有人消费消息,则消息自动转到死信交换机dlx.direct上,那么也自动的由consumer来消费了。
死信交换机有什么作用呢?
- 收集那些因处理失败而被拒绝的消息
- 收集那些因队列满了而被拒绝的消息
- 收集因TTL(有效期)到期的消息
如下:go代码实现以上流程,并设置ttl消息。
- 创建dlx.direect以及dlx.queue
// 创建死信交换机 dlx.direct
err = ch.ExchangeDeclare(
"dlx.direct", // exchange name
"direct", // exchange type
false, // 是否持久化交换机?true为持久化
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatal(err.Error())
}
dq, err := ch.QueueDeclare(
"dlx.queue", // name
true, // durable,是否持久化
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
err = ch.QueueBind(
dq.Name, // queue name
"", // routing key
"dlx.direct", // exchange
false,
nil,
)
if err != nil {
log.Fatal(err.Error())
}
- 创建simple.direct和simple.queu并指定死信交换机
//声明创建一个direct类型的交换机
err = ch.ExchangeDeclare(
"simple.direct", // exchange name
"direct", // exchange type
false, // 是否持久化交换机?true为持久化
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatal(err.Error())
}
args := make(amqp.Table)
args["x-queue-mode"] = "lazy"
args["x-dead-letter-exchange"] = "dlx.direct"
sq, err := ch.QueueDeclare(
"simple.queue", // name
true, // durable,是否持久化
false, // delete when unused
false, // exclusive
false, // no-wait
args, // arguments
)
if err != nil {
log.Fatal(err.Error())
}
err = ch.QueueBind(
sq.Name, // queue name
"", // routing key
"simple.direct", // exchange
false,
nil,
)
if err != nil {
log.Fatal(err.Error())
}
- 发送时指定ttl时间
amqp.Publishing{
ContentType: "text/plain",
Body: []byte("hello world"),
DeliveryMode: amqp.Persistent,
Expiration: "10000", // 10s
})
看上去挺麻烦的是吧? 是的,人家官方设计这个,主要是让你处理因处理失败而被拒绝的消息,根本就不是让你这么用的,你搁这卡bug呢?
但是,我们可以通过插件来实现。
DelayExchange插件
rabbitmq-delayed-message-exchange
安装插件步骤:
- 下载插件releases ,根据rabbitmq版本来。
- 我的rbmq是docker安装的,且使用了挂载目录,可以查看挂载目录:
docker volume inspect mq-plugins
[root@VM-4-9-centos ~]# docker volume inspect mq-plugins
[
{
"CreatedAt": "2023-10-19T11:41:35+08:00",
"Driver": "local",
"Labels": null,
"Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",
"Name": "mq-plugins",
"Options": null,
"Scope": "local"
}
]
- 进入/var/lib/docker/volumes/mq-plugins/_data,上传刚刚下载的插件,并执行安装
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
[root@VM-4-9-centos _data]# rz
rz waiting to receive.
Starting zmodem transfer. Press Ctrl+C to cancel.
Transferring rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez...
100% 49 KB 49 KB/sec 00:00:01 0 Errors
[root@VM-4-9-centos _data]# docker exec -it rbmq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@mq:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
rabbitmq_delayed_message_exchange
rabbitmq_management
rabbitmq_management_agent
rabbitmq_prometheus
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@mq...
The following plugins have been enabled:
rabbitmq_delayed_message_exchange
started 1 plugins.
- 以下为创建延迟队列交换机名称、队列名称且发送延迟消息的代码:
func main() {
conn, err := amqp.Dial("amqp://global:123456@124.223.47.250:5672/global") // "amqp://username:password@host:port/virtual_host"
defer conn.Close()
if err != nil {
log.Fatal(err.Error())
}
ch, err := conn.Channel()
if err != nil {
log.Fatal(err.Error())
}
// 创建延迟交换机
err = ch.ExchangeDeclare(
"delayed-exchange", // 交换机名称
"x-delayed-message", // 交换机类型
true,
false,
false,
false,
map[string]interface{}{"x-delayed-type": "direct"}, // 参数,声明延迟队列类型
)
if err != nil {
log.Fatalf("Failed to declare a delayed exchange: %v", err)
}
// 创建延迟队列
_, err = ch.QueueDeclare(
"delayed-queue", // 队列名称
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatalf("Failed to declare a delayed queue: %v", err)
}
// 将队列绑定到延迟交换机
err = ch.QueueBind(
"delayed-queue", // 队列名称
"",
"delayed-exchange", // 交换机名称
false,
nil,
)
if err != nil {
log.Fatalf("Failed to bind the queue to the delayed exchange: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 发送延迟消息
messageBody := []byte("Delayed Message")
delayTime := 5000 // 延迟时间(毫秒)
headers := map[string]interface{}{"x-delay": delayTime}
err = ch.PublishWithContext(ctx,
"delayed-exchange", // 交换机名称
"",
false,
false,
amqp.Publishing{
Headers: headers,
ContentType: "text/plain",
Body: messageBody,
},
)
if err != nil {
log.Fatal(err.Error())
}
}
延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。 因此,不建议设置延迟时间过长的延迟消息。