前言
在我们日常开发中, 消息队列是必不可少的一环, RabbitMq是一个实现了AMQP高级消息队列协议的消息队列服务, 也是我们比较常用的消息队列, 还可以实现延迟消费, 今天来说说怎么把它集成到我们的开发框架中;
RabbitMq
安装
使用docker安装一条语句简单快捷
docker run -d --name rabbit-mq -p 15672:15672 -p 5672:5672 rabbitmq:3-management
然后可以通过请求[http://127.0.0.1:15672/#/](http://127.0.0.1:15672/#/)来访问管理后台, 默认账号密码都是 guest;
作用
异步, 削峰, 解耦老三套了, 场景实例其他博客都写烂了, 我这里简单列个场景大家都理解就行:
用户注册成功需要发送短信通知用户, 还需要通知钱包服务和埋点服务进行对应的初始化和记录;
用户注册成功短信可以写入消息队列, 然后主流程返回注册成功结果, 短信服务去消费队列发送短信, 这个过程就叫异步;
短信服务支撑平台全部的短信业务, 在大并发的情况下可能会导致短信服务挂掉, 可以将需要发送的短信内容都放入消息队列, 然后短信服务以固定的速率去消费, 这就叫削峰(其实就是限流);
用户注册成功需要通知钱包服务和埋点服务, 后续还有可能会有ABCDE等其他服务, 所以直接将注册成功信息写入消息队列, 谁用谁就去消费就可以, 不用在主流程里面处理所有的耦合逻辑, 这就叫解耦;
交换机
详细信息大家可以参考下阿里云exchang简介
类型 | 解释 |
direct | 根据消息发送的key路由到跟交换机上绑定key完全一致的队列 |
topic | 跟direct类型都是通过key来路由, topic类型支持通配符, 星号(*)代表一个英文单词(例如cn)。井号(#)代表零个、一个或多个英文单词,英文单词间通过英文句号(.)分隔,例如user.register.event |
fanout | 发送到exchange的消息会被路由到所有跟exchange绑定的队列, 不区分key |
headers | 根据消息的headers属性进行路由, 不常用 |
集成实现
使用注意
mq的channel是有数量限制的(默认2047), 超过限制之后无法创建新的channel, 会报错(channel id space exhausted), 所以建议单个服务不要过多创建新的channel;
建议项目中频率不高的生产者共用一个channel, 消费者的话建议一个消费者对应一个channel(1 consumer per channel);
rabbitmq.go
为了保证mq在遭遇异常的情况下可以实现重新连接, 所以这块引用了amqp091-go里面issues的一个提交;
在打开 connect/channel 时会额外开一个go程去监听mq服务关闭通知, 如果遇到异常情况则会每隔三秒会去尝试生成一个可用的 connect/channel 去替换之前的异常connect/channel;
我试了下消费者在正常消费的情况下遇到关闭mq服务, 然后再将mq服务打开, 消费者可以继续正常消费;
package mq import ( "log" "sync/atomic" "time" amqp "github.com/rabbitmq/amqp091-go" ) // 参阅 https://github.com/rabbitmq/amqp091-go/commit/4ce2c8e4e371338add82c3dc2df56f70d0dca601 const delay = 3 // reconnect after delay seconds // Connection amqp.Connection wrapper type Connection struct { *amqp.Connection } // Channel wrap amqp.Connection.Channel, get a auto reconnect channel func (c *Connection) Channel() (*Channel, error) { ch, err := c.Connection.Channel() if err != nil { return nil, err } channel := &Channel{ Channel: ch, } go func() { for { reason, ok := <-channel.Channel.NotifyClose(make(chan *amqp.Error)) // exit this goroutine if closed by developer if !ok || channel.IsClosed() { log.Println("channel closed") _ = channel.Close() // close again, ensure closed flag set when connection closed break } log.Printf("channel closed, reason: %v", reason) // reconnect if not closed by developer for { // wait 1s for connection reconnect time.Sleep(delay * time.Second) ch, err := c.Connection.Channel() if err == nil { log.Println("channel recreate success") channel.Channel = ch break } log.Printf("channel recreate failed, err: %v", err) } } }() return channel, nil } // Dial wrap amqp.Dial, dial and get reconnect connection func Dial(url string) (*Connection, error) { conn, err := amqp.Dial(url) if err != nil { return nil, err } connection := &Connection{ Connection: conn, } go func() { for { reason, ok := <-connection.Connection.NotifyClose(make(chan *amqp.Error)) // exit this goroutine if closed by developer if !ok { log.Println("connection closed") break } log.Printf("connection closed, reason: %v", reason) // reconnect if not closed by developer for { // wait 1s for reconnect time.Sleep(delay * time.Second) conn, err := amqp.Dial(url) if err == nil { connection.Connection = conn log.Println("reconnect success") break } log.Printf("reconnect failed, err: %v", err) } } }() return connection, nil } // Channel amqp.Channel wapper type Channel struct { *amqp.Channel closed int32 } // IsClosed indicate closed by developer func (ch *Channel) IsClosed() bool { return atomic.LoadInt32(&ch.closed) == 1 } // Close ensure closed flag set func (ch *Channel) Close() error { if ch.IsClosed() { return amqp.ErrClosed } atomic.StoreInt32(&ch.closed, 1) return ch.Channel.Close() } // Consume wrap amqp.Channel.Consume, the returned delivery will end only when channel closed by developer func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) { deliveries := make(chan amqp.Delivery) go func() { for { d, err := ch.Channel.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, args) if err != nil { log.Printf("consume failed, err: %v", err) time.Sleep(delay * time.Second) continue } for msg := range d { deliveries <- msg } // sleep before IsClose call. closed flag may not set before sleep. time.Sleep(delay * time.Second) if ch.IsClosed() { break } } }() return deliveries, nil }
index.go
初始化rabbitmq
, 并生成一个默认的channel(频率不高的生产者可以共用), 以及封装了一些常用的方法;
package mq import ( "context" "fmt" amqp "github.com/rabbitmq/amqp091-go" "time" ) type ( Conf struct { Addr string Port string User string Pwd string } ) var ( defaultConn *Connection defaultChannel *Channel ) // Init 初始化 func Init(c Conf) (err error) { if c.Addr == "" { return nil } defaultConn, err = Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/", c.User, c.Pwd, c.Addr, c.Port)) if err != nil { return fmt.Errorf("new mq conn err: %v", err) } defaultChannel, err = defaultConn.Channel() if err != nil { return fmt.Errorf("new mq channel err: %v", err) } return } // NewChannel 获取channel. func NewChannel() *Channel { return defaultChannel } // ExchangeDeclare 创建交换机. func (ch *Channel) ExchangeDeclare(name string, kind string) (err error) { return ch.Channel.ExchangeDeclare(name, kind, true, false, false, false, nil) } // Publish 发布消息. func (ch *Channel) Publish(exchange, key string, body []byte) (err error) { _, err = ch.Channel.PublishWithDeferredConfirmWithContext(context.Background(), exchange, key, false, false, amqp.Publishing{ContentType: "text/plain", Body: body}) return err } // PublishWithDelay 发布延迟消息. func (ch *Channel) PublishWithDelay(exchange, key string, body []byte, timer time.Duration) (err error) { _, err = ch.Channel.PublishWithDeferredConfirmWithContext(context.Background(), exchange, key, false, false, amqp.Publishing{ContentType: "text/plain", Body: body, Expiration: fmt.Sprintf("%d", timer.Milliseconds())}) return err } // QueueDeclare 创建队列. func (ch *Channel) QueueDeclare(name string) (err error) { _, err = ch.Channel.QueueDeclare(name, true, false, false, false, nil) return } // QueueDeclareWithDelay 创建延迟队列. func (ch *Channel) QueueDeclareWithDelay(name, exchange, key string) (err error) { _, err = ch.Channel.QueueDeclare(name, true, false, false, false, amqp.Table{ "x-dead-letter-exchange": exchange, "x-dead-letter-routing-key": key, }) return } // QueueBind 绑定队列. func (ch *Channel) QueueBind(name, key, exchange string) (err error) { return ch.Channel.QueueBind(name, key, exchange, false, nil) } // NewConsumer 实例化一个消费者, 会单独用一个channel. func NewConsumer(queue string, handler func([]byte) error) error { ch, err := defaultConn.Channel() if err != nil { return fmt.Errorf("new mq channel err: %v", err) } deliveries, err := ch.Consume(queue, "", false, false, false, false, nil) if err != nil { return fmt.Errorf("consume err: %v, queue: %s", err, queue) } for msg := range deliveries { err = handler(msg.Body) if err != nil { _ = msg.Reject(true) continue } _ = msg.Ack(false) } return nil }
index_test.go
使用示例, 实现了生产者消费者及延迟队列; 新手可以直接使用快速上手;
package mq import ( "fmt" "log" "testing" "time" ) func nowTime() string { return time.Now().Format("2006-01-02 15:04:05") } func TestChannel_Publish(t *testing.T) { var ( conf = Conf{ User: "guest", Pwd: "guest", Addr: "127.0.0.1", Port: "5672", } exchangeName = "user.register.direct" queueName = "user.register.queue" keyName = "user.register.event" ) if err := Init(conf); err != nil { log.Fatalf(" mq init err: %v", err) } ch := NewChannel() if err := ch.ExchangeDeclare(exchangeName, "direct"); err != nil { log.Fatalf("create exchange err: %v", err) } if err := ch.QueueDeclare(queueName); err != nil { log.Fatalf("create queue err: %v", err) } if err := ch.QueueBind(queueName, keyName, exchangeName); err != nil { log.Fatalf("bind queue err: %v", err) } go func() { if err := NewConsumer(queueName, func(body []byte) error { fmt.Println("consume msg :" + string(body)) return nil }); err != nil { log.Fatalf("consume err: %v", err) } }() go func() { for { if err := ch.Publish(exchangeName, keyName, []byte(nowTime())); err != nil { log.Fatalf("publish msg err: %v", err) } time.Sleep(time.Second) } }() time.Sleep(time.Minute) t.Log("end") } func TestChannel_PublishWithDelay(t *testing.T) { var ( conf = Conf{ User: "guest", Pwd: "guest", Addr: "127.0.0.1", Port: "5672", } exchangeName = "user.delay.direct" queueName = "user.delay.queue" delayQueueName = "user.delay1.queue" // 延迟队列 keyName = "user.delay.event" delayKeyName = "user.delay1.event" // 延迟key ) if err := Init(conf); err != nil { log.Fatalf(" mq init err: %v", err) } ch := NewChannel() if err := ch.ExchangeDeclare(exchangeName, "direct"); err != nil { log.Fatalf("create exchange err: %v", err) } if err := ch.QueueDeclare(queueName); err != nil { log.Fatalf("create queue err: %v", err) } if err := ch.QueueDeclareWithDelay(delayQueueName, exchangeName, keyName); err != nil { log.Fatalf("create queue err: %v", err) } if err := ch.QueueBind(queueName, keyName, exchangeName); err != nil { log.Fatalf("bind queue err: %v", err) } if err := ch.QueueBind(delayQueueName, delayKeyName, exchangeName); err != nil { log.Fatalf("bind queue err: %v", err) } go func() { if err := NewConsumer(queueName, func(body []byte) error { fmt.Println(fmt.Sprintf("consumer msg: %s, ts: %s", string(body), nowTime())) return nil }); err != nil { log.Fatalf("consume err: %v", err) } }() go func() { for { if err := ch.PublishWithDelay(exchangeName, delayKeyName, []byte(nowTime()), 10*time.Second); err != nil { log.Fatalf("publish msg err: %v", err) } time.Sleep(time.Second) } }() time.Sleep(time.Minute) t.Log("end") }
结语
上面的代码均摘自我开发的一个开源项目中, 主要是一个Go的标准项目布局, 封装了一些常用的组件, 有兴趣的朋友可以了解一下, 新手极易上手;