rabbitMQ实战巩固

miloyang
0 评论
/ /
770 阅读
/
16474 字
21 2023-10

如何持久化?如何可靠性?如何保证消息不丢失?本文将探讨。

对rbmq不甚了解的,请移步rbmq小白开局

由于rbmq是异步的,所以就要求可靠性,当然是没有完全100%的可靠性,我们要做的,是在99%后面再增加几个9而已。
所谓MQ的可靠性,就是消息应该至少被消费者处理1次。那么如何确保MQ的可靠性呢?发送失败了,或者宕机了,怎么处理呢?
一般来说,会有根据生产者、MQ服务、消费者三方面,有如下一些问题会导致消息的丢失:

  • 发送消息时丢失

    • 生产者发送消息时,链接MQ失败
    • 生产者发送消息到达MQ后未找到交换机
    • 生产者发送消息到达MQ的交换机后,未找到合适的队列
    • 消息到达MQ后,处理消息的进程发生异常
  • MQ导致消息丢失

    • 消息到达MQ、保存到队列后,还没消费就宕机了
  • 消费者处理消息异常

    • 消息接收后,还没处理就宕机
    • 消息接收后,处理过程中错误

所以,我们应该怎么办呢?客观,我知道你很急,但你先别急,接着看。

生产者

生产者是源头,它如果出问题那整个流程都进行不下去了,所以必须保证。

生产者连接MQ失败

发消息连接MQ的时候,出现了网络波动连接失败,所以我们在获取连接的时候,应当加入重试机制,多次重试。 如下代码:

func connectWithRetry() (*amqp.Connection, error) {
    var conn *amqp.Connection
    var err error
    reconnectAttempts := 0

    for {
            conn, err := amqp.Dial("amqp://global:123456@124.223.47.250:5672/global")
        if err == nil {
            log.Println("Connected to RabbitMQ")
            return conn, nil
        }

        reconnectAttempts++
        if reconnectAttempts > 5 {
            return nil, fmt.Errorf("Max reconnection attempts reached: %v", err)
        }

        log.Printf("Failed to connect to RabbitMQ: %v. Retrying in %v...", err, reconnectDelay)
        time.Sleep(2 * time.Second)
    }
}

在网络不稳的时候,可以重试有效提高成功率,但是重试是阻塞式的,如果对业务性能有要求,不要重试,直接根据连接后的err来做你的事情吧,比如增加通知、告警等等。也要合理配置等待时长和重试次数咯。

生产者的确认

连接顺畅了,一般不会容易出现丢失的情况,如果是没找到交换机,那么发送后的err一定不是nil,但是如果是交换机没找到合适的队列,比如队列被删除了,那err就是nil了,那生产者如何确定消息一定发送成功了呢?
虽然,rbmq给我们提供了一个 confirm 模式以确保消息的可靠传递,如:

err := channel.Confirm(false) // 关闭批量模式
if err != nil {
    // 处理错误
}

select {
case confirmed := <-channel.NotifyPublish(make(chan amqp.Confirmation, 1)):
    if confirmed.Ack {
        // 消息成功发布
    } else {
        // 消息发布失败
    }
}

每一个成功或者失败,都会有回调,一旦入了交换机就会。但是如果交换机如果找不到合适的queue,也是会消息发布成功的,此时可以检查队列中是否有消息积累。这种情况的产生,概率极其低,一般正常的情况下我们都遇不到。

// 获取队列消息数量
    messageCount, err := ch.QueueInspect(queueName)
    if err != nil {
        log.Fatalf("Failed to inspect the queue: %v", err)
    }

数据持久性

消息到达MQ以后,如果MQ不能及时保存,也会导致消息丢失,所以MQ的可靠性也非常重要。
为了提升性能,默认下MQ的数据都是内存存储的临时数据,重启后就会消息,所以如果注重数据安全性,必须配置数据持久化,其中数据持久化包括:

  • 交换机持久化
  • 队列持久化
  • 消息持久化

我们以go代码为例:

//声明创建一个fanout类型的交换机
    err = ch.ExchangeDeclare(
        "direct3", // exchange name
        "direct",  // exchange type
        false,     // 是否持久化交换机?true为持久化
        false,     // auto-deleted
        false,     // internal
        false,     // no-wait
        nil,       // arguments
    )

// 声明一个队列
 q, err := ch.QueueDeclare(
        "queue3", // name
        true,     // durable,是否持久化
        false,    // delete when unused
        false,    // exclusive
        false,    // no-wait
        nil,      // arguments
    )       
        
 // 发送消息的时候指定 
 amqp.Publishing{
            ContentType:  "text/plain",
            Body:         []byte("hello world"),
            DeliveryMode:amqp.Persistent, // 2表示持久化
        })
 
场景1 场景2 场景3 场景3
交换机 true true true false
队列 true false true true
消息 true true false true
结果 消息不丢失 消息丢失,消费者连接报错 消息丢失 消费者可以消费队列消息,生产者报错

为了消息的安全性,如果要持久化,建议全部开启持久化,否则开启交换机、队列持久化。

说明:在开启持久化机制以后,如果同时还开启了生产者确认,那么MQ会在消息持久化以后才发送ACK回执,进一步确保消息的可靠性。 不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化到数据库的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。

LazyQueue

默认情况下,rbmq会将消息保存在内存之中以降低消息收发的延迟,但是如果说消费方性能较低,会造成一些消息的积压忙,或者消费方宕机了、阻塞了等等。
一旦出现消息堆积问题,rbmq的内存占用就会越来越高,直到触发内存预警上线。此时rbmq就会将内存的消息刷到磁盘中,这个行为成为pageout,会耗费一段时间,并且是阻塞队列进程,所以这个过程中rbmq不会再处理新的消息,生产者所有的请求都会被阻塞。
为了解决这个问题,在3.6之后,增加了lazy queue模式,惰性队列,它:

  • 接收到的消息直接存入磁盘而非内存。
  • 消费者要消费消息才会从磁盘中读取并加载到内存,也就是懒加载。
  • 支持数百万条的消息存储。 所以在rbmq的3.12版本之后,这个模式已经成为所有队列的默认格式了,所以官方都推荐升级到3.12版本,如果你实在怕升级出现的版本问题,那么就将你的队列设置为lazyqueue模式吧。如果你的版本还在3.6之前的版本,赶紧升级吧。

配置

  • 在控制台配置,直接在arguments中配置。

lazymoshidetupian

  • go代码
args := make(amqp.Table)
    args["x-queue-mode"] = "lazy"
    q, err := ch.QueueDeclare(
        "queue4", // name
        true,     // durable,是否持久化
        false,    // delete when unused
        false,    // exclusive
        false,    // no-wait
        args,     // arguments
    )

但是,在 RabbitMQ 中,一旦创建了队列,它的持久性和存储模式是不可更改的。因此,如果您已经创建了一个队列,并希望将其更改为 "lazy" 模式,您通常需要采取以下步骤:

  • 1:创建一个新的 "lazy" 模式队列。
  • 2:将现有队列中的消息重新发布到新的 "lazy" 模式队列。
  • 3:删除或停用原始队列。

持久化总结:

  • 首先,要让交换机、队列、消息都配置为持久化。
  • 使用lazyqueue模式进行消息持久化
  • 开启持久化和生产者确认时,rbmq只有在消息持久化完成后才会给生产者返回ACK回执。

消费者的可靠性

好了,我们说了生产者可靠性和MQ的可靠性,前面都可靠了,但是如果消费者不可靠,那前面都白忙活了。那如何要确认消费者的可靠性呢?
一般我们从以下几个方面入手:

消费者确认

当消费者处理消息结束后,应该想rbmq发送一个回执,告诉rbmq自己消息处理状态。状态包含三种:ack/nack/reject。
go代码如下:

go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            time.Sleep(1 * time.Second)

            // 手动确认消息处理成功消费
            //false的时候确认当前一条消息被成功消费,true的时候确认之前所有未确认的消息全部被确认消费
            if err := d.Ack(false); err != nil {
                log.Printf("Failed to acknowledge the message: %v", err)
            }

            // 确认消息处理失败,拒绝该条消息
            // 第一参数表示拒绝多条::true,从当前消息起拒绝所有未确认的消息,false:拒绝当前消息
            // 第二参数表示是否要重新排队:true表示继续排序,重新进入队列,false:消息丢弃
            /*if err := d.Nack(false, false); err != nil {
                log.Printf("Failed to nacknowledge the message: %v", err)
            }*/

            // 如果为true,该消息继续进入其他队列交付给其他的队列,false则直接删除
            /*if err := d.Reject(false); err != nil {
                log.Printf("Failed to rejectnowledge the message: %v", err)
            }*/
        }
    }()

至于ack和nack以及reject的具体含义以及使用,可看代码注释。

当然,我们在消费的时候,也可以设置是否为自动确认发送回执。 ,我们使用自动即可,不用整那么多七七八八的代码了。

// 监听固定队列的名称 q.Name
    msgs, err := ch.Consume(
        "queue4", // queue
        "",       // consumer
        true,     // auto-ack 为false表示手动确认,true表示自动确认。
        false,    // exclusive
        false,    // no-local
        false,    // no-wait
        nil,      // args
    )

限制消费者数目

限制消费者的数目,让每一个消费者都需要一定的计算和内存来处理消息,不会超过当前实例的承受范围,也避免了过载,多大能力穿多大裤衩,一个个事情来做,不要一下子全部把活揽过来,这样你做不完,别人又闲着。

通过设置 Qos 方法来限制每个消费者的消息处理数量。Qos 方法的第一个参数是 prefetch count,表示每个消费者可以一次处理的消息数量。通过设置 Qos,我们可以确保每个消费者一次只会处理指定数量的消息,以达到限制消费者数目的目的。

 // 设置 prefetch count,限制每个消费者一次处理的消息数量
    err = ch.Qos(numConsumers, 0, false)
    if err != nil {
        log.Fatalf("Failed to set QoS: %v", err)
    }

业务幂等

不单单只是这个场景咯,我们在做任何设计的时候,都需要考虑业务的幂等性,同一个业务,执行一次或者多次对业务状态的影响是一致的,比如我们根据一个id查询、删除数据、或者新增数据,不可能我这次查的和下一次查的是不一的数据。

在实际业务场景中,经常出现业务被重复执行的场景,我们MQ也是会重复投递的。那我们如何设计、处理业务的幂等呢? 一般处理的方法是:

  • 唯一消息ID

    这个思路非常简单,也是容易实现的,就是每一条消息生成唯一的id,一起给消费者,消费者收到后处理完自己的业务,把id存起来。每次处理消息之前,把id去库、缓存里面查一下看看是否处理过。

  • 业务状态判断

    根据当前业务,来判断状态,比如删除id,当时这个id都不存在,还删啥。

  • 乐观锁

    如果多个消费者可能同时处理相同的消息,使用乐观锁机制确保只有一个消费者能够成功处理。

主动查询

刚刚说了,mq无论如何都不可能百分百保证,只能后面多几个九,那有一些场景,比如电商中的支付,mq通知不一定发送交易服务上去,那交易服务一定在那干等着吗? 不一定吧。 你可以去主动查呀,开个定时任务,每隔多久时间(10s),就去查询一下,看看所有的订单支付状态,如果发现某个订单已经支付了,则立刻变更状态为已支付。但是要记住幂等哦。

延迟消息

指的是生产者发送消息指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息。比如新用户注册成功了,过一个小时,等你体验了这个软件了,再给你发送一封体验邮件反馈问卷之类的,或者是支付成功之后,半个小时内未付款需要取消订单等等。

之前我们的思路是什么?redis设置时间?定时去数据库里面轮询? 现在有更加方便的思路来了:
就是支付成功后或者注册成功后,我立马就往MQ中发送一条消息,然后等一定的时间,再往队列里面发,这样消费者完全不用关心这条消息的类型。该干嘛干嘛。

在RabbitMQ中实现延迟消息也有两种方案:

  • 死信交换机+TTL
  • 延迟消息插件

死信交换机

当一个队列中的消息满足下列情况之一时,就会成为死信:

  • 消费者使用了主动回复,并且使用了reject或者nack声明消费失败,且将requeue参数设置为false,也就是立马删除了。
  • 消息是一个过期消息,达到了队列或者消息本身设置了过期时间,超时了无人消费。
  • 投递的队列消息堆积满了,最早的消息可能成为死信。

如果队列通过dead-letter-exchange属性指定了一个交换机,那么这个队列中的死信就会投递到这个交换机中,这个交换机就是死信交换机。简称DLX。 而此时加入有队列与死信交换机绑定,则最终死信就会被投递到这个队列中。

如下图: sixinjiaohuanjijiagoutu

  • simple.queue和simple.direct交换机进行绑定了,但是并未绑定消费者。
  • 另外一组交换机为dlx.direct,队列为dlx.queue,绑定了一个consumer。
  • simple.queue设置了x-dead-letter-exchange为dlx.direct.
  • 此时生产一条消息,设置TTL为30s,发送到了simple.direct,自动转为simple.queue。
  • 等到30s后,还是没有人消费消息,则消息自动转到死信交换机dlx.direct上,那么也自动的由consumer来消费了。

死信交换机有什么作用呢?

  1. 收集那些因处理失败而被拒绝的消息
  2. 收集那些因队列满了而被拒绝的消息
  3. 收集因TTL(有效期)到期的消息

如下:go代码实现以上流程,并设置ttl消息。

  • 创建dlx.direect以及dlx.queue
// 创建死信交换机 dlx.direct
    err = ch.ExchangeDeclare(
        "dlx.direct", // exchange name
        "direct",     // exchange type
        false,        // 是否持久化交换机?true为持久化
        false,        // auto-deleted
        false,        // internal
        false,        // no-wait
        nil,          // arguments
    )
    if err != nil {
        log.Fatal(err.Error())
    }

    dq, err := ch.QueueDeclare(
        "dlx.queue", // name
        true,        // durable,是否持久化
        false,       // delete when unused
        false,       // exclusive
        false,       // no-wait
        nil,         // arguments
    )

    err = ch.QueueBind(
        dq.Name,      // queue name
        "",           // routing key
        "dlx.direct", // exchange
        false,
        nil,
    )
    if err != nil {
        log.Fatal(err.Error())
    }
  • 创建simple.direct和simple.queu并指定死信交换机
//声明创建一个direct类型的交换机
    err = ch.ExchangeDeclare(
        "simple.direct", // exchange name
        "direct",        // exchange type
        false,           // 是否持久化交换机?true为持久化
        false,           // auto-deleted
        false,           // internal
        false,           // no-wait
        nil,             // arguments
    )
    if err != nil {
        log.Fatal(err.Error())
    }

    args := make(amqp.Table)
    args["x-queue-mode"] = "lazy"
    args["x-dead-letter-exchange"] = "dlx.direct"
    sq, err := ch.QueueDeclare(
        "simple.queue", // name
        true,           // durable,是否持久化
        false,          // delete when unused
        false,          // exclusive
        false,          // no-wait
        args,           // arguments
    )
    if err != nil {
        log.Fatal(err.Error())
    }

    err = ch.QueueBind(
        sq.Name,         // queue name
        "",              // routing key
        "simple.direct", // exchange
        false,
        nil,
    )
    if err != nil {
        log.Fatal(err.Error())
    }
  • 发送时指定ttl时间
amqp.Publishing{
            ContentType:  "text/plain",
            Body:         []byte("hello world"),
            DeliveryMode: amqp.Persistent,
            Expiration:   "10000", // 10s
        })

看上去挺麻烦的是吧? 是的,人家官方设计这个,主要是让你处理因处理失败而被拒绝的消息,根本就不是让你这么用的,你搁这卡bug呢?
但是,我们可以通过插件来实现。

DelayExchange插件

rabbitmq-delayed-message-exchange

安装插件步骤:

  • 下载插件releases ,根据rabbitmq版本来。
  • 我的rbmq是docker安装的,且使用了挂载目录,可以查看挂载目录:docker volume inspect mq-plugins
[root@VM-4-9-centos ~]# docker volume inspect mq-plugins
[
    {
        "CreatedAt": "2023-10-19T11:41:35+08:00",
        "Driver": "local",
        "Labels": null,
        "Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",
        "Name": "mq-plugins",
        "Options": null,
        "Scope": "local"
    }
]
  • 进入/var/lib/docker/volumes/mq-plugins/_data,上传刚刚下载的插件,并执行安装 docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
[root@VM-4-9-centos _data]# rz
rz waiting to receive.
Starting zmodem transfer.  Press Ctrl+C to cancel.
Transferring rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez...
  100%      49 KB      49 KB/sec    00:00:01       0 Errors  

[root@VM-4-9-centos _data]# docker exec -it rbmq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Enabling plugins on node rabbit@mq:
rabbitmq_delayed_message_exchange
The following plugins have been configured:
  rabbitmq_delayed_message_exchange
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_prometheus
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@mq...
The following plugins have been enabled:
  rabbitmq_delayed_message_exchange

started 1 plugins.
  • 以下为创建延迟队列交换机名称、队列名称且发送延迟消息的代码:
func main() {
    conn, err := amqp.Dial("amqp://global:123456@124.223.47.250:5672/global") // "amqp://username:password@host:port/virtual_host"
    defer conn.Close()
    if err != nil {
        log.Fatal(err.Error())
    }

    ch, err := conn.Channel()
    if err != nil {
        log.Fatal(err.Error())
    }

    // 创建延迟交换机
    err = ch.ExchangeDeclare(
        "delayed-exchange",  // 交换机名称
        "x-delayed-message", // 交换机类型
        true,
        false,
        false,
        false,
        map[string]interface{}{"x-delayed-type": "direct"}, // 参数,声明延迟队列类型
    )
    if err != nil {
        log.Fatalf("Failed to declare a delayed exchange: %v", err)
    }

    // 创建延迟队列
    _, err = ch.QueueDeclare(
        "delayed-queue", // 队列名称
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Failed to declare a delayed queue: %v", err)
    }

    // 将队列绑定到延迟交换机
    err = ch.QueueBind(
        "delayed-queue", // 队列名称
        "",
        "delayed-exchange", // 交换机名称
        false,
        nil,
    )
    if err != nil {
        log.Fatalf("Failed to bind the queue to the delayed exchange: %v", err)
    }

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // 发送延迟消息
    messageBody := []byte("Delayed Message")
    delayTime := 5000 // 延迟时间(毫秒)
    headers := map[string]interface{}{"x-delay": delayTime}

    err = ch.PublishWithContext(ctx,
        "delayed-exchange", // 交换机名称
        "",
        false,
        false,
        amqp.Publishing{
            Headers:     headers,
            ContentType: "text/plain",
            Body:        messageBody,
        },
    )
    if err != nil {
        log.Fatal(err.Error())
    }

}

延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。 因此,不建议设置延迟时间过长的延迟消息

人未眠
工作数十年
脚步未曾歇,学习未曾停
乍回首
路程虽丰富,知识未记录
   借此博客,与之共进步