10

golang遇到docker很简单

 4 years ago
source link: https://studygolang.com/articles/25730
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

本文转自: https://www.cnblogs.com/angelyan/p/11218260.html

一、获取镜像指定版本,该版本包含了web控制页面

docker pull rabbitmq:management

二、运行镜像

方式一:默认guest 用户,密码也是 guest

docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management

方式二:设置用户名和密码

docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management

三、访问ui页面

http://localhost:15672/

26ZzMnQ.png!web

image

四、golang案例

#producer生产者代码
package main

import (
    "fmt"

    "log"

    "github.com/streadway/amqp"
)

const (
    //AMQP URI

    uri = "amqp://guest:[email protected]:5672/" // 10.0.0.11为主机ip

    //Durable AMQP exchange name

    exchangeName = ""

    //Durable AMQP queue name

    queueName = "test-queues"

    //Body of message

    bodyMsg string = "hello angel"
)

//如果存在错误,则输出

func failOnError(err error, msg string) {

    if err != nil {

        log.Fatalf("%s: %s", msg, err)

        panic(fmt.Sprintf("%s: %s", msg, err))

    }

}

func main() {

    //调用发布消息函数

    publish(uri, exchangeName, queueName, bodyMsg)

    log.Printf("published %dB OK", len(bodyMsg))

}

//发布者的方法

//@amqpURI, amqp的地址

//@exchange, exchange的名称

//@queue, queue的名称

//@body, 主体内容

func publish(amqpURI string, exchange string, queue string, body string) {

    //建立连接

    log.Printf("dialing %q", amqpURI)

    connection, err := amqp.Dial(amqpURI)

    failOnError(err, "Failed to connect to RabbitMQ")

    defer connection.Close()

    //创建一个Channel

    log.Printf("got Connection, getting Channel")

    channel, err := connection.Channel()

    failOnError(err, "Failed to open a channel")

    defer channel.Close()

    log.Printf("got queue, declaring %q", queue)

    //创建一个queue

    q, err := channel.QueueDeclare(

        queueName, // name

        false, // durable

        false, // delete when unused

        false, // exclusive

        false, // no-wait

        nil, // arguments

    )

    failOnError(err, "Failed to declare a queue")

    log.Printf("declared queue, publishing %dB body (%q)", len(body), body)

    // Producer只能发送到exchange,它是不能直接发送到queue的

    // 现在我们使用默认的exchange(名字是空字符)这个默认的exchange允许我们发送给指定的queue

    // routing_key就是指定的queue名字

    err = channel.Publish(

        exchange, // exchange

        q.Name, // routing key

        false, // mandatory

        false, // immediate

        amqp.Publishing{

            Headers: amqp.Table{},

            ContentType: "text/plain",

            ContentEncoding: "",

            Body: []byte(body),
        })

    failOnError(err, "Failed to publish a message")

}
E3MvIvY.png!web

image.png

生产者生产数据

#producer
package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "log"
    "os"
    "strings"
)

const (
    //AMQP URI
    uri = "amqp://guest:[email protected]:5672/"
    //Durable AMQP exchange name
    exchangeName = ""
    //Durable AMQP queue name
    queueName = "test-queues-acknowledgments"
)

//如果存在错误,则输出
func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func main() {
    bodyMsg := bodyFrom(os.Args)
    //调用发布消息函数
    publish(uri, exchangeName, queueName, bodyMsg)
    log.Printf("published %dB OK", len(bodyMsg))
}

func bodyFrom(args []string) string {
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
        s = "hello angel"
    } else {
        s = strings.Join(args[1:], " ")
    }
    return s
}

//发布者的方法
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
//@body, 主体内容
func publish(amqpURI string, exchange string, queue string, body string) {
    //建立连接
    log.Printf("dialing %q", amqpURI)
    connection, err := amqp.Dial(amqpURI)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer connection.Close()

    //创建一个Channel
    log.Printf("got Connection, getting Channel")
    channel, err := connection.Channel()
    failOnError(err, "Failed to open a channel")
    defer channel.Close()

    log.Printf("got queue, declaring %q", queue)

    //创建一个queue
    q, err := channel.QueueDeclare(
        queueName, // name
        false,     // durable
        false,     // delete when unused
        false,     // exclusive
        false,     // no-wait
        nil,       // arguments
    )
    failOnError(err, "Failed to declare a queue")

    log.Printf("declared queue, publishing %dB body (%q)", len(body), body)

    // Producer只能发送到exchange,它是不能直接发送到queue的。
    // 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。
    // routing_key就是指定的queue名字。
    err = channel.Publish(
        exchange, // exchange
        q.Name,   // routing key
        false,    // mandatory
        false,    // immediate
        amqp.Publishing{
            Headers:         amqp.Table{},
            ContentType:     "text/plain",
            ContentEncoding: "",
            Body:            []byte(body),
        })
    failOnError(err, "Failed to publish a message")
}

消费者消费数据

#consumer
package main

import (
    "bytes"
    "fmt"
    "github.com/streadway/amqp"
    "log"
    "time"
)

const (
    //AMQP URI
    uri = "amqp://guest:[email protected]:5672/"
    //Durable AMQP exchange nam
    exchangeName = ""
    //Durable AMQP queue name
    queueName = "test-queues-acknowledgments"
)

//如果存在错误,则输出
func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
        panic(fmt.Sprintf("%s: %s", msg, err))
    }
}

func main() {
    //调用消息接收者
    consumer(uri, exchangeName, queueName)
}

//接收者方法
//@amqpURI, amqp的地址
//@exchange, exchange的名称
//@queue, queue的名称
func consumer(amqpURI string, exchange string, queue string) {
    //建立连接
    log.Printf("dialing %q", amqpURI)
    connection, err := amqp.Dial(amqpURI)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer connection.Close()

    //创建一个Channel
    log.Printf("got Connection, getting Channel")
    channel, err := connection.Channel()
    failOnError(err, "Failed to open a channel")
    defer channel.Close()

    log.Printf("got queue, declaring %q", queue)

    //创建一个queue
    q, err := channel.QueueDeclare(
        queueName, // name
        false,     // durable
        false,     // delete when unused
        false,     // exclusive
        false,     // no-wait
        nil,       // arguments
    )
    failOnError(err, "Failed to declare a queue")

    log.Printf("Queue bound to Exchange, starting Consume")
    //订阅消息
    msgs, err := channel.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    //创建一个channel
    forever := make(chan bool)

    //调用gorountine
    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            dot_count := bytes.Count(d.Body, []byte("."))
            t := time.Duration(dot_count)
            time.Sleep(t * time.Second)
            log.Printf("Done")
            d.Ack(false)
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

    //没有写入数据,一直等待读,阻塞当前线程,目的是让线程不退出
    <-forever
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK