go rabbitmq 使用教程 ,go rabbitmq 简单队列,go rabbitmq work模式,go rabbitmq 订阅模式

简介: go rabbitmq 使用教程 ,go rabbitmq 简单队列,go rabbitmq work模式,go rabbitmq 订阅模式

使用Go的过程记录了全部的rabbitmq的go代码,方便自己下次Copy,go的资料比较少,seo估计很好做,流量速度过来。

【一】.简单队列.生产者将消息发送到队列,消费者从队列中获取消息。

1.0.connection code

func NewRabbitMQ() *amqp.Channel {

// 获取connection

amqUrl := "amqp://admin:elecfans@spiderqueue.elecfans.net:5672/"

connection, err := amqp.Dial(amqUrl)

if err != nil {

panic(fmt.Sprintf("获取connection异常:%s\n", err))

}


// 获取channel

channel, err := connection.Channel()

if err != nil {

panic(fmt.Sprintf("获取channel异常:%s\n", err))

}


return channel

}

1.1.client code:

// 生产_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 生产_声明队列(不存在自动创建)

queueName := "ic_order_active"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 生产_发送消息到队列

message := "ic元器件活动来新单啦"

err = channel.Publish(

   // 交换机

   "",

   // 队列名称

   queueName,

   // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃

   false,

   // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列

   false,

   amqp.Publishing{

       ContentType: "text/plain",

       // 队列和消息同时设置持久化

       DeliveryMode: 2,

       Body:         []byte(message),

   },

)

if err != nil {

   fmt.Printf("发送消息到队列异常:%s", err)

   return

}

1.2.service code

// 消费_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 消费_声明队列

queueName := "ic_order_active"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 消费_获取队列中的消息

message, err := channel.Consume(

   // 队列名称

   queueName,

   // 消费者名称

   "ic订单消费者",

   // 是否自动ack

   false,

   // 是否排他性队列标识

   false,

   false,

   false,

   nil,

)

if err != nil {

   return

}


// 输出消息

for msg := range message {

   // 打印消息内容

   fmt.Printf("收到队列消息%s \n", msg.Body)

   // 确认收到消息

   msg.Ack(true)

}

【二】.Work模式.一个生产者,多个消费者,一个消息只能被一个消费者获取到

2.0.client code

// 生产_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 生产_声明队列(不存在自动创建)

queueName := "ic_order_active"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 生产_发送消息到队列

message := "ic元器件活动来新单啦,订单id"

messageSize := 10

for i := 0; i < messageSize; i++ {

   // 方便观察消费者

   time.Sleep(time.Second * 1)

   err = channel.Publish(

       // 交换机

       "",

       // 队列名称

       queueName,

       // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃

       false,

       // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列

       false,

       amqp.Publishing{

           ContentType: "text/plain",

           // 队列和消息同时设置持久化

           DeliveryMode: 2,

           Body:         []byte(message + strconv.Itoa(i)),

       },

   )

   if err != nil {

       fmt.Printf("发送消息到队列异常:%s", err)

       return

   }

}

2.1.service code

// 消费_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 消费_声明队列

queueName := "ic_order_active"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 设置同一时间服务器只会发送一条消息给消费者

channel.Qos(

   // 每次获取多少条

   10,

   // 预加载数量(rabbitMq不支持)

   0,

   // false->对当前队列可用 true->对channel可用(rabbitMq不支持)

   false,

)


// 消费_获取队列中的消息

message, err := channel.Consume(

   // 队列名称

   queueName,

   // 消费者名称

   "ic订单消费者",

   // 是否自动ack

   false,

   // 是否排他性队列标识

   false,

   false,

   false,

   nil,

)

if err != nil {

   return

}


// 输出消息

for msg := range message {

   // 打印消息内容

   fmt.Printf("收到队列消息%s \n", msg.Body)

   // 确认收到消息

   msg.Ack(true)

}

【三】.订阅模式(fanout).

一个生产者,多个消费者

每个消费者拥有自己的队列

生产者将消息发送到交换机

每个队列自己去绑定交换机

(交换机没有储存能力,发送到没有任何队列绑定的交换机则消息丢失)

3.0.client code

// 生产_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 生产_声明交换机

exchangeName := "notice"

err := channel.ExchangeDeclare(

   // 交换机名称

   exchangeName,

   // 交换机类型

   "fanout",

   // 持久化

   true,

   // true->当所有绑定都与交换器解绑后,会自动删除此交换器

   false,

   // true->客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器

   false,

   // 是否非阻塞

   false,

   // 其他参数

   nil,

)

if err != nil {

   fmt.Printf("声明交换机异常:%s", err)

   return

}


// 生产_发送消息到交换机

message := "最新消息,华秋全场元器件3折起"

messageSize := 10

for i := 0; i < messageSize; i++ {

   // 方便观察消费者

   time.Sleep(time.Second * 1)

   err = channel.Publish(

       // 交换机

       exchangeName,

       // 路由key

       "",

       // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃

       false,

       // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列

       false,

       amqp.Publishing{

           ContentType: "text/plain",

           // 队列和消息同时设置持久化

           DeliveryMode: 2,

           Body:         []byte(message + strconv.Itoa(i)),

       },

   )

   if err != nil {

       fmt.Printf("发送消息到队列异常:%s", err)

       return

   }

}

【四】.直接匹配(direct)

4.0.client code

// 生产_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 生产_声明交换机

exchangeName := "pcb_layout_order"

err := channel.ExchangeDeclare(

   // 交换机名称

   exchangeName,

   // 交换机类型

   "direct",

   // 持久化

   true,

   // true->当所有绑定都与交换器解绑后,会自动删除此交换器

   false,

   // true->客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器

   false,

   // 是否非阻塞

   false,

   // 其他参数

   nil,

)

if err != nil {

   fmt.Printf("声明交换机异常:%s", err)

   return

}


// 生产_发送消息到交换机

allRouteKey := []string{

   "order_insert", // 新增订单

   "order_delete", // 删除订单

}


// 循环发送到两个路由key

message := "订单id1事件"

for _, routeKey := range allRouteKey {

   err = channel.Publish(

       // 交换机

       exchangeName,

       // 路由key

       routeKey,

       // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃

       false,

       // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列

       false,

       amqp.Publishing{

           ContentType: "text/plain",

           // 队列和消息同时设置持久化

           DeliveryMode: 2,

           Body:         []byte(message),

       },

   )

}

4.1.service code

// 消费_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 消费_声明队列

queueName := "notice_queue"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 队列绑定交换机+绑定订单新增key

exchangeName := "pcb_layout_order"

allRouteKey := []string{

   "order_insert", // 新增订单

   "order_delete", // 删除订单

}

for _, routeKey := range allRouteKey {

   channel.QueueBind(

       // 队列名称

       queueName,

       // 绑定的键

       routeKey,

       // 交换机名称

       exchangeName,

       // 是否阻塞处理

       false,

       // 其他参数

       nil,

   )

}


// 设置同一时间服务器只会发送一条消息给消费者

channel.Qos(

   // 每次获取多少条

   10,

   // 预加载数量(rabbitMq不支持)

   0,

   // false->对当前队列可用 true->对channel可用(rabbitMq不支持)

   false,

)


// 消费_获取队列中的消息

message, err := channel.Consume(

   // 队列名称

   queueName,

   // 消费者名称

   "ic订单消费者",

   // 是否自动ack

   false,

   // 是否排他性队列标识

   false,

   false,

   false,

   nil,

)

if err != nil {

   return

}

// 输出消息

for msg := range message {

   // 打印消息内容

   fmt.Printf("收到队列消息%s \n", msg.Body)

   // 确认收到消息

   msg.Ack(true)

}

【五】.直接匹配(topic)

topic同样根据key匹配到队列,#匹配一个或者多个,*匹配一个.(切记:发往topic交换器的routing_key它必须是.分隔的几个词)

5.0.client code

// 生产_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 生产_声明交换机

exchangeName := "smt_steel_order"

err := channel.ExchangeDeclare(

   // 交换机名称

   exchangeName,

   // 交换机类型

   "topic",

   // 持久化

   true,

   // true->当所有绑定都与交换器解绑后,会自动删除此交换器

   false,

   // true->客户端无法直接发送msg到内部交换器,只有交换器可以发送msg到内部交换器

   false,

   // 是否非阻塞

   false,

   // 其他参数

   nil,

)

if err != nil {

   fmt.Printf("声明交换机异常:%s", err)

   return

}


// 生产_发送消息到交换机

allRouteKey := []string{

   "order.insert", // 新增订单

   "order.delete", // 删除订单

}

for _, routeKey := range allRouteKey {

   //fmt.Print(routeKey)

   message := "来自" + routeKey + "的消息"

   err = channel.Publish(

       // 交换机

       exchangeName,

       // 路由key

       routeKey,

       // true->根据自身exchange类型和routeKey规则无法找到符合条件的队列会把消息返还给发送者,false->出现上述情况会直接将消息丢弃

       true,

       // true->当exchange将消息路由到队列后发现队列上没有消费者则会把消息返还给发送者,false->出现上述情况,消息一样会发送给消息队列

       false,

       amqp.Publishing{

           ContentType: "text/plain",

           // 队列和消息同时设置持久化

           DeliveryMode: 2,

           Body:         []byte(message),

       },

   )

}

5.1.service code

// 消费_获取connection的channel

channel := Connecttion.NewRabbitMQ()


// 消费_声明队列

queueName := "notice_queue"

_, err := channel.QueueDeclare(

   // 队列名称

   queueName,

   // 是否持久化

   false,

   // 是否自动删除

   false,

   // 是否具有排他性

   false,

   // 是否阻塞处理

   false,

   // 额外属性

   nil,

)

if err != nil {

   fmt.Printf("声明队列异常:%s", err)

   return

}


// 队列绑定交换机+绑定订单新增key

exchangeName := "smt_steel_order"

routeKey := "order.#"

channel.QueueBind(

   // 队列名称

   queueName,

   // 绑定的路由

   routeKey,

   // 交换机名称

   exchangeName,

   // 是否阻塞处理

   false,

   // 其他参数

   nil,

)


// 设置同一时间服务器只会发送一条消息给消费者

channel.Qos(

   // 每次获取多少条

   10,

   // 预加载数量(rabbitMq不支持)

   0,

   // false->对当前队列可用 true->对channel可用(rabbitMq不支持)

   false,

)


// 消费_获取队列中的消息

message, err := channel.Consume(

   // 队列名称

   queueName,

   // 消费者名称

   "smt订单消费者",

   // 是否自动ack

   false,

   // 是否排他性队列标识

   false,

   false,

   false,

   nil,

)

if err != nil {

   return

}


// 输出消息

for msg := range message {

   // 打印消息内容

   fmt.Printf("收到队列消息%s \n", msg.Body)

   // 确认收到消息

   msg.Ack(true)

}

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
11月前
|
人工智能 安全 算法
Go入门实战:并发模式的使用
本文详细探讨了Go语言的并发模式,包括Goroutine、Channel、Mutex和WaitGroup等核心概念。通过具体代码实例与详细解释,介绍了这些模式的原理及应用。同时分析了未来发展趋势与挑战,如更高效的并发控制、更好的并发安全及性能优化。Go语言凭借其优秀的并发性能,在现代编程中备受青睐。
357 33
|
消息中间件
RabbitMQ的 RPC 消息模式你会了吗?
【9月更文挑战第11天】RabbitMQ 的 RPC(远程过程调用)消息模式允许客户端向服务器发送请求并接收响应。其基本原理包括:1) 客户端发送请求,创建回调队列并设置关联标识符;2) 服务器接收请求并发送响应至回调队列;3) 客户端根据关联标识符接收并匹配响应。实现步骤涵盖客户端和服务器的连接、信道创建及请求处理。注意事项包括关联标识符唯一性、回调队列管理、错误处理及性能考虑。RPC 模式适用于构建可靠的分布式应用程序,但需根据需求调整优化。
513 3
|
缓存 NoSQL Go
通过 SingleFlight 模式学习 Go 并发编程
通过 SingleFlight 模式学习 Go 并发编程
|
消息中间件 网络协议 RocketMQ
RocketMQ Controller 模式 始终更新成本机ip
ontrollerAddr=192.168.24.241:8878 但是日志输出Update controller leader address to 127.0.0.1:8878。导致访问失败
345 4
|
Go 调度 开发者
探索Go语言中的并发模式:goroutine与channel
在本文中,我们将深入探讨Go语言中的核心并发特性——goroutine和channel。不同于传统的并发模型,Go语言的并发机制以其简洁性和高效性著称。本文将通过实际代码示例,展示如何利用goroutine实现轻量级的并发执行,以及如何通过channel安全地在goroutine之间传递数据。摘要部分将概述这些概念,并提示读者本文将提供哪些具体的技术洞见。
|
安全 Go 调度
探索Go语言的并发模式:协程与通道的协同作用
Go语言以其并发能力闻名于世,而协程(goroutine)和通道(channel)是实现并发的两大利器。本文将深入了解Go语言中协程的轻量级特性,探讨如何利用通道进行协程间的安全通信,并通过实际案例演示如何将这两者结合起来,构建高效且可靠的并发系统。
|
安全 Go 开发者
破译Go语言中的并发模式:从入门到精通
在这篇技术性文章中,我们将跳过常规的摘要模式,直接带你进入Go语言的并发世界。你将不会看到枯燥的介绍,而是一段代码的旅程,从Go的并发基础构建块(goroutine和channel)开始,到高级模式的实践应用,我们共同探索如何高效地使用Go来处理并发任务。准备好,让Go带你飞。
|
消息中间件 存储 监控
RabbitMQ 队列之战:Classic 和 Quorum 的性能洞察
RabbitMQ 是一个功能强大的消息代理,用于分布式应用程序间的通信。它通过队列临时存储消息,支持异步通信和解耦。经典队列适合高吞吐量和低延迟场景,而仲裁队列则提供高可用性和容错能力,适用于关键任务系统。选择哪种队列取决于性能、持久性和容错性的需求。
1087 6
|
消息中间件 NoSQL Go
PHP转Go系列 | ThinkPHP与Gin框架之Redis延时消息队列技术实践
【9月更文挑战第7天】在从 PHP 的 ThinkPHP 框架迁移到 Go 的 Gin 框架时,涉及 Redis 延时消息队列的技术实践主要包括:理解延时消息队列概念,其能在特定时间处理消息,适用于定时任务等场景;在 ThinkPHP 中使用 Redis 实现延时队列;在 Gin 中结合 Go 的 Redis 客户端库实现类似功能;Go 具有更高性能和简洁性,适合处理大量消息。迁移过程中需考虑业务需求及系统稳定性。
438 1