golang遇到docker很简单
source link: https://studygolang.com/articles/25730
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
本文转自: https://www.cnblogs.com/angelyan/p/11218260.html
一、获取镜像指定版本,该版本包含了web控制页面
docker pull rabbitmq:management
二、运行镜像
方式一:默认guest 用户,密码也是 guest
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management
方式二:设置用户名和密码
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 15672:15672 -p 5672:5672 rabbitmq:management
三、访问ui页面:在浏览器中打开 http://主机ip:15672 ,使用上一步的用户名和密码登录
image
四、golang案例
#producer生产者代码 package main import ( "fmt" "log" "github.com/streadway/amqp" ) const ( //AMQP URI uri = "amqp://guest:[email protected]:5672/" // 10.0.0.11为主机ip //Durable AMQP exchange name exchangeName = "" //Durable AMQP queue name queueName = "test-queues" //Body of message bodyMsg string = "hello angel" ) //如果存在错误,则输出 func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { //调用发布消息函数 publish(uri, exchangeName, queueName, bodyMsg) log.Printf("published %dB OK", len(bodyMsg)) } //发布者的方法 //@amqpURI, amqp的地址 //@exchange, exchange的名称 //@queue, queue的名称 //@body, 主体内容 func publish(amqpURI string, exchange string, queue string, body string) { //建立连接 log.Printf("dialing %q", amqpURI) connection, err := amqp.Dial(amqpURI) failOnError(err, "Failed to connect to RabbitMQ") defer connection.Close() //创建一个Channel log.Printf("got Connection, getting Channel") channel, err := connection.Channel() failOnError(err, "Failed to open a channel") defer channel.Close() log.Printf("got queue, declaring %q", queue) //创建一个queue q, err := channel.QueueDeclare( queueName, // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") log.Printf("declared queue, publishing %dB body (%q)", len(body), body) // Producer只能发送到exchange,它是不能直接发送到queue的 // 现在我们使用默认的exchange(名字是空字符)这个默认的exchange允许我们发送给指定的queue // routing_key就是指定的queue名字 err = channel.Publish( exchange, // exchange q.Name, // routing key false, // mandatory false, // immediate amqp.Publishing{ Headers: amqp.Table{}, ContentType: "text/plain", ContentEncoding: "", Body: []byte(body), }) failOnError(err, "Failed to publish a message") }
image.png
生产者生产数据
#producer package main import ( "fmt" "github.com/streadway/amqp" "log" "os" "strings" ) const ( //AMQP URI uri = "amqp://guest:[email protected]:5672/" //Durable AMQP exchange name exchangeName = "" //Durable AMQP queue name queueName = "test-queues-acknowledgments" ) //如果存在错误,则输出 func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { bodyMsg := bodyFrom(os.Args) //调用发布消息函数 publish(uri, exchangeName, queueName, bodyMsg) log.Printf("published %dB OK", len(bodyMsg)) } func bodyFrom(args []string) string { var s string if (len(args) < 2) || os.Args[1] == "" { s = "hello angel" } else { s = strings.Join(args[1:], " ") } return s } //发布者的方法 //@amqpURI, amqp的地址 //@exchange, exchange的名称 //@queue, queue的名称 //@body, 主体内容 func publish(amqpURI string, exchange string, queue string, body string) { //建立连接 log.Printf("dialing %q", amqpURI) connection, err := amqp.Dial(amqpURI) failOnError(err, "Failed to connect to RabbitMQ") defer connection.Close() //创建一个Channel log.Printf("got Connection, getting Channel") channel, err := connection.Channel() failOnError(err, "Failed to open a channel") defer channel.Close() log.Printf("got queue, declaring %q", queue) //创建一个queue q, err := channel.QueueDeclare( queueName, // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") log.Printf("declared queue, publishing %dB body (%q)", len(body), body) // Producer只能发送到exchange,它是不能直接发送到queue的。 // 现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。 // routing_key就是指定的queue名字。 err = channel.Publish( exchange, // exchange q.Name, // routing key false, // mandatory false, // immediate amqp.Publishing{ Headers: amqp.Table{}, ContentType: "text/plain", ContentEncoding: "", Body: []byte(body), }) failOnError(err, "Failed to publish a message") }
消费者消费数据
#consumer package main import ( "bytes" "fmt" "github.com/streadway/amqp" "log" "time" ) const ( //AMQP URI uri = "amqp://guest:[email protected]:5672/" //Durable AMQP exchange nam exchangeName = "" //Durable AMQP queue name queueName = "test-queues-acknowledgments" ) //如果存在错误,则输出 func failOnError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) panic(fmt.Sprintf("%s: %s", msg, err)) } } func main() { //调用消息接收者 consumer(uri, exchangeName, queueName) } //接收者方法 //@amqpURI, amqp的地址 //@exchange, exchange的名称 //@queue, queue的名称 func consumer(amqpURI string, exchange string, queue string) { //建立连接 log.Printf("dialing %q", amqpURI) connection, err := amqp.Dial(amqpURI) failOnError(err, "Failed to connect to RabbitMQ") defer connection.Close() //创建一个Channel log.Printf("got Connection, getting Channel") channel, err := connection.Channel() failOnError(err, "Failed to open a channel") defer channel.Close() log.Printf("got queue, declaring %q", queue) //创建一个queue q, err := channel.QueueDeclare( queueName, // name false, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) failOnError(err, "Failed to declare a queue") log.Printf("Queue bound to Exchange, starting Consume") //订阅消息 msgs, err := channel.Consume( q.Name, // queue "", // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) failOnError(err, "Failed to register a consumer") //创建一个channel forever := make(chan bool) //调用gorountine go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) dot_count := bytes.Count(d.Body, []byte(".")) t := time.Duration(dot_count) time.Sleep(t * time.Second) log.Printf("Done") d.Ack(false) } }() log.Printf(" [*] Waiting for messages. To exit press CTRL+C") //没有写入数据,一直等待读,阻塞当前线程,目的是让线程不退出 <-forever }
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK