rabbitMQ初识入门

miloyang
0 评论
/ /
666 阅读
/
29271 字
19 2023-10

如何异步通信,如何解耦系统组件,客官,你不来看看?

既然说异步,我们先来说同步,再引入异步。

背景

先看看我们目前的处理方式,有一个支付的服务,目前有三个功能,先减余额,再更新交易流水,最后更新订单状态。如下图:
yuque_mind.jpeg 这样看的话,是没有问题,代码从上往下,同步执行。

但是随着产品不断迭代,功能越来越多,在支付完成后需要短信通知用户、更新用户信息等等,不断的功能叠加进来。每个业务代码执行都需要一定的时间,就单单一个支付服务,累计时长不断增长,大家都在等着你支付完成,你卡着全部卡住了。
zhifutongbutupian.jpeg

这样肯定会造成问题:

  • 每次新增一个业务,是不是在支付服务里面不断改代码,也不符合开闭原则(对扩展开放,对修改关闭)
  • 如果扣款成功,但是更新积分失败,是不是整个业务都要回滚?级联失败。
  • 大并发的情况下,一个支付就占用了300ms,性能太差了。

所以,异步来了。

异步调用

还是刚刚那个支付服务,我们支付服务最主要的是什么?肯定是扣减余额和更新支付流水,这个都失败了还玩个锤子,至于其他的更新订单状态、积分、短信通知等等,都是可以解耦出去的。那么可以把这些,变为异步的,我只是通知他就可以了。

yibuzhifufuwujiagou.jpeg

主要是这个消息通知,如何定义?

消息通知

异步调用方式其实就是基于消息通知的方式,一般包含三个角色:

  • 消息发送者,投递消息的人,就是原来的调用方。
  • 消息Broker,管理、暂存、转发消息,可以理解为微信服务器。
  • 消息接收者,接收和处理消息的人,就是原来服务提供方。
再来个通俗易懂的,你收快递,原来是快递员送到你手上,你不在他就在那等着,这是同步。
现在是快递员把快递放到快递柜,然后发短信给你告诉你唯一识别码,你方便了再去取。
所以快递员和你就解耦了,他可以去干其他的活,你什么时候去取,取决于你。 
这里:消息发送者就是快递员,消息broker就是快递柜,你,就是消息接收者。 

所以,上述支付服务例子,一个支付服务耗时100ms,剩下的就是发送消息,具体什么时候执行,看各个业务方。明显有如下优势:

  • 耦合度更低,后续修改交易服务或者积分服务,不需要来支付服务修改代码。
  • 性能更好,节省了大量时间
  • 业务拓展好,后续再增加其他服务,比如支付后可以抽奖服务等等,直接监听消息就好了。
  • 故障隔离,如果更新积分失败,不会影响核心的支付服务。

当然,异步也有其他的问题:

  • 完全依赖于broker的可靠性、安全性和性能,比如快递柜丢了,你的快递也丢了,或者发短信你没收到,你快递也丢了。
  • 不能立即得到调用结果,时效性差,比如查询业务,就不能使用了。
  • 架构复杂,后续维护和调试麻烦。

异步中间件选型

消息Broker,目前常见的实现方案就是消息队列(MessageQueue),简称为MQ. 常见有如下几种:

RabbitMQ ActiveMQ RocketMQ Kafka
公司/社区 Rabbit(社区活跃) Apache 阿里 Apache
开发语言 Erlang(面向并发) Java Java Scala&Java
协议支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义协议 自定义协议
可用性(集群) 一般
单机吞吐量 一般(s/十万) 非常高(s/百万)
消息延迟(接收速度) 微秒级 毫秒级 毫秒级 毫秒以内
消息可靠性(消息确认) 一般 一般

据统计,目前国内消息队列使用较多的是RabbitMQ,再加上其他各方面都比较均横,所以,我们学习 RabbitMQ。 kafka在日志系统中使用多点。

RabbitMQ

rabbitMQ :logo是一个兔子,动如脱兔,跑的很快。

安装

通过docker来。

  • 拉取镜像: docker pull rabbitmq:3.8-management
  • 运行镜像
 docker run \
 -e RABBITMQ_DEFAULT_USER=milo \
 -e RABBITMQ_DEFAULT_PASS=123456 \
 -v mq-plugins:/plugins \
 --name rbmq \
 --hostname mq \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3.8-management

其中可以看到在安装命令中有两个映射的端口:

  • 15672:RabbitMQ提供的管理控制台的端口
  • 5672:RabbitMQ的消息发送处理接口 安装完成。
    开放防火墙后,可以直接访问15672端口来看。 用户名和密码就是刚刚设置的milo和1233456
    mqkeshihuajiemian

一脸懵逼没关系,我们这个步骤,只是安装而已。看到这个界面,说明安装成功。已经学习了大半了。

基本介绍

我们先看一个架构图

rbmqjiagoutushiyijia

  • publisher:生产者,也就是发送消息的一方。
  • consumer:消费者者,消息接收的一方,它和队列(queue)绑定
  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • exchage:交换机,负责路由信息,生产者发送的消息由交换机决定投递到哪个队列。可以路由一个队列,也可以路由所有队列。没有存储消息的能力。

一句话: 生产者发送消息给交换机,交换机发送给队列,消费者从队列里面拿到消息。

交换机和队列,就是一套rbmq的服务。那有一个问题了,公司中很多业务,不可能每个业务都搭建一个mq吧,各个业务共享一个mq的话,如何做隔离呢?
类似于数据库和redis的database,rbmq也有隔离机制,叫做virtualHost,虚拟主机。一个mq有不同的virtualHost,交换机和队列存在自己所属的virtualHost之中。所以就相互隔离了。
理论上是需要给每一个项目创建一个用户,每一个用户绑定一个virturalHost,在virturalHost中,绑定交换机和队列之间的关系。

通过可视化页面配置

需求:我们通过可视化管理工具 rabbitMQ management,来配置一个账号,创建virtualHost,创建queue,并且绑定在exchange中,然后往在exchange中发送消息,在queue中查看。

  • 在Admin tabs的Add a user label中,填写对应的用户信息。点击Add user后,然后点击右上角的 log out登出,通过新账号登录。

    rbmqchuangjainyonghu

  • 新账号登录上来后,在Admin tabs的右侧,点击virtual hosts,然后在add a new virtual host中填写信息,并 add。

    rbmqchuangjianvh

  • 在queues tab中,点击 add a new queue,目前只是选择 virtual host,以及数据queue的name后,点击 add queue即可

    rbmqchuangjianqueueusk
    此时,queue有了,exchanges也有了(有默认的),但是queue未绑定上exchages。

  • 在exchanges中,绑定队列即可。

    rbmqbangdingqueueus

  • 测试信息,发送消息到exchanges中,exchanges自动发送到绑定的queue中。 rbmqfasongxiaoxi

    在队列中的g_queue_1中,有1条消息ready。

rbmqfasongxiaxoiak

快速入门

通过官方的方式来把。RabbitMQ Tutorials
快速入门,是生产者直接发送到queue中,消费者直接从queue中获取消息,没有交换机。

rbmqhelloworlddemo

最近的语言栈是Go,所以就以Go来吧。

  • 先获取package
    go get github.com/rabbitmq/amqp091-go
  • public的代码
    publish.go
func main() {
    conn, err := amqp.Dial("amqp://global:123456@124.223.47.250:5672/global") // "amqp://username:password@host:port/virtual_host"
    defer conn.Close()
    if err != nil {
        log.Fatal(err.Error())
    }

    ch, err := conn.Channel()
    if err != nil {
        log.Fatal(err.Error())
    }

    q, err := ch.QueueDeclare(
        "g_queue_1", // queue name
        true,        // durable
        false,       // delete when unused
        false,       // exclusive
        false,       // no-wait
        nil,         // arguments
    )
    if err != nil {
        log.Fatal(err.Error())
    }

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    payload := "hello,i am go client"

    err = ch.PublishWithContext(ctx,
        "",     // exchange
        q.Name, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(payload),
        })
    if err != nil {
        log.Fatal(err.Error())
    }
    log.Printf(" [x] Sent %s\n", payload)
}

运行之后,再次打开可视化工具,可以发现在queue: g_queue_1中的ready变为了2.

  • 我们来消费
    consume.go
func main() {
    conn, err := amqp.Dial("amqp://global:123456@124.223.47.250:5672/global")
    defer conn.Close()
    if err != nil {
        log.Fatal(err.Error())
    }

    ch, err := conn.Channel()
    defer ch.Close()
    if err != nil {
        log.Fatal(err.Error())
    }

    q, err := ch.QueueDeclare(
        "g_queue_1", // name
        true,        // durable
        false,       // delete when unused
        false,       // exclusive
        false,       // no-wait
        nil,         // arguments
    )
    if err != nil {
        log.Fatal(err.Error())
    }

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    if err != nil {
        log.Fatal(err.Error())
    }

    var forever chan struct{}

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
        }
    }()

    <-forever
}
2023/10/19 17:06:27 Received a message: hello,i am go client

好了,快速入门完成。继续升级。
当队列中的消息越来越多,已经产生消息累积了,单个消费者能力有限,那如何解决呢?

WorkQueues模型

如上问题,work queue,任务模型,简单来说就是让多个消费者绑定到一个队列上面去,共同消费队列中的消息,来减少消息累积workqueueduijiedemo

  • 我们改下上面publish.go中的发送代码,让他发送10条消息到queue中。
    for i := 0; i < 10; i++ {
          payload := fmt.Sprintf("hello,i am message:%d", i)
          err = ch.PublishWithContext(ctx,
              "",     // exchange
              q.Name, // routing key
              false,  // mandatory
              false,  // immediate
              amqp.Publishing{
                  ContentType: "text/plain",
                  Body:        []byte(payload),
              })
          if err != nil {
              log.Fatal(err.Error())
          }
      }
    
  • 消费端不用动,直接起两个刚刚的consume.go即可,我们会发现:
PS E:\workspaces\goland\study_demo\rbmq_demo\quick_demo\consume> go run .\consume.go
2023/10/19 17:47:57 Received a message: hello,i am message:1
2023/10/19 17:47:57 Received a message: hello,i am message:3 
2023/10/19 17:47:57 Received a message: hello,i am message:5 
2023/10/19 17:47:57 Received a message: hello,i am message:7 
2023/10/19 17:47:57 Received a message: hello,i am message:9 

2023/10/19 17:47:57 Received a message: hello,i am message:0
2023/10/19 17:47:57 Received a message: hello,i am message:2 
2023/10/19 17:47:57 Received a message: hello,i am message:4 
2023/10/19 17:47:57 Received a message: hello,i am message:6 
2023/10/19 17:47:57 Received a message: hello,i am message:8 

这两个消费者,不会重复消费,你一个我一个的。消息是平均分配给每个消费者。
这样显然是有问题的。,因为发现的是你一个我一个,A的服务器性能好,可能一下子就消费完5个了,但是B的服务性能差点,可能要过一会才消费完5个。当然,可以修改代码:
则需要修改消费的代码:只需把自动确认修改为手动确认,即可。

msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack //禁止自动确认,改为手动确认
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
        
        go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            time.Sleep(1 * time.Second)
            d.Ack(false) // 增加确认,也就是处理完了
        }
    }()

以上都是简单测试,都没有使用到交换机,甚至都谈不上使用了异步。

交换机

之前的两个测试案例中,都没有交换机,生产者直接发送消息到队列。而一旦引入交换机,消息发送的模式会有很大变化:

jiaohuanjijiagoutus
可以看到,在订阅模型里面,多了一个exchanges角色,而且过程也变了。

  • publisher:生成者,不是直接发消息到queue了,而是发到exchanges。
  • exchange:交换机,一方面接收生产者发送的消息,另一方面,处理消息,比如把消息递交给某个特别的队列、所有队列、消息丢弃等等,如何操作取决于交换机的类型。
  • queue:消息队列和之前一样,接收消息和缓存消息,但是队列一定要和交换机绑定了。
  • consumer:消费者,和之前一样,订阅队列,没有变化。

刚刚说了下,交换机多种类型,一般有以下:

  • fanout:广播,将消息交给所有绑定到交换机的队列,也就是使用可视化操作的时候用的那个交换机。
  • Direct:订阅,基于routingKey,路由,发送给订阅了的消息队列。
  • Topic:通配符订阅,于direct类似,只不过routingKey可以使用通配符。
  • Headers:头匹配,基于MQ的消息头匹配,用的较少。

所以交换机的作用是:

  • 接收publisher发送的消息
  • 将消息按照规则路由到之前绑定的队列

Fanout交换机

它会将发到这个exchange的消息广播到关注此exchange的所有接收端上。

fanoutranbmqdasdf

也就是客户端向接收queue1,就绑定在queue1上面,想接收queue2,就绑定在queue2上面。直接来个案例吧。

实际工作中,交换机和队列是作为资源,由运维管理员创建和绑定的。,刚刚已经演示如何通过可视化工具创建了,现在通过消费者的代码来创建、绑定交换机和队列。
案例:

  • 消费者1创建交换机为demo.fanout和队列fanout.queue1,并且绑定和监听。
  • 消费者2创建交换机为demo.fanout和队列fanout.queue2,并且绑定和监听。重复创建交换机不会报错。
  • 生产者向demo.fanout中发送消息。
  • 查看消费者1和消费者2是否都收到相同消息。

整体代码:

  • 生产者,之前的代码不用动,只是修改下,原本向queue中发送,现在发到exchanges中。
    err = ch.PublishWithContext(ctx,
            "demo.fanout", // exchange
            "",            // routing key
            false,         // mandatory
            false,         // immediate
            amqp.Publishing{
                ContentType: "text/plain",
                Body:        []byte(payload),
            })
  • consumer1,创建exchanges:demo.fanout、创建queue:fanout.queue1,并且绑定。
func main() {
    conn, err := amqp.Dial("amqp://global:123456@124.223.47.250:5672/global")
    defer conn.Close()
    if err != nil {
        log.Fatal(err.Error())
    }

    ch, err := conn.Channel()
    defer ch.Close()
    if err != nil {
        log.Fatal(err.Error())
    }

    //声明创建一个fanout类型,名称为demo.fanout的交换机。
    err = ch.ExchangeDeclare(
        "demo.fanout", // exchange name
        "fanout",      // exchange type
        true,          // durable
        false,         // auto-deleted
        false,         // internal
        false,         // no-wait
        nil,           // arguments
    )
    if err != nil {
        log.Fatal(err.Error())
    }

    //创建一个名称为fanout.queue1的队列
    q, err := ch.QueueDeclare(
        "fanout.queue1", // name
        true,            // durable
        false,           // delete when unused
        false,           // exclusive
        false,           // no-wait
        nil,             // arguments
    )
    if err != nil {
        log.Fatal(err.Error())
    }

    // 并且将队列绑定到demo.fanout的交换机上面
    err = ch.QueueBind(
        q.Name,        // queue name
        "",            // routing key
        "demo.fanout", // exchange
        false,
        nil,
    )
    if err != nil {
        log.Fatal(err.Error())
    }

    // 监听固定队列的名称 q.Name
    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    if err != nil {
        log.Fatal(err.Error())
    }

    var forever chan struct{}

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            time.Sleep(1 * time.Second)
        }
    }()

    <-forever
}
  • consumer2,创建exchanges:demo.fanout、创建queue:fanout.queue2,并且绑定。代码如上,只是把fanout.queue1改成2即可。

一定要先起来consumer1和consumer2,再生产消息。

PS E:\workspaces\goland\study_demo\rbmq_demo\fanout\consume1> go run .\consume1.go
2023/10/19 22:28:06 Received a message: hello,i am message:0
2023/10/19 22:28:07 Received a message: hello,i am message:1
2023/10/19 22:28:08 Received a message: hello,i am message:2
2023/10/19 22:28:09 Received a message: hello,i am message:3
2023/10/19 22:28:10 Received a message: hello,i am message:4

PS E:\workspaces\goland\study_demo\rbmq_demo\fanout\consume2> go run .\consume2.go
2023/10/19 22:28:06 Received a message: hello,i am message:0
2023/10/19 22:28:07 Received a message: hello,i am message:1
2023/10/19 22:28:08 Received a message: hello,i am message:2
2023/10/19 22:28:09 Received a message: hello,i am message:3
2023/10/19 22:28:10 Received a message: hello,i am message:4

这样看上去,所有consumer都收到了消息。完成了我们的需求。
但是在实际的开发中,肯定是不同的人收到不同的消息,比如支付成功的消息,支付相关业务才关心,跟聊天业务没有关系,那聊天业务就不需要收到这个消息了。
所以,就需要用到Direct Exchange了

Direct交换机

Direct Exchange 会将接收到的消息根据规则路由到指定的 queue,因此称为 定向 路由。
direct-exchangeshlitu

如上图:

  • 交换机和queue绑定,就需要指定一个routingKey,路由key了,如orange给了Q1,black、green给了Q2。
  • 生产者向Exchange发送消息的时候,必须要指定routingKey。
  • Exchange不再把消息交给每一个绑定个队列,而是根据消息的routingKey进行判断,只有队列的和消息的routingKey完全一致,才会接收到消息。
  • 两个队列可以绑定相同的key,这样也可以实现Fanout交换机功能。

来个案例

  • consumer1创建交换机为demo.direct和队列direct.queue1,并且绑定key为red、orange和监听。
  • consumer2创建交换机为demo.direct和队列direct.queue2,并且绑定key为red、green和监听。
  • 生产者向demo.direct中发送消息,并指定key为、red、orange、green。
  • 查看消费者1和消费者2是否都收到指定key的消息。

  • 生产者,之前的代码不用动,只是修改下,发到exchanges中增加相关key。
for i := 0; i < 10; i++ {
        re := i % 3
        rKey := ""
        if re == 0 {
            rKey = "red"
        } else if re == 1 {
            rKey = "orange"
        } else {
            rKey = "green"
        }
        payload := fmt.Sprintf("hello,i am message key:%s", rKey)

        // 往这个交换机上面发信息
        err = ch.PublishWithContext(ctx,
            "demo.direct", // exchange
            rKey,          // routing key
            false,         // mandatory
            false,         // immediate
            amqp.Publishing{
                ContentType: "text/plain",
                Body:        []byte(payload),
            })
        if err != nil {
            log.Fatal(err.Error())
        }
    }
  • consumer1,创建exchanges:demo.direct、创建queue:fanout.queue1,并且绑定key为red、orange。
func main() {
    conn, err := amqp.Dial("amqp://global:123456@124.223.47.250:5672/global")
    defer conn.Close()
    if err != nil {
        log.Fatal(err.Error())
    }

    ch, err := conn.Channel()
    defer ch.Close()
    if err != nil {
        log.Fatal(err.Error())
    }

    //声明创建一个fanout类型,名称为demo.fanout的交换机。
    err = ch.ExchangeDeclare(
        "demo.direct", // exchange name
        "direct",      // exchange type
        true,          // durable
        false,         // auto-deleted
        false,         // internal
        false,         // no-wait
        nil,           // arguments
    )
    if err != nil {
        log.Fatal(err.Error())
    }

    //创建一个名称为fanout.queue1的队列
    q, err := ch.QueueDeclare(
        "direct.queue1", // name
        //"direct.queue2", // name
        true,  // durable
        false, // delete when unused
        false, // exclusive
        false, // no-wait
        nil,   // arguments
    )
    if err != nil {
        log.Fatal(err.Error())
    }

    routingKes := []string{"red", "orange"}
    //routingKes := []string{"red", "green"}

    for _, rKey := range routingKes {
        // 并且将队列绑定到demo.direct的交换机上面,指定key
        err = ch.QueueBind(
            q.Name,        // queue name
            rKey,          // routing key
            "demo.direct", // exchange
            false,
            nil,
        )
        if err != nil {
            log.Fatal(err.Error())
        }
    }

    // 监听固定队列的名称 q.Name
    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    if err != nil {
        log.Fatal(err.Error())
    }

    var forever chan struct{}

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            time.Sleep(1 * time.Second)
        }
    }()

    <-forever
}
  • consumer2,创建exchanges:demo.direct、创建queue:fanout.queue2,并且绑定key为red、green,修改下consumer1即可。

先起来consumer1和consumer2,再生产消息。

PS E:\workspaces\goland\study_demo\rbmq_demo\direct\consume1> go run .\consume1.go
2023/10/20 09:18:03 Received a message: hello,i am message key:red
2023/10/20 09:18:04 Received a message: hello,i am message key:orange
2023/10/20 09:18:05 Received a message: hello,i am message key:red
2023/10/20 09:18:06 Received a message: hello,i am message key:orange
2023/10/20 09:18:07 Received a message: hello,i am message key:red
2023/10/20 09:18:08 Received a message: hello,i am message key:orange
2023/10/20 09:18:09 Received a message: hello,i am message key:red

PS E:\workspaces\goland\study_demo\rbmq_demo\direct\consume2> go run .\consume1.go
2023/10/20 09:17:18 Received a message: hello,i am message key:red
2023/10/20 09:17:19 Received a message: hello,i am message key:green
2023/10/20 09:17:20 Received a message: hello,i am message key:red
2023/10/20 09:17:21 Received a message: hello,i am message key:green
2023/10/20 09:17:22 Received a message: hello,i am message key:red
2023/10/20 09:17:23 Received a message: hello,i am message key:green
2023/10/20 09:17:24 Received a message: hello,i am message key:red

由上,我们可以根据指定的key来获取消息,但是在实际工作中,一般是根据业务来的,比如支付业务,跟支付业务相关所有的消息,订单都需要关心,那总不能像上面例子一个个全部绑定吧。这样不是很灵活了。所以topic交换机来了。

topic交换机

和kafka中的topic不一样又有点一样。
topic类型和direct相比,差不多,都是可以根据key把消息路由到不同的队列,区别就在于对key的不同处理。topic可以让key使用通配符。
bindingkey一般有一个或者多个单词组成,多个单词之间以.分割,比如:service.pay,service.pay.wx,service.order,service.order.detail.deliver等等。
通配符规则:

  • #:匹配一个或多个词,比如service.pay.#,可以匹配:service.pay,,service.pay.wx,
  • *:匹配一个词。比如service.*,可以匹配:service.pay,service.order

来个例子:

  • consumer1创建交换机为demo.topic和队列topic.pay.queue1,并且绑定key为service.pay.#和监听。
  • consumer2创建交换机为demo.topic和队列topic.order.queue1,并且绑定key为service.order.#和监听。
  • consumer3创建交换机为demo.topic和队列topic.service.queue1,并且绑定key为service.*和监听。
  • 生产者向demo.topic中发送消息,并指定key为、service.pay,service.pay.wx,service.order,service.order.detail.deliver。
  • 查看消费者1/2/3是否都收到指定key的消息。

  • 生产者代码,修改为发送的相关:
for i := 0; i < 10; i++ {
        re := i % 4
        rKey := ""
        //,,,
        if re == 0 {
            rKey = "service.pay"
        } else if re == 1 {
            rKey = "service.pay.wx"
        } else if re == 2 {
            rKey = "service.order"
        } else {
            rKey = "service.order.detail.deliver"
        }
        payload := fmt.Sprintf("hello,i am message key:%s", rKey)
        // 往这个交换机上面发信息
        err = ch.PublishWithContext(ctx,
            "demo.topic", // exchange
            rKey,         // routing key
            false,        // mandatory
            false,        // immediate
            amqp.Publishing{
                ContentType: "text/plain",
                Body:        []byte(payload),
            })
        if err != nil {
            log.Fatal(err.Error())
        }
  • 生产者,通过修改代码起三个consumer,把注释分别打开即可。
func main() {
    conn, err := amqp.Dial("amqp://global:123456@124.223.47.250:5672/global")
    defer conn.Close()
    if err != nil {
        log.Fatal(err.Error())
    }

    ch, err := conn.Channel()
    defer ch.Close()
    if err != nil {
        log.Fatal(err.Error())
    }

    //声明创建一个fanout类型,名称为demo.fanout的交换机。
    err = ch.ExchangeDeclare(
        "demo.topic", // exchange name
        "topic",      // exchange type
        true,         // durable
        false,        // auto-deleted
        false,        // internal
        false,        // no-wait
        nil,          // arguments
    )
    if err != nil {
        log.Fatal(err.Error())
    }

    //创建一个名称为fanout.queue1的队列
    q, err := ch.QueueDeclare(
        "topic.pay.queue1", // name
        //"topic.order.queue1", // name
        //"topic.service.queue1", // name
        true,  // durable
        false, // delete when unused
        false, // exclusive
        false, // no-wait
        nil,   // arguments
    )
    if err != nil {
        log.Fatal(err.Error())
    }

    rKey := "service.pay.#"
    //rKey := "service.order.#"
    //rKey := "service.*"
    // 并且将队列绑定到demo.direct的交换机上面,指定key
    err = ch.QueueBind(
        q.Name,       // queue name
        rKey,         // routing key
        "demo.topic", // exchange
        false,
        nil,
    )
    if err != nil {
        log.Fatal(err.Error())
    }

    // 监听固定队列的名称 q.Name
    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    if err != nil {
        log.Fatal(err.Error())
    }

    var forever chan struct{}

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            time.Sleep(1 * time.Second)
        }
    }()

    <-forever
}

写在最后

在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。

这里推荐使用第三方的库,包括java中也有的,叫做 amqp.

go get github.com/streadway/amqp
运维帮我们在服务器上创建用户、指定管理员和访问权限。下面针对rbbitmq进行的封装:

// 声明队列类型
type RabbitMQ struct {
    channel  *amqp.Channel
    Name     string
    exchange string
}

// 连接服务器
func Connect(s string) *RabbitMQ {
    //连接rabbitmq
    conn, e := amqp.Dial(s)
    failOnError(e, "连接Rabbitmq服务器失败!")
    ch, e := conn.Channel()
    failOnError(e, "无法打开频道!")
    mq := new(RabbitMQ)
    mq.channel = ch
    return mq
}

// 初始化单个消息队列
// 第一个参数:rabbitmq服务器的链接,第二个参数:队列名字
func New(s string, name string) *RabbitMQ {
    //连接rabbitmq
    conn, e := amqp.Dial(s)
    failOnError(e, "连接Rabbitmq服务器失败!")
    ch, e := conn.Channel()
    failOnError(e, "无法打开频道!")
    q, e := ch.QueueDeclare(
        name,  //队列名
        false, //是否开启持久化
        true,  //不使用时删除
        false, //排他
        false, //不等待
        nil,   //参数
    )
    failOnError(e, "初始化队列失败!")

    mq := new(RabbitMQ)
    mq.channel = ch
    mq.Name = q.Name
    return mq
}

//批量初始化消息队列
//第一个参数:rabbitmq服务器的链接,第二个参数:队列名字列表

// 声明交换机
func (q *RabbitMQ) QueueDeclare(queue string) {
    _, e := q.channel.QueueDeclare(queue, false, true, false, false, nil)
    failOnError(e, "声明交换机!")
}

// 删除交换机
func (q *RabbitMQ) QueueDelete(queue string) {
    _, e := q.channel.QueueDelete(queue, false, true, false)
    failOnError(e, "删除队列失败!")
}

// 配置队列参数
func (q *RabbitMQ) Qos() {
    e := q.channel.Qos(1, 0, false)
    failOnError(e, "无法设置QoS")
}

//配置交换机参数

// 初始化交换机
// 第一个参数:rabbitmq服务器的链接,第二个参数:交换机名字,第三个参数:交换机类型
func NewExchange(s string, name string, typename string) {
    //连接rabbitmq
    conn, e := amqp.Dial(s)
    failOnError(e, "连接Rabbitmq服务器失败!")
    ch, e := conn.Channel()
    failOnError(e, "无法打开频道!")
    e = ch.ExchangeDeclare(
        name,     // name
        typename, // type
        true,     // durable
        false,    // auto-deleted
        false,    // internal
        false,    // no-wait
        nil,      // arguments
    )
    failOnError(e, "初始化交换机失败!")

}

// 删除交换机
func (q *RabbitMQ) ExchangeDelete(exchange string) {
    e := q.channel.ExchangeDelete(exchange, false, true)
    failOnError(e, "绑定队列失败!")
}

// 绑定消息队列到哪个exchange
func (q *RabbitMQ) Bind(exchange string, key string) {
    e := q.channel.QueueBind(
        q.Name,
        key,
        exchange,
        false,
        nil,
    )
    failOnError(e, "绑定队列失败!")
    q.exchange = exchange
}

// 向消息队列发送消息
// Send方法可以往某个消息队列发送消息
func (q *RabbitMQ) Send(queue string, body interface{}) {
    str, e := json.Marshal(body)
    failOnError(e, "消息序列化失败!")
    e = q.channel.Publish(
        "",    //交换
        queue, //路由键
        false, //必填
        false, //立即
        amqp.Publishing{
            ReplyTo: q.Name,
            Body:    []byte(str),
        })
    msg := "向队列:" + q.Name + "发送消息失败!"
    failOnError(e, msg)
}

// 向exchange发送消息
// Publish方法可以往某个exchange发送消息
func (q *RabbitMQ) Publish(exchange string, body interface{}, key string) {
    str, e := json.Marshal(body)
    failOnError(e, "消息序列化失败!")
    e = q.channel.Publish(
        exchange,
        key,
        false,
        false,
        amqp.Publishing{ReplyTo: q.Name,
            Body: []byte(str)},
    )
    failOnError(e, "向路由发送消息失败!")
}

// 接收某个消息队列的消息
func (q *RabbitMQ) Consume() <-chan amqp.Delivery {
    c, e := q.channel.Consume(
        q.Name, //指定从哪个队列中接收消息
        "",
        true,
        false,
        false,
        false,
        nil,
    )
    failOnError(e, "接收消息失败!")
    return c
}

// 关闭队列连接
func (q *RabbitMQ) Close() {
    q.channel.Close()
}

// 错误处理函数
func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}
人未眠
工作数十年
脚步未曾歇,学习未曾停
乍回首
路程虽丰富,知识未记录
   借此博客,与之共进步