RabbitMQ 常用工作模式 ━━ 简单(Simple)模式

简单模式是最简单常用的模式,由一个生产者发送消息到队列, 一个消费者接收,架构如如下:

  • P:表示消息的生产者
  • C:表示消息的消费者
  • 中间红色的矩形:表示消息队列

使用 Go 语言实现 RabbitMQ 简单模式,代码如下:

首先,创建一个 RabbitMQ 结构体


type RabbitMQ struct {
    conn *amqp.Connection
    channel *amqp.Channel

    // 队列名称
    QueueName string

    // 交换机
    Exchange string

    // key
    Key string

    // 链接信息
    Mqurl string
}

再创建 RabbitMQ 结构体实例

// 创建 RabbitMQ 结构体实例
func NewRabbitMQ(quueName string, exchange string, key string) *RabbitMQ {
    rabbitmq := &RabbitMQ{QueueName: quueName, Exchange: exchange, Key: key, Mqurl: MQURL}

    var err error
    rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
    rabbitmq.failOnError(err, "Create connection failed!")
    rabbitmq.channel, err = rabbitmq.conn.Channel()
    rabbitmq.failOnError(err, "Get channel failed!")
    return rabbitmq
}

创建简单模式下的 RabbitMQ

// 创建简单模式下的 RabbitMQ
func NewRabbitMQSimple(queueName string) *RabbitMQ {
    return NewRabbitMQ(queueName, "", "")
}

实现简单模式下发送消息的方法

// 发送消息
func (r *RabbitMQ) PublishSimple(message string) {
    // 申请队列,如果队列不存在会自动创建,如果存在则跳过创建,保证队列存在,消息能发送到队列中
    _, err := r.channel.QueueDeclare(
        r.QueueName,
        false, // 是否持久化
        false, // 是否为自动删除
        false, // 是否具有排他性
        false, // 是否阻塞
        nil, // 额外属性
    )

    r.failOnError(err, "Failed to declare an queue")

    // 发行消息到队列中
    err = r.channel.Publish(
        r.Exchange,
        r.QueueName,
        false, // 如果设置为 true,根据 exchange 类型和 routkey 规则,如果无法找到符合条件的队列,那么会把发送的消息返回给发送者
        false, // 如果设置为 true,当 exchange 发送消息到队列后发现队列上没有绑定消费者,则会把消息返回给发送者
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(message),
        })

    r.failOnError(err, "Failed to publish an message")
}

消费消息的方法

// 消费消息
func (r *RabbitMQ) ConsumeSimple() {
    // 申请队列,如果队列不存在会自动创建,如果存在则跳过创建,保证队列存在,消息能发送到队列中
    _, err := r.channel.QueueDeclare(
        r.QueueName,
        false, // 是否持久化
        false, // 是否为自动删除
        false, // 是否具有排他性
        false, // 是否阻塞
        nil, // 额外属性
    )

    r.failOnError(err, "Failed to declare an queue")

    // 接受消息
    msgs, err := r.channel.Consume(
        r.QueueName,
        "", // 用来区分多个消费者
        true, // 是否自动应答
        false, // 是否具有排他性
        false, // 如果设置为 true,表示不能将同一个 connection 中发送的消息传递给这个 connection 中的消费者
        false, // 是否阻塞
        nil, // 额外参数
    )

    r.failOnError(err, "Failed to consume msg")

    forever := make(chan bool)

    // 启用协程处理消息
    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            fmt.Println(d.Body)
        }
    }()

    log.Printf("[*] Waiting for messages, To exit press CTRL + C")
    <-forever
}

以下是 /RabbitMQ/rabbitmq.go 文件中的完整代码

package RabbitMQ

import (
    "fmt"
    "github.com/streadway/amqp"
    "log"
)

// url 格式: amqp://账号:密码@rabbitmq 服务器地址:端口号/vhost
const MQURL = "amqp://zsc:zsc123456@192.168.3.16:5672/eesion"

type RabbitMQ struct {
    conn *amqp.Connection
    channel *amqp.Channel

    // 队列名称
    QueueName string

    // 交换机
    Exchange string

    // key
    Key string

    // 链接信息
    Mqurl string
}

// 创建 RabbitMQ 结构体实例
func NewRabbitMQ(quueName string, exchange string, key string) *RabbitMQ {
    rabbitmq := &amp;RabbitMQ{QueueName: quueName, Exchange: exchange, Key: key, Mqurl: MQURL}

    var err error
    rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
    rabbitmq.failOnError(err, "Create connection failed!")
    rabbitmq.channel, err = rabbitmq.conn.Channel()
    rabbitmq.failOnError(err, "Get channel failed!")
    return rabbitmq
}

// 创建简单模式下的 RabbitMQ
func NewRabbitMQSimple(queueName string) *RabbitMQ {
    return NewRabbitMQ(queueName, "", "")
}

// 发送消息
func (r *RabbitMQ) PublishSimple(message string) {
    // 申请队列,如果队列不存在会自动创建,如果存在则跳过创建,保证队列存在,消息能发送到队列中
    _, err := r.channel.QueueDeclare(
        r.QueueName,
        false, // 是否持久化
        false, // 是否为自动删除
        false, // 是否具有排他性
        false, // 是否阻塞
        nil, // 额外属性
    )

    r.failOnError(err, "Failed to declare an queue")

    // 发行消息到队列中
    err = r.channel.Publish(
        r.Exchange,
        r.QueueName,
        false, // 如果设置为 true,根据 exchange 类型和 routkey 规则,如果无法找到符合条件的队列,那么会把发送的消息返回给发送者
        false, // 如果设置为 true,当 exchange 发送消息到队列后发现队列上没有绑定消费者,则会把消息返回给发送者
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(message),
        })

    r.failOnError(err, "Failed to publish an message")
}

// 消费消息
func (r *RabbitMQ) ConsumeSimple() {
    // 申请队列,如果队列不存在会自动创建,如果存在则跳过创建,保证队列存在,消息能发送到队列中
    _, err := r.channel.QueueDeclare(
        r.QueueName,
        false, // 是否持久化
        false, // 是否为自动删除
        false, // 是否具有排他性
        false, // 是否阻塞
        nil, // 额外属性
    )

    r.failOnError(err, "Failed to declare an queue")

    // 接受消息
    msgs, err := r.channel.Consume(
        r.QueueName,
        "", // 用来区分多个消费者
        true, // 是否自动应答
        false, // 是否具有排他性
        false, // 如果设置为 true,表示不能将同一个 connection 中发送的消息传递给这个 connection 中的消费者
        false, // 是否阻塞
        nil, // 额外参数
    )

    r.failOnError(err, "Failed to consume msg")

    forever := make(chan bool)

    // 启用协程处理消息
    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            fmt.Println(d.Body)
        }
    }()

    log.Printf("[*] Waiting for messages, To exit press CTRL + C")
    <-forever
}

// 销毁链接
func (r *RabbitMQ) Destory() {
    r.channel.Close()
    r.conn.Close()
}

// 错误处理
func (r *RabbitMQ) failOnError(err error, message string) {
    if err != nil {
        log.Fatalf("%s:%s", message, err)
        panic(fmt.Sprintf("%s:%s", message, err))
    }
}

生产端代码如下:

package main

import (
    "eesion-rabbitmq/RabbitMQ"
    "fmt"
)

func main() {
    rabbitmq := RabbitMQ.NewRabbitMQSimple("eesionSimple")
    rabbitmq.PublishSimple("Hello eesion")

    fmt.Println("发送成功!")
}

消费端代码如下:

package main

import "eesion-rabbitmq/RabbitMQ"

func main() {
    rabbitmq := RabbitMQ.NewRabbitMQSimple("eesionSimple")
    rabbitmq.ConsumeSimple()
}

运行消费端和生产端,效果如下图:

以上为 Go 语言实现 RabbitMQ 简单模式

标签:,

About: Mr.zhang

成谜于写 bug 无法自拔~


发表评论

邮箱地址不会被公开。 必填项已用*标注