7
rabbitMq 交换机介绍 系列四 Topic [golang 版本]
source link: https://studygolang.com/articles/33973
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
rabbitMq 交换机介绍 系列四 Topic [golang 版本]
forlife · 大约6小时之前 · 11 次点击 · 预计阅读时间 3 分钟 · 大约8小时之前 开始浏览4种不同的交换机类型
直连交换机:Direct exchange
扇形交换机:Fanout exchange
主题交换机:Topic exchange
首部交换机:Headers exchange
交换机具体含义参考 https://www.jianshu.com/p/469...
Topic 交换机
生产者示例 producer.go以下代码参数具体含义可以参考 https://segmentfault.com/a/11...
package main
import (
"fmt"
"github.com/streadway/amqp"
"time"
)
//因:快速实现逻辑,故:不处理错误逻辑
func main() {
conn, _ := amqp.Dial("amqp://user:password@host:ip/vhost")
ch, _ := conn.Channel()
body := "Hello World! " + time.Now().Format("2006-01-02 15:04:05")
fmt.Println(body)
var exchange_name string = "j_exch_topic"
var routing_key string = "jkey.a.b"
var etype string = amqp.ExchangeTopic
//声明交换器
ch.ExchangeDeclare(exchange_name, etype, true, false, false, false, nil)
// fanout
ch.Publish(
exchange_name, // exchange 这里为空则不选择 exchange
routing_key, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
//Expiration: "3000", // 设置过期时间
})
// defer 关键字
defer conn.Close() // 压栈 后进先出
defer ch.Close() // 压栈 后进先出
}
$ go run producer.go
Hello World! 2021-03-18 16:08:13
消费者示例 consumer.gopackage main
import (
"github.com/streadway/amqp"
"log"
)
func main() {
conn, _ := amqp.Dial("amqp://user:password@host:ip/vhost")
ch, _ := conn.Channel()
var exchange_name string = "j_exch_topic"
var routing_key string = "jkey.#"
var queue_name string = "j_queue"
var etype string = amqp.ExchangeTopic // 主题交换机
ch.QueueDeclare(queue_name, true, false, true, false, nil)
//声明交换器 amqp.ExchangeFanout
ch.ExchangeDeclare(exchange_name, etype, true, false, false, false, nil)
ch.QueueBind(
queue_name, // queue name
routing_key, // routing key: topic 匹配jkey.#
exchange_name, // exchange
false,
nil)
//监听
msgs, _ := ch.Consume(
queue_name, // queue name
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
forever := make(chan bool)
go func() {
for d := range msgs {
//println("tset")
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
$ go run consumer.go
2021/03/18 15:52:30 [*] Waiting for logs. To exit press CTRL+C
2021/03/18 15:52:34 [x] Hello World! 2021-03-18 15:52:34
2021/03/18 15:53:14 [x] Hello World! 2021-03-18 15:53:14
2021/03/18 16:08:13 [x] Hello World! 2021-03-18 16:08:13
有疑问加站长微信联系(非本文作者)
入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:701969077
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK