RabbitMQ 常用工作模式 ━━ 简单(Simple)模式
简单模式是最简单常用的模式,由一个生产者发送消息到队列, 一个消费者接收,架构如如下:

- P:表示消息的生产者
- C:表示消息的消费者
- 中间红色的矩形:表示消息队列
使用 Go 语言实现 RabbitMQ 简单模式,代码如下:
首先,创建一个 RabbitMQ 结构体
type RabbitMQ struct { conn *amqp.Connection channel *amqp.Channel // 队列名称 QueueName string // 交换机 Exchange string // key Key string // 链接信息 Mqurl string }
再创建 RabbitMQ 结构体实例
// 创建 RabbitMQ 结构体实例 func NewRabbitMQ(quueName string, exchange string, key string) *RabbitMQ { rabbitmq := &RabbitMQ{QueueName: quueName, Exchange: exchange, Key: key, Mqurl: MQURL} var err error rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl) rabbitmq.failOnError(err, "Create connection failed!") rabbitmq.channel, err = rabbitmq.conn.Channel() rabbitmq.failOnError(err, "Get channel failed!") return rabbitmq }
创建简单模式下的 RabbitMQ
// 创建简单模式下的 RabbitMQ func NewRabbitMQSimple(queueName string) *RabbitMQ { return NewRabbitMQ(queueName, "", "") }
实现简单模式下发送消息的方法
// 发送消息 func (r *RabbitMQ) PublishSimple(message string) { // 申请队列,如果队列不存在会自动创建,如果存在则跳过创建,保证队列存在,消息能发送到队列中 _, err := r.channel.QueueDeclare( r.QueueName, false, // 是否持久化 false, // 是否为自动删除 false, // 是否具有排他性 false, // 是否阻塞 nil, // 额外属性 ) r.failOnError(err, "Failed to declare an queue") // 发行消息到队列中 err = r.channel.Publish( r.Exchange, r.QueueName, false, // 如果设置为 true,根据 exchange 类型和 routkey 规则,如果无法找到符合条件的队列,那么会把发送的消息返回给发送者 false, // 如果设置为 true,当 exchange 发送消息到队列后发现队列上没有绑定消费者,则会把消息返回给发送者 amqp.Publishing{ ContentType: "text/plain", Body: []byte(message), }) r.failOnError(err, "Failed to publish an message") }
消费消息的方法
// 消费消息 func (r *RabbitMQ) ConsumeSimple() { // 申请队列,如果队列不存在会自动创建,如果存在则跳过创建,保证队列存在,消息能发送到队列中 _, err := r.channel.QueueDeclare( r.QueueName, false, // 是否持久化 false, // 是否为自动删除 false, // 是否具有排他性 false, // 是否阻塞 nil, // 额外属性 ) r.failOnError(err, "Failed to declare an queue") // 接受消息 msgs, err := r.channel.Consume( r.QueueName, "", // 用来区分多个消费者 true, // 是否自动应答 false, // 是否具有排他性 false, // 如果设置为 true,表示不能将同一个 connection 中发送的消息传递给这个 connection 中的消费者 false, // 是否阻塞 nil, // 额外参数 ) r.failOnError(err, "Failed to consume msg") forever := make(chan bool) // 启用协程处理消息 go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) fmt.Println(d.Body) } }() log.Printf("[*] Waiting for messages, To exit press CTRL + C") <-forever }
以下是 /RabbitMQ/rabbitmq.go
文件中的完整代码
package RabbitMQ import ( "fmt" "github.com/streadway/amqp" "log" ) // url 格式: amqp://账号:密码@rabbitmq 服务器地址:端口号/vhost const MQURL = "amqp://zsc:zsc123456@192.168.3.16:5672/eesion" type RabbitMQ struct { conn *amqp.Connection channel *amqp.Channel // 队列名称 QueueName string // 交换机 Exchange string // key Key string // 链接信息 Mqurl string } // 创建 RabbitMQ 结构体实例 func NewRabbitMQ(quueName string, exchange string, key string) *RabbitMQ { rabbitmq := &RabbitMQ{QueueName: quueName, Exchange: exchange, Key: key, Mqurl: MQURL} var err error rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl) rabbitmq.failOnError(err, "Create connection failed!") rabbitmq.channel, err = rabbitmq.conn.Channel() rabbitmq.failOnError(err, "Get channel failed!") return rabbitmq } // 创建简单模式下的 RabbitMQ func NewRabbitMQSimple(queueName string) *RabbitMQ { return NewRabbitMQ(queueName, "", "") } // 发送消息 func (r *RabbitMQ) PublishSimple(message string) { // 申请队列,如果队列不存在会自动创建,如果存在则跳过创建,保证队列存在,消息能发送到队列中 _, err := r.channel.QueueDeclare( r.QueueName, false, // 是否持久化 false, // 是否为自动删除 false, // 是否具有排他性 false, // 是否阻塞 nil, // 额外属性 ) r.failOnError(err, "Failed to declare an queue") // 发行消息到队列中 err = r.channel.Publish( r.Exchange, r.QueueName, false, // 如果设置为 true,根据 exchange 类型和 routkey 规则,如果无法找到符合条件的队列,那么会把发送的消息返回给发送者 false, // 如果设置为 true,当 exchange 发送消息到队列后发现队列上没有绑定消费者,则会把消息返回给发送者 amqp.Publishing{ ContentType: "text/plain", Body: []byte(message), }) r.failOnError(err, "Failed to publish an message") } // 消费消息 func (r *RabbitMQ) ConsumeSimple() { // 申请队列,如果队列不存在会自动创建,如果存在则跳过创建,保证队列存在,消息能发送到队列中 _, err := r.channel.QueueDeclare( r.QueueName, false, // 是否持久化 false, // 是否为自动删除 false, // 是否具有排他性 false, // 是否阻塞 nil, // 额外属性 ) r.failOnError(err, "Failed to declare an queue") // 接受消息 msgs, err := r.channel.Consume( r.QueueName, "", // 用来区分多个消费者 true, // 是否自动应答 false, // 是否具有排他性 false, // 如果设置为 true,表示不能将同一个 connection 中发送的消息传递给这个 connection 中的消费者 false, // 是否阻塞 nil, // 额外参数 ) r.failOnError(err, "Failed to consume msg") forever := make(chan bool) // 启用协程处理消息 go func() { for d := range msgs { log.Printf("Received a message: %s", d.Body) fmt.Println(d.Body) } }() log.Printf("[*] Waiting for messages, To exit press CTRL + C") <-forever } // 销毁链接 func (r *RabbitMQ) Destory() { r.channel.Close() r.conn.Close() } // 错误处理 func (r *RabbitMQ) failOnError(err error, message string) { if err != nil { log.Fatalf("%s:%s", message, err) panic(fmt.Sprintf("%s:%s", message, err)) } }
生产端代码如下:
package main import ( "eesion-rabbitmq/RabbitMQ" "fmt" ) func main() { rabbitmq := RabbitMQ.NewRabbitMQSimple("eesionSimple") rabbitmq.PublishSimple("Hello eesion") fmt.Println("发送成功!") }
消费端代码如下:
package main import "eesion-rabbitmq/RabbitMQ" func main() { rabbitmq := RabbitMQ.NewRabbitMQSimple("eesionSimple") rabbitmq.ConsumeSimple() }
运行消费端和生产端,效果如下图:

以上为 Go 语言实现 RabbitMQ 简单模式