6

rabbitMq 交换机介绍 系列三 Direct [golang 版本]

 3 years ago
source link: https://studygolang.com/articles/33972
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

rabbitMq 交换机介绍 系列三 Direct [golang 版本]

forlife · 大约6小时之前 · 10 次点击 · 预计阅读时间 3 分钟 · 大约8小时之前 开始浏览    

4种不同的交换机类型

直连交换机:Direct exchange
扇形交换机:Fanout exchange
主题交换机:Topic exchange
首部交换机:Headers exchange

交换机具体含义参考 https://www.jianshu.com/p/469...

Direct 交换机

image.png

以下代码参数具体含义可以参考 https://segmentfault.com/a/11...

生产者示例 producer.go
package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "time"
)

//因:快速实现逻辑,故:不处理错误逻辑
func main() {
    conn, _ := amqp.Dial("amqp://user:password@host:ip/vhost")
    ch, _ := conn.Channel()
    body := "Hello World! " + time.Now().Format("2006-01-02 15:04:05")
    fmt.Println(body)
    var exchange_name string = "j_exch_direct"
    var routing_key string = "j_routing_key_direct"
    var etype string = amqp.ExchangeDirect
    //声明交换器
    ch.ExchangeDeclare(exchange_name, etype, true, false, false, false, nil)
    // fanout
    ch.Publish(
        exchange_name, // exchange 这里为空则不选择 exchange
        routing_key,   // routing key
        false,         // mandatory
        false,         // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
            //Expiration:  "3000", // 设置过期时间
        })

    // defer 关键字
    defer conn.Close() // 压栈 后进先出
    defer ch.Close()   // 压栈 后进先出

}
$ go run producer.go
Hello World! 2021-03-18 16:08:13
消费者示例 consumer.go
package main

import (
    "github.com/streadway/amqp"
    "log"
)

func main() {
    conn, _ := amqp.Dial("amqp://user:password@host:ip/vhost")
    ch, _ := conn.Channel()
    var exchange_name string = "j_exch_direct"
    var routing_key string = "j_routing_key_direct"
    var queue_name string = "j_queue"
    var etype string = amqp.ExchangeDirect // 直连交换机

    ch.QueueDeclare(queue_name, true, false, true, false, nil)
    //声明交换器 amqp.ExchangeFanout
    ch.ExchangeDeclare(exchange_name, etype, true, false, false, false, nil)

    ch.QueueBind(
        queue_name,    // queue name
        routing_key,   // routing key: direct 需一致
        exchange_name, // exchange
        false,
        nil)

    //监听
    msgs, _ := ch.Consume(
        queue_name, // queue name
        "",         // consumer
        true,       // auto-ack
        false,      // exclusive
        false,      // no-local
        false,      // no-wait
        nil,        // args
    )

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            //println("tset")
            log.Printf(" [x] %s", d.Body)
        }
    }()

    log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
    <-forever
}
$ go run consumer.go
2021/03/18 15:52:30  [*] Waiting for logs. To exit press CTRL+C
2021/03/18 15:52:34  [x] Hello World! 2021-03-18 15:52:34
2021/03/18 15:53:14  [x] Hello World! 2021-03-18 15:53:14
2021/03/18 16:08:13  [x] Hello World! 2021-03-18 16:08:13

有疑问加站长微信联系(非本文作者)

280

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:701969077


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK