如何异步通信,如何解耦系统组件,客官,你不来看看?
既然说异步,我们先来说同步,再引入异步。
背景
先看看我们目前的处理方式,有一个支付的服务,目前有三个功能,先减余额,再更新交易流水,最后更新订单状态。如下图:
这样看的话,是没有问题,代码从上往下,同步执行。
但是随着产品不断迭代,功能越来越多,在支付完成后需要短信通知用户、更新用户信息等等,不断的功能叠加进来。每个业务代码执行都需要一定的时间,就单单一个支付服务,累计时长不断增长,大家都在等着你支付完成,你卡着全部卡住了。
这样肯定会造成问题:
- 每次新增一个业务,是不是在支付服务里面不断改代码,也不符合开闭原则(对扩展开放,对修改关闭)
- 如果扣款成功,但是更新积分失败,是不是整个业务都要回滚?级联失败。
- 大并发的情况下,一个支付就占用了300ms,性能太差了。
所以,异步来了。
异步调用
还是刚刚那个支付服务,我们支付服务最主要的是什么?肯定是扣减余额和更新支付流水,这个都失败了还玩个锤子,至于其他的更新订单状态、积分、短信通知等等,都是可以解耦出去的。那么可以把这些,变为异步的,我只是通知他就可以了。
主要是这个消息通知,如何定义?
消息通知
异步调用方式其实就是基于消息通知的方式,一般包含三个角色:
- 消息发送者,投递消息的人,就是原来的调用方。
- 消息Broker,管理、暂存、转发消息,可以理解为微信服务器。
- 消息接收者,接收和处理消息的人,就是原来服务提供方。
再来个通俗易懂的,你收快递,原来是快递员送到你手上,你不在他就在那等着,这是同步。
现在是快递员把快递放到快递柜,然后发短信给你告诉你唯一识别码,你方便了再去取。
所以快递员和你就解耦了,他可以去干其他的活,你什么时候去取,取决于你。
这里:消息发送者就是快递员,消息broker就是快递柜,你,就是消息接收者。
所以,上述支付服务例子,一个支付服务耗时100ms,剩下的就是发送消息,具体什么时候执行,看各个业务方。明显有如下优势:
- 耦合度更低,后续修改交易服务或者积分服务,不需要来支付服务修改代码。
- 性能更好,节省了大量时间
- 业务拓展好,后续再增加其他服务,比如支付后可以抽奖服务等等,直接监听消息就好了。
- 故障隔离,如果更新积分失败,不会影响核心的支付服务。
当然,异步也有其他的问题:
- 完全依赖于broker的可靠性、安全性和性能,比如快递柜丢了,你的快递也丢了,或者发短信你没收到,你快递也丢了。
- 不能立即得到调用结果,时效性差,比如查询业务,就不能使用了。
- 架构复杂,后续维护和调试麻烦。
异步中间件选型
消息Broker,目前常见的实现方案就是消息队列(MessageQueue),简称为MQ. 常见有如下几种:
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit(社区活跃) | Apache | 阿里 | Apache |
开发语言 | Erlang(面向并发) | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性(集群) | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般(s/十万) | 差 | 高 | 非常高(s/百万) |
消息延迟(接收速度) | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性(消息确认) | 高 | 一般 | 高 | 一般 |
据统计,目前国内消息队列使用较多的是RabbitMQ,再加上其他各方面都比较均横,所以,我们学习 RabbitMQ。 kafka在日志系统中使用多点。
RabbitMQ
rabbitMQ :logo是一个兔子,动如脱兔,跑的很快。
安装
通过docker来。
- 拉取镜像:
docker pull rabbitmq:3.8-management
- 运行镜像
docker run \
-e RABBITMQ_DEFAULT_USER=milo \
-e RABBITMQ_DEFAULT_PASS=123456 \
-v mq-plugins:/plugins \
--name rbmq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3.8-management
其中可以看到在安装命令中有两个映射的端口:
- 15672:RabbitMQ提供的管理控制台的端口
- 5672:RabbitMQ的消息发送处理接口
安装完成。
开放防火墙后,可以直接访问15672端口来看。 用户名和密码就是刚刚设置的milo和1233456
一脸懵逼没关系,我们这个步骤,只是安装而已。看到这个界面,说明安装成功。已经学习了大半了。
基本介绍
我们先看一个架构图
- publisher:生产者,也就是发送消息的一方。
- consumer:消费者者,消息接收的一方,它和队列(queue)绑定
- queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
- exchage:交换机,负责路由信息,生产者发送的消息由交换机决定投递到哪个队列。可以路由一个队列,也可以路由所有队列。没有存储消息的能力。
一句话: 生产者发送消息给交换机,交换机发送给队列,消费者从队列里面拿到消息。
交换机和队列,就是一套rbmq的服务。那有一个问题了,公司中很多业务,不可能每个业务都搭建一个mq吧,各个业务共享一个mq的话,如何做隔离呢?
类似于数据库和redis的database,rbmq也有隔离机制,叫做virtualHost,虚拟主机。一个mq有不同的virtualHost,交换机和队列存在自己所属的virtualHost之中。所以就相互隔离了。理论上是需要给每一个项目创建一个用户,每一个用户绑定一个virturalHost,在virturalHost中,绑定交换机和队列之间的关系。
通过可视化页面配置
需求:我们通过可视化管理工具 rabbitMQ management,来配置一个账号,创建virtualHost,创建queue,并且绑定在exchange中,然后往在exchange中发送消息,在queue中查看。
在Admin tabs的Add a user label中,填写对应的用户信息。点击Add user后,然后点击右上角的 log out登出,通过新账号登录。
新账号登录上来后,在Admin tabs的右侧,点击virtual hosts,然后在add a new virtual host中填写信息,并 add。
在queues tab中,点击 add a new queue,目前只是选择 virtual host,以及数据queue的name后,点击 add queue即可
此时,queue有了,exchanges也有了(有默认的),但是queue未绑定上exchages。在exchanges中,绑定队列即可。
测试信息,发送消息到exchanges中,exchanges自动发送到绑定的queue中。
在队列中的g_queue_1中,有1条消息ready。
快速入门
通过官方的方式来把。RabbitMQ Tutorials
快速入门,是生产者直接发送到queue中,消费者直接从queue中获取消息,没有交换机。
最近的语言栈是Go,所以就以Go来吧。
- 先获取package
go get github.com/rabbitmq/amqp091-go
- public的代码
publish.go
func main() {
conn, err := amqp.Dial("amqp://global:123456@124.223.47.250:5672/global") // "amqp://username:password@host:port/virtual_host"
defer conn.Close()
if err != nil {
log.Fatal(err.Error())
}
ch, err := conn.Channel()
if err != nil {
log.Fatal(err.Error())
}
q, err := ch.QueueDeclare(
"g_queue_1", // queue name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatal(err.Error())
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
payload := "hello,i am go client"
err = ch.PublishWithContext(ctx,
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(payload),
})
if err != nil {
log.Fatal(err.Error())
}
log.Printf(" [x] Sent %s\n", payload)
}
运行之后,再次打开可视化工具,可以发现在queue: g_queue_1中的ready变为了2.
- 我们来消费
consume.go
func main() {
conn, err := amqp.Dial("amqp://global:123456@124.223.47.250:5672/global")
defer conn.Close()
if err != nil {
log.Fatal(err.Error())
}
ch, err := conn.Channel()
defer ch.Close()
if err != nil {
log.Fatal(err.Error())
}
q, err := ch.QueueDeclare(
"g_queue_1", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatal(err.Error())
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
log.Fatal(err.Error())
}
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
<-forever
}
2023/10/19 17:06:27 Received a message: hello,i am go client
好了,快速入门完成。继续升级。
当队列中的消息越来越多,已经产生消息累积了,单个消费者能力有限,那如何解决呢?
WorkQueues模型
如上问题,work queue,任务模型,简单来说就是让多个消费者绑定到一个队列上面去,共同消费队列中的消息,来减少消息累积
。
- 我们改下上面publish.go中的发送代码,让他发送10条消息到queue中。
for i := 0; i < 10; i++ { payload := fmt.Sprintf("hello,i am message:%d", i) err = ch.PublishWithContext(ctx, "", // exchange q.Name, // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: "text/plain", Body: []byte(payload), }) if err != nil { log.Fatal(err.Error()) } }
- 消费端不用动,直接起两个刚刚的consume.go即可,我们会发现:
PS E:\workspaces\goland\study_demo\rbmq_demo\quick_demo\consume> go run .\consume.go
2023/10/19 17:47:57 Received a message: hello,i am message:1
2023/10/19 17:47:57 Received a message: hello,i am message:3
2023/10/19 17:47:57 Received a message: hello,i am message:5
2023/10/19 17:47:57 Received a message: hello,i am message:7
2023/10/19 17:47:57 Received a message: hello,i am message:9
2023/10/19 17:47:57 Received a message: hello,i am message:0
2023/10/19 17:47:57 Received a message: hello,i am message:2
2023/10/19 17:47:57 Received a message: hello,i am message:4
2023/10/19 17:47:57 Received a message: hello,i am message:6
2023/10/19 17:47:57 Received a message: hello,i am message:8
这两个消费者,不会重复消费,你一个我一个的。消息是平均分配给每个消费者。
这样显然是有问题的。,因为发现的是你一个我一个,A的服务器性能好,可能一下子就消费完5个了,但是B的服务性能差点,可能要过一会才消费完5个。当然,可以修改代码:
则需要修改消费的代码:只需把自动确认修改为手动确认,即可。
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack //禁止自动确认,改为手动确认
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
time.Sleep(1 * time.Second)
d.Ack(false) // 增加确认,也就是处理完了
}
}()
以上都是简单测试,都没有使用到交换机,甚至都谈不上使用了异步。
交换机
之前的两个测试案例中,都没有交换机,生产者直接发送消息到队列。而一旦引入交换机,消息发送的模式会有很大变化:
可以看到,在订阅模型里面,多了一个exchanges角色,而且过程也变了。
- publisher:生成者,不是直接发消息到queue了,而是发到exchanges。
- exchange:交换机,一方面接收生产者发送的消息,另一方面,处理消息,比如把消息递交给某个特别的队列、所有队列、消息丢弃等等,如何操作取决于交换机的类型。
- queue:消息队列和之前一样,接收消息和缓存消息,但是队列一定要和交换机绑定了。
- consumer:消费者,和之前一样,订阅队列,没有变化。
刚刚说了下,交换机多种类型,一般有以下:
- fanout:广播,将消息交给所有绑定到交换机的队列,也就是使用可视化操作的时候用的那个交换机。
- Direct:订阅,基于routingKey,路由,发送给订阅了的消息队列。
- Topic:通配符订阅,于direct类似,只不过routingKey可以使用通配符。
- Headers:头匹配,基于MQ的消息头匹配,用的较少。
所以交换机的作用是:
- 接收publisher发送的消息
- 将消息按照规则路由到之前绑定的队列
Fanout交换机
它会将发到这个exchange的消息广播到关注此exchange的所有接收端上。
也就是客户端向接收queue1,就绑定在queue1上面,想接收queue2,就绑定在queue2上面。直接来个案例吧。
实际工作中,交换机和队列是作为资源,由运维管理员创建和绑定的。
,刚刚已经演示如何通过可视化工具创建了,现在通过消费者的代码来创建、绑定交换机和队列。
案例:
- 消费者1创建交换机为demo.fanout和队列fanout.queue1,并且绑定和监听。
- 消费者2创建交换机为demo.fanout和队列fanout.queue2,并且绑定和监听。重复创建交换机不会报错。
- 生产者向demo.fanout中发送消息。
- 查看消费者1和消费者2是否都收到相同消息。
整体代码:
- 生产者,之前的代码不用动,只是修改下,原本向queue中发送,现在发到exchanges中。
err = ch.PublishWithContext(ctx,
"demo.fanout", // exchange
"", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(payload),
})
- consumer1,创建exchanges:demo.fanout、创建queue:fanout.queue1,并且绑定。
func main() {
conn, err := amqp.Dial("amqp://global:123456@124.223.47.250:5672/global")
defer conn.Close()
if err != nil {
log.Fatal(err.Error())
}
ch, err := conn.Channel()
defer ch.Close()
if err != nil {
log.Fatal(err.Error())
}
//声明创建一个fanout类型,名称为demo.fanout的交换机。
err = ch.ExchangeDeclare(
"demo.fanout", // exchange name
"fanout", // exchange type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatal(err.Error())
}
//创建一个名称为fanout.queue1的队列
q, err := ch.QueueDeclare(
"fanout.queue1", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatal(err.Error())
}
// 并且将队列绑定到demo.fanout的交换机上面
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
"demo.fanout", // exchange
false,
nil,
)
if err != nil {
log.Fatal(err.Error())
}
// 监听固定队列的名称 q.Name
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
log.Fatal(err.Error())
}
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
time.Sleep(1 * time.Second)
}
}()
<-forever
}
- consumer2,创建exchanges:demo.fanout、创建queue:fanout.queue2,并且绑定。代码如上,只是把fanout.queue1改成2即可。
一定要先起来consumer1和consumer2,再生产消息。
PS E:\workspaces\goland\study_demo\rbmq_demo\fanout\consume1> go run .\consume1.go
2023/10/19 22:28:06 Received a message: hello,i am message:0
2023/10/19 22:28:07 Received a message: hello,i am message:1
2023/10/19 22:28:08 Received a message: hello,i am message:2
2023/10/19 22:28:09 Received a message: hello,i am message:3
2023/10/19 22:28:10 Received a message: hello,i am message:4
PS E:\workspaces\goland\study_demo\rbmq_demo\fanout\consume2> go run .\consume2.go
2023/10/19 22:28:06 Received a message: hello,i am message:0
2023/10/19 22:28:07 Received a message: hello,i am message:1
2023/10/19 22:28:08 Received a message: hello,i am message:2
2023/10/19 22:28:09 Received a message: hello,i am message:3
2023/10/19 22:28:10 Received a message: hello,i am message:4
这样看上去,所有consumer都收到了消息。完成了我们的需求。
但是在实际的开发中,肯定是不同的人收到不同的消息,比如支付成功的消息,支付相关业务才关心,跟聊天业务没有关系,那聊天业务就不需要收到这个消息了。
所以,就需要用到Direct Exchange了
Direct交换机
Direct Exchange 会将接收到的消息根据规则路由到指定的 queue,因此称为 定向 路由。
如上图:
- 交换机和queue绑定,就需要指定一个routingKey,路由key了,如orange给了Q1,black、green给了Q2。
- 生产者向Exchange发送消息的时候,必须要指定routingKey。
- Exchange不再把消息交给每一个绑定个队列,而是根据消息的routingKey进行判断,只有队列的和消息的routingKey完全一致,才会接收到消息。
- 两个队列可以绑定相同的key,这样也可以实现Fanout交换机功能。
来个案例
- consumer1创建交换机为demo.direct和队列direct.queue1,并且绑定key为red、orange和监听。
- consumer2创建交换机为demo.direct和队列direct.queue2,并且绑定key为red、green和监听。
- 生产者向demo.direct中发送消息,并指定key为、red、orange、green。
- 查看消费者1和消费者2是否都收到指定key的消息。
- 生产者,之前的代码不用动,只是修改下,发到exchanges中增加相关key。
for i := 0; i < 10; i++ {
re := i % 3
rKey := ""
if re == 0 {
rKey = "red"
} else if re == 1 {
rKey = "orange"
} else {
rKey = "green"
}
payload := fmt.Sprintf("hello,i am message key:%s", rKey)
// 往这个交换机上面发信息
err = ch.PublishWithContext(ctx,
"demo.direct", // exchange
rKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(payload),
})
if err != nil {
log.Fatal(err.Error())
}
}
- consumer1,创建exchanges:demo.direct、创建queue:fanout.queue1,并且绑定key为red、orange。
func main() {
conn, err := amqp.Dial("amqp://global:123456@124.223.47.250:5672/global")
defer conn.Close()
if err != nil {
log.Fatal(err.Error())
}
ch, err := conn.Channel()
defer ch.Close()
if err != nil {
log.Fatal(err.Error())
}
//声明创建一个fanout类型,名称为demo.fanout的交换机。
err = ch.ExchangeDeclare(
"demo.direct", // exchange name
"direct", // exchange type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatal(err.Error())
}
//创建一个名称为fanout.queue1的队列
q, err := ch.QueueDeclare(
"direct.queue1", // name
//"direct.queue2", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatal(err.Error())
}
routingKes := []string{"red", "orange"}
//routingKes := []string{"red", "green"}
for _, rKey := range routingKes {
// 并且将队列绑定到demo.direct的交换机上面,指定key
err = ch.QueueBind(
q.Name, // queue name
rKey, // routing key
"demo.direct", // exchange
false,
nil,
)
if err != nil {
log.Fatal(err.Error())
}
}
// 监听固定队列的名称 q.Name
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
log.Fatal(err.Error())
}
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
time.Sleep(1 * time.Second)
}
}()
<-forever
}
- consumer2,创建exchanges:demo.direct、创建queue:fanout.queue2,并且绑定key为red、green,修改下consumer1即可。
先起来consumer1和consumer2,再生产消息。
PS E:\workspaces\goland\study_demo\rbmq_demo\direct\consume1> go run .\consume1.go
2023/10/20 09:18:03 Received a message: hello,i am message key:red
2023/10/20 09:18:04 Received a message: hello,i am message key:orange
2023/10/20 09:18:05 Received a message: hello,i am message key:red
2023/10/20 09:18:06 Received a message: hello,i am message key:orange
2023/10/20 09:18:07 Received a message: hello,i am message key:red
2023/10/20 09:18:08 Received a message: hello,i am message key:orange
2023/10/20 09:18:09 Received a message: hello,i am message key:red
PS E:\workspaces\goland\study_demo\rbmq_demo\direct\consume2> go run .\consume1.go
2023/10/20 09:17:18 Received a message: hello,i am message key:red
2023/10/20 09:17:19 Received a message: hello,i am message key:green
2023/10/20 09:17:20 Received a message: hello,i am message key:red
2023/10/20 09:17:21 Received a message: hello,i am message key:green
2023/10/20 09:17:22 Received a message: hello,i am message key:red
2023/10/20 09:17:23 Received a message: hello,i am message key:green
2023/10/20 09:17:24 Received a message: hello,i am message key:red
由上,我们可以根据指定的key来获取消息,但是在实际工作中,一般是根据业务来的,比如支付业务,跟支付业务相关所有的消息,订单都需要关心,那总不能像上面例子一个个全部绑定吧。这样不是很灵活了。所以topic交换机来了。
topic交换机
和kafka中的topic不一样又有点一样。
topic类型和direct相比,差不多,都是可以根据key把消息路由到不同的队列,区别就在于对key的不同处理。topic可以让key使用通配符。
bindingkey一般有一个或者多个单词组成,多个单词之间以.
分割,比如:service.pay,service.pay.wx,service.order,service.order.detail.deliver等等。
通配符规则:
- #:匹配一个或多个词,比如
service.pay.#
,可以匹配:service.pay,,service.pay.wx, - *:匹配一个词。比如
service.*
,可以匹配:service.pay,service.order
来个例子:
- consumer1创建交换机为demo.topic和队列topic.pay.queue1,并且绑定key为service.pay.#和监听。
- consumer2创建交换机为demo.topic和队列topic.order.queue1,并且绑定key为service.order.#和监听。
- consumer3创建交换机为demo.topic和队列topic.service.queue1,并且绑定key为service.*和监听。
- 生产者向demo.topic中发送消息,并指定key为、service.pay,service.pay.wx,service.order,service.order.detail.deliver。
- 查看消费者1/2/3是否都收到指定key的消息。
- 生产者代码,修改为发送的相关:
for i := 0; i < 10; i++ {
re := i % 4
rKey := ""
//,,,
if re == 0 {
rKey = "service.pay"
} else if re == 1 {
rKey = "service.pay.wx"
} else if re == 2 {
rKey = "service.order"
} else {
rKey = "service.order.detail.deliver"
}
payload := fmt.Sprintf("hello,i am message key:%s", rKey)
// 往这个交换机上面发信息
err = ch.PublishWithContext(ctx,
"demo.topic", // exchange
rKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(payload),
})
if err != nil {
log.Fatal(err.Error())
}
- 生产者,通过修改代码起三个consumer,把注释分别打开即可。
func main() {
conn, err := amqp.Dial("amqp://global:123456@124.223.47.250:5672/global")
defer conn.Close()
if err != nil {
log.Fatal(err.Error())
}
ch, err := conn.Channel()
defer ch.Close()
if err != nil {
log.Fatal(err.Error())
}
//声明创建一个fanout类型,名称为demo.fanout的交换机。
err = ch.ExchangeDeclare(
"demo.topic", // exchange name
"topic", // exchange type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatal(err.Error())
}
//创建一个名称为fanout.queue1的队列
q, err := ch.QueueDeclare(
"topic.pay.queue1", // name
//"topic.order.queue1", // name
//"topic.service.queue1", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatal(err.Error())
}
rKey := "service.pay.#"
//rKey := "service.order.#"
//rKey := "service.*"
// 并且将队列绑定到demo.direct的交换机上面,指定key
err = ch.QueueBind(
q.Name, // queue name
rKey, // routing key
"demo.topic", // exchange
false,
nil,
)
if err != nil {
log.Fatal(err.Error())
}
// 监听固定队列的名称 q.Name
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
log.Fatal(err.Error())
}
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
time.Sleep(1 * time.Second)
}
}()
<-forever
}
写在最后
在实际开发时,队列和交换机是程序员定义的,将来项目上线,又要交给运维去创建。那么程序员就需要把程序中运行的所有队列和交换机都写下来,交给运维。在这个过程中是很容易出现错误的。因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。
这里推荐使用第三方的库,包括java中也有的,叫做 amqp
.
go get github.com/streadway/amqp
运维帮我们在服务器上创建用户、指定管理员和访问权限。下面针对rbbitmq进行的封装:
// 声明队列类型
type RabbitMQ struct {
channel *amqp.Channel
Name string
exchange string
}
// 连接服务器
func Connect(s string) *RabbitMQ {
//连接rabbitmq
conn, e := amqp.Dial(s)
failOnError(e, "连接Rabbitmq服务器失败!")
ch, e := conn.Channel()
failOnError(e, "无法打开频道!")
mq := new(RabbitMQ)
mq.channel = ch
return mq
}
// 初始化单个消息队列
// 第一个参数:rabbitmq服务器的链接,第二个参数:队列名字
func New(s string, name string) *RabbitMQ {
//连接rabbitmq
conn, e := amqp.Dial(s)
failOnError(e, "连接Rabbitmq服务器失败!")
ch, e := conn.Channel()
failOnError(e, "无法打开频道!")
q, e := ch.QueueDeclare(
name, //队列名
false, //是否开启持久化
true, //不使用时删除
false, //排他
false, //不等待
nil, //参数
)
failOnError(e, "初始化队列失败!")
mq := new(RabbitMQ)
mq.channel = ch
mq.Name = q.Name
return mq
}
//批量初始化消息队列
//第一个参数:rabbitmq服务器的链接,第二个参数:队列名字列表
// 声明交换机
func (q *RabbitMQ) QueueDeclare(queue string) {
_, e := q.channel.QueueDeclare(queue, false, true, false, false, nil)
failOnError(e, "声明交换机!")
}
// 删除交换机
func (q *RabbitMQ) QueueDelete(queue string) {
_, e := q.channel.QueueDelete(queue, false, true, false)
failOnError(e, "删除队列失败!")
}
// 配置队列参数
func (q *RabbitMQ) Qos() {
e := q.channel.Qos(1, 0, false)
failOnError(e, "无法设置QoS")
}
//配置交换机参数
// 初始化交换机
// 第一个参数:rabbitmq服务器的链接,第二个参数:交换机名字,第三个参数:交换机类型
func NewExchange(s string, name string, typename string) {
//连接rabbitmq
conn, e := amqp.Dial(s)
failOnError(e, "连接Rabbitmq服务器失败!")
ch, e := conn.Channel()
failOnError(e, "无法打开频道!")
e = ch.ExchangeDeclare(
name, // name
typename, // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(e, "初始化交换机失败!")
}
// 删除交换机
func (q *RabbitMQ) ExchangeDelete(exchange string) {
e := q.channel.ExchangeDelete(exchange, false, true)
failOnError(e, "绑定队列失败!")
}
// 绑定消息队列到哪个exchange
func (q *RabbitMQ) Bind(exchange string, key string) {
e := q.channel.QueueBind(
q.Name,
key,
exchange,
false,
nil,
)
failOnError(e, "绑定队列失败!")
q.exchange = exchange
}
// 向消息队列发送消息
// Send方法可以往某个消息队列发送消息
func (q *RabbitMQ) Send(queue string, body interface{}) {
str, e := json.Marshal(body)
failOnError(e, "消息序列化失败!")
e = q.channel.Publish(
"", //交换
queue, //路由键
false, //必填
false, //立即
amqp.Publishing{
ReplyTo: q.Name,
Body: []byte(str),
})
msg := "向队列:" + q.Name + "发送消息失败!"
failOnError(e, msg)
}
// 向exchange发送消息
// Publish方法可以往某个exchange发送消息
func (q *RabbitMQ) Publish(exchange string, body interface{}, key string) {
str, e := json.Marshal(body)
failOnError(e, "消息序列化失败!")
e = q.channel.Publish(
exchange,
key,
false,
false,
amqp.Publishing{ReplyTo: q.Name,
Body: []byte(str)},
)
failOnError(e, "向路由发送消息失败!")
}
// 接收某个消息队列的消息
func (q *RabbitMQ) Consume() <-chan amqp.Delivery {
c, e := q.channel.Consume(
q.Name, //指定从哪个队列中接收消息
"",
true,
false,
false,
false,
nil,
)
failOnError(e, "接收消息失败!")
return c
}
// 关闭队列连接
func (q *RabbitMQ) Close() {
q.channel.Close()
}
// 错误处理函数
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}