Go语言框架中如何快速集成RabbitMq

本文涉及的产品
数字短信套餐包(仅限零售电商行业),100条 12个月
短信服务,100条 3个月
短信服务,200条 3个月
简介: Go语言框架中如何快速集成RabbitMq

前言


在我们日常开发中, 消息队列是必不可少的一环, RabbitMq是一个实现了AMQP高级消息队列协议的消息队列服务, 也是我们比较常用的消息队列, 还可以实现延迟消费, 今天来说说怎么把它集成到我们的开发框架中;


RabbitMq

安装


使用docker安装一条语句简单快捷


docker run -d --name rabbit-mq -p 15672:15672 -p 5672:5672 rabbitmq:3-management


然后可以通过请求[http://127.0.0.1:15672/#/](http://127.0.0.1:15672/#/)来访问管理后台, 默认账号密码都是 guest;


作用


异步, 削峰, 解耦老三套了, 场景实例其他博客都写烂了, 我这里简单列个场景大家都理解就行:

用户注册成功需要发送短信通知用户, 还需要通知钱包服务和埋点服务进行对应的初始化和记录;


用户注册成功短信可以写入消息队列, 然后主流程返回注册成功结果, 短信服务去消费队列发送短信, 这个过程就叫异步;

短信服务支撑平台全部的短信业务, 在大并发的情况下可能会导致短信服务挂掉, 可以将需要发送的短信内容都放入消息队列, 然后短信服务以固定的速率去消费, 这就叫削峰(其实就是限流);

用户注册成功需要通知钱包服务和埋点服务, 后续还有可能会有ABCDE等其他服务, 所以直接将注册成功信息写入消息队列, 谁用谁就去消费就可以, 不用在主流程里面处理所有的耦合逻辑, 这就叫解耦;


交换机


详细信息大家可以参考下阿里云exchang简介


类型 解释
direct 根据消息发送的key路由到跟交换机上绑定key完全一致的队列
topic 跟direct类型都是通过key来路由, topic类型支持通配符, 星号(*)代表一个英文单词(例如cn)。井号(#)代表零个、一个或多个英文单词,英文单词间通过英文句号(.)分隔,例如user.register.event
fanout 发送到exchange的消息会被路由到所有跟exchange绑定的队列, 不区分key
headers 根据消息的headers属性进行路由, 不常用


集成实现

使用注意


mq的channel是有数量限制的(默认2047), 超过限制之后无法创建新的channel, 会报错(channel id space exhausted), 所以建议单个服务不要过多创建新的channel;


建议项目中频率不高的生产者共用一个channel, 消费者的话建议一个消费者对应一个channel(1 consumer per channel);


rabbitmq.go


为了保证mq在遭遇异常的情况下可以实现重新连接, 所以这块引用了amqp091-go里面issues的一个提交;


在打开 connect/channel 时会额外开一个go程去监听mq服务关闭通知, 如果遇到异常情况则会每隔三秒会去尝试生成一个可用的 connect/channel 去替换之前的异常connect/channel;

我试了下消费者在正常消费的情况下遇到关闭mq服务, 然后再将mq服务打开, 消费者可以继续正常消费;

package mq
import (
  "log"
  "sync/atomic"
  "time"
  amqp "github.com/rabbitmq/amqp091-go"
)
// 参阅 https://github.com/rabbitmq/amqp091-go/commit/4ce2c8e4e371338add82c3dc2df56f70d0dca601
const delay = 3 // reconnect after delay seconds
// Connection amqp.Connection wrapper
type Connection struct {
  *amqp.Connection
}
// Channel wrap amqp.Connection.Channel, get a auto reconnect channel
func (c *Connection) Channel() (*Channel, error) {
  ch, err := c.Connection.Channel()
  if err != nil {
    return nil, err
  }
  channel := &Channel{
    Channel: ch,
  }
  go func() {
    for {
      reason, ok := <-channel.Channel.NotifyClose(make(chan *amqp.Error))
      // exit this goroutine if closed by developer
      if !ok || channel.IsClosed() {
        log.Println("channel closed")
        _ = channel.Close() // close again, ensure closed flag set when connection closed
        break
      }
      log.Printf("channel closed, reason: %v", reason)
      // reconnect if not closed by developer
      for {
        // wait 1s for connection reconnect
        time.Sleep(delay * time.Second)
        ch, err := c.Connection.Channel()
        if err == nil {
          log.Println("channel recreate success")
          channel.Channel = ch
          break
        }
        log.Printf("channel recreate failed, err: %v", err)
      }
    }
  }()
  return channel, nil
}
// Dial wrap amqp.Dial, dial and get reconnect connection
func Dial(url string) (*Connection, error) {
  conn, err := amqp.Dial(url)
  if err != nil {
    return nil, err
  }
  connection := &Connection{
    Connection: conn,
  }
  go func() {
    for {
      reason, ok := <-connection.Connection.NotifyClose(make(chan *amqp.Error))
      // exit this goroutine if closed by developer
      if !ok {
        log.Println("connection closed")
        break
      }
      log.Printf("connection closed, reason: %v", reason)
      // reconnect if not closed by developer
      for {
        // wait 1s for reconnect
        time.Sleep(delay * time.Second)
        conn, err := amqp.Dial(url)
        if err == nil {
          connection.Connection = conn
          log.Println("reconnect success")
          break
        }
        log.Printf("reconnect failed, err: %v", err)
      }
    }
  }()
  return connection, nil
}
// Channel amqp.Channel wapper
type Channel struct {
  *amqp.Channel
  closed int32
}
// IsClosed indicate closed by developer
func (ch *Channel) IsClosed() bool {
  return atomic.LoadInt32(&ch.closed) == 1
}
// Close ensure closed flag set
func (ch *Channel) Close() error {
  if ch.IsClosed() {
    return amqp.ErrClosed
  }
  atomic.StoreInt32(&ch.closed, 1)
  return ch.Channel.Close()
}
// Consume wrap amqp.Channel.Consume, the returned delivery will end only when channel closed by developer
func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) {
  deliveries := make(chan amqp.Delivery)
  go func() {
    for {
      d, err := ch.Channel.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, args)
      if err != nil {
        log.Printf("consume failed, err: %v", err)
        time.Sleep(delay * time.Second)
        continue
      }
      for msg := range d {
        deliveries <- msg
      }
      // sleep before IsClose call. closed flag may not set before sleep.
      time.Sleep(delay * time.Second)
      if ch.IsClosed() {
        break
      }
    }
  }()
  return deliveries, nil
}

index.go


初始化rabbitmq, 并生成一个默认的channel(频率不高的生产者可以共用), 以及封装了一些常用的方法;

package mq
import (
  "context"
  "fmt"
  amqp "github.com/rabbitmq/amqp091-go"
  "time"
)
type (
  Conf struct {
    Addr string
    Port string
    User string
    Pwd  string
  }
)
var (
  defaultConn    *Connection
  defaultChannel *Channel
)
// Init 初始化
func Init(c Conf) (err error) {
  if c.Addr == "" {
    return nil
  }
  defaultConn, err = Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/",
    c.User,
    c.Pwd,
    c.Addr,
    c.Port))
  if err != nil {
    return fmt.Errorf("new mq conn err: %v", err)
  }
  defaultChannel, err = defaultConn.Channel()
  if err != nil {
    return fmt.Errorf("new mq channel err: %v", err)
  }
  return
}
// NewChannel 获取channel.
func NewChannel() *Channel {
  return defaultChannel
}
// ExchangeDeclare 创建交换机.
func (ch *Channel) ExchangeDeclare(name string, kind string) (err error) {
  return ch.Channel.ExchangeDeclare(name, kind, true, false, false, false, nil)
}
// Publish 发布消息.
func (ch *Channel) Publish(exchange, key string, body []byte) (err error) {
  _, err = ch.Channel.PublishWithDeferredConfirmWithContext(context.Background(), exchange, key, false, false,
    amqp.Publishing{ContentType: "text/plain", Body: body})
  return err
}
// PublishWithDelay 发布延迟消息.
func (ch *Channel) PublishWithDelay(exchange, key string, body []byte, timer time.Duration) (err error) {
  _, err = ch.Channel.PublishWithDeferredConfirmWithContext(context.Background(), exchange, key, false, false,
    amqp.Publishing{ContentType: "text/plain", Body: body, Expiration: fmt.Sprintf("%d", timer.Milliseconds())})
  return err
}
// QueueDeclare 创建队列.
func (ch *Channel) QueueDeclare(name string) (err error) {
  _, err = ch.Channel.QueueDeclare(name, true, false, false, false, nil)
  return
}
// QueueDeclareWithDelay 创建延迟队列.
func (ch *Channel) QueueDeclareWithDelay(name, exchange, key string) (err error) {
  _, err = ch.Channel.QueueDeclare(name, true, false, false, false, amqp.Table{
    "x-dead-letter-exchange":    exchange,
    "x-dead-letter-routing-key": key,
  })
  return
}
// QueueBind 绑定队列.
func (ch *Channel) QueueBind(name, key, exchange string) (err error) {
  return ch.Channel.QueueBind(name, key, exchange, false, nil)
}
// NewConsumer 实例化一个消费者, 会单独用一个channel.
func NewConsumer(queue string, handler func([]byte) error) error {
  ch, err := defaultConn.Channel()
  if err != nil {
    return fmt.Errorf("new mq channel err: %v", err)
  }
  deliveries, err := ch.Consume(queue, "", false, false, false, false, nil)
  if err != nil {
    return fmt.Errorf("consume err: %v, queue: %s", err, queue)
  }
  for msg := range deliveries {
    err = handler(msg.Body)
    if err != nil {
      _ = msg.Reject(true)
      continue
    }
    _ = msg.Ack(false)
  }
  return nil
}

index_test.go


使用示例, 实现了生产者消费者及延迟队列; 新手可以直接使用快速上手;

package mq
import (
  "fmt"
  "log"
  "testing"
  "time"
)
func nowTime() string {
  return time.Now().Format("2006-01-02 15:04:05")
}
func TestChannel_Publish(t *testing.T) {
  var (
    conf = Conf{
      User: "guest",
      Pwd:  "guest",
      Addr: "127.0.0.1",
      Port: "5672",
    }
    exchangeName = "user.register.direct"
    queueName    = "user.register.queue"
    keyName      = "user.register.event"
  )
  if err := Init(conf); err != nil {
    log.Fatalf(" mq init err: %v", err)
  }
  ch := NewChannel()
  if err := ch.ExchangeDeclare(exchangeName, "direct"); err != nil {
    log.Fatalf("create exchange err: %v", err)
  }
  if err := ch.QueueDeclare(queueName); err != nil {
    log.Fatalf("create queue err: %v", err)
  }
  if err := ch.QueueBind(queueName, keyName, exchangeName); err != nil {
    log.Fatalf("bind queue err: %v", err)
  }
  go func() {
    if err := NewConsumer(queueName, func(body []byte) error {
      fmt.Println("consume msg :" + string(body))
      return nil
    }); err != nil {
      log.Fatalf("consume err: %v", err)
    }
  }()
  go func() {
    for {
      if err := ch.Publish(exchangeName, keyName, []byte(nowTime())); err != nil {
        log.Fatalf("publish msg err: %v", err)
      }
      time.Sleep(time.Second)
    }
  }()
  time.Sleep(time.Minute)
  t.Log("end")
}
func TestChannel_PublishWithDelay(t *testing.T) {
  var (
    conf = Conf{
      User: "guest",
      Pwd:  "guest",
      Addr: "127.0.0.1",
      Port: "5672",
    }
    exchangeName   = "user.delay.direct"
    queueName      = "user.delay.queue"
    delayQueueName = "user.delay1.queue" // 延迟队列
    keyName        = "user.delay.event"
    delayKeyName   = "user.delay1.event" // 延迟key
  )
  if err := Init(conf); err != nil {
    log.Fatalf(" mq init err: %v", err)
  }
  ch := NewChannel()
  if err := ch.ExchangeDeclare(exchangeName, "direct"); err != nil {
    log.Fatalf("create exchange err: %v", err)
  }
  if err := ch.QueueDeclare(queueName); err != nil {
    log.Fatalf("create queue err: %v", err)
  }
  if err := ch.QueueDeclareWithDelay(delayQueueName, exchangeName, keyName); err != nil {
    log.Fatalf("create queue err: %v", err)
  }
  if err := ch.QueueBind(queueName, keyName, exchangeName); err != nil {
    log.Fatalf("bind queue err: %v", err)
  }
  if err := ch.QueueBind(delayQueueName, delayKeyName, exchangeName); err != nil {
    log.Fatalf("bind queue err: %v", err)
  }
  go func() {
    if err := NewConsumer(queueName, func(body []byte) error {
      fmt.Println(fmt.Sprintf("consumer msg: %s, ts: %s", string(body), nowTime()))
      return nil
    }); err != nil {
      log.Fatalf("consume err: %v", err)
    }
  }()
  go func() {
    for {
      if err := ch.PublishWithDelay(exchangeName, delayKeyName, []byte(nowTime()), 10*time.Second); err != nil {
        log.Fatalf("publish msg err: %v", err)
      }
      time.Sleep(time.Second)
    }
  }()
  time.Sleep(time.Minute)
  t.Log("end")
}

结语


上面的代码均摘自我开发的一个开源项目中, 主要是一个Go的标准项目布局, 封装了一些常用的组件, 有兴趣的朋友可以了解一下, 新手极易上手;

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
2月前
|
开发框架 Go 计算机视觉
纯Go语言开发人脸检测、瞳孔/眼睛定位与面部特征检测插件-助力GoFly快速开发框架
开发纯go插件的原因是因为目前 Go 生态系统中几乎所有现有的人脸检测解决方案都是纯粹绑定到一些 C/C++ 库,如 OpenCV 或 dlib,但通过 cgo 调用 C 程序会引入巨大的延迟,并在性能方面产生显著的权衡。此外,在许多情况下,在各种平台上安装 OpenCV 是很麻烦的。使用纯Go开发的插件不仅在开发时方便,在项目部署和项目维护也能省很多时间精力。
|
3月前
|
Go API 数据库
Go 语言中常用的 ORM 框架,如 GORM、XORM 和 BeeORM,分析了它们的特点、优势及不足,并从功能特性、性能表现、易用性和社区活跃度等方面进行了比较,旨在帮助开发者根据项目需求选择合适的 ORM 框架。
本文介绍了 Go 语言中常用的 ORM 框架,如 GORM、XORM 和 BeeORM,分析了它们的特点、优势及不足,并从功能特性、性能表现、易用性和社区活跃度等方面进行了比较,旨在帮助开发者根据项目需求选择合适的 ORM 框架。
238 4
|
3月前
|
缓存 监控 前端开发
Go 语言中如何集成 WebSocket 与 Socket.IO,实现高效、灵活的实时通信
本文探讨了在 Go 语言中如何集成 WebSocket 与 Socket.IO,实现高效、灵活的实时通信。首先介绍了 WebSocket 和 Socket.IO 的基本概念及其优势,接着详细讲解了 Go 语言中 WebSocket 的实现方法,以及二者集成的重要意义和具体步骤。文章还讨论了集成过程中需要注意的问题,如协议兼容性、消息格式、并发处理等,并提供了实时聊天、数据监控和在线协作工具等应用案例,最后提出了性能优化策略,包括数据压缩、缓存策略和连接管理优化。旨在帮助开发者更好地理解并应用这些技术。
160 3
|
3月前
|
中间件 Go API
Go语言中几种流行的Web框架,如Beego、Gin和Echo,分析了它们的特点、性能及适用场景,并讨论了如何根据项目需求、性能要求、团队经验和社区支持等因素选择最合适的框架
本文概述了Go语言中几种流行的Web框架,如Beego、Gin和Echo,分析了它们的特点、性能及适用场景,并讨论了如何根据项目需求、性能要求、团队经验和社区支持等因素选择最合适的框架。
228 1
|
5月前
|
Kubernetes Go 持续交付
一个基于Go程序的持续集成/持续部署(CI/CD)
本教程通过一个简单的Go程序示例,展示了如何使用GitHub Actions实现从代码提交到Kubernetes部署的CI/CD流程。首先创建并版本控制Go项目,接着编写Dockerfile构建镜像,再配置CI/CD流程自动化构建、推送Docker镜像及部署应用。此流程基于GitHub仓库,适用于快速迭代开发。
117 3
|
5月前
|
Kubernetes 持续交付 Go
创建一个基于Go程序的持续集成/持续部署(CI/CD)流水线
创建一个基于Go程序的持续集成/持续部署(CI/CD)流水线
|
5月前
|
JSON Go API
使用Go语言和Gin框架构建RESTful API:GET与POST请求示例
使用Go语言和Gin框架构建RESTful API:GET与POST请求示例
|
5月前
|
消息中间件 NoSQL Go
PHP转Go系列 | ThinkPHP与Gin框架之Redis延时消息队列技术实践
【9月更文挑战第7天】在从 PHP 的 ThinkPHP 框架迁移到 Go 的 Gin 框架时,涉及 Redis 延时消息队列的技术实践主要包括:理解延时消息队列概念,其能在特定时间处理消息,适用于定时任务等场景;在 ThinkPHP 中使用 Redis 实现延时队列;在 Gin 中结合 Go 的 Redis 客户端库实现类似功能;Go 具有更高性能和简洁性,适合处理大量消息。迁移过程中需考虑业务需求及系统稳定性。
107 0
|
6月前
|
SQL JavaScript Go
Go Web 服务框架实现详解
Go Web 服务框架实现详解
|
6月前
|
Cloud Native JavaScript API
一文读懂云原生 go-zero 微服务框架
一文读懂云原生 go-zero 微服务框架

热门文章

最新文章