玩转redis-简单消息队列

简介:

使用go语言基于redis写了一个简单的消息队列
源码地址
使用demo

redis的 list 非常的灵活,可以从左边或者右边添加元素,当然也以从任意一头读取数据

添加数据和获取数据的操作也是非常简单的
LPUSH 从左边插入数据
RPUSH 大右边插入数据
LPOP 从左边取出一个数据
RPOP 从右边取出一个数据

127.0.0.1:6379> LPUSH list1 a
(integer) 1
127.0.0.1:6379> RPUSH list1 b
(integer) 2
127.0.0.1:6379> LPOP list1
"a"
127.0.0.1:6379> RPOP list1
"b"

或者使用 BLPOP BRPOP 来读取数据,不同之处是取数据时,如果没有数据会等待指定的时间,
如果这期间有数据写入,则会读取并返回,没有数据则会返回空
在一个窗口1读取

127.0.0.1:6379> BLPOP list1 10
1) "list1"
2) "a"

在另一个窗口2写入

127.0.0.1:6379> RPUSH list1 a b c
(integer) 3

再开一个窗口3读取,第二次读取时,list是空的,所以等待1秒后返回空。

127.0.0.1:6379> BRPOP list1 1
1) "list1"
2) "c"

127.0.0.1:6379> BRPOP list1 1
(nil)
(1.04s)

简单消息队列的实现

如果我们只从一边新增元素,向另一边取出元素,这就不是一个消息队列么。但我估计你会有一个疑问,在消费数据时,同一个消息会不会同时被多个consumer消费掉?

当然不会,因为redis是单线程的,在从list取数据时天然不会出现并发问题。但是这是一个简单的消息队列,消费不成功怎么处理还是需要我们自己写代码来实现的

下面我说一下使用list实现一个简单的消息队列的整体思路

comsumer的实现

consumer 主要做的就是从list里读取数据,使用LPOP或者BLPOP都可以,
这里做了一个开关 optionsUseBLopp如果为true时会使用BLPOP

type consumer struct {
    once            sync.Once
    redisCmd        redis.Cmdable
    ctx             context.Context
    topicName       string
    handler         Handler
    rateLimitPeriod time.Duration
    options         ConsumerOptions
    _               struct{}
}

type ConsumerOptions struct {
    RateLimitPeriod time.Duration
    UseBLPop        bool
}

看一下创建consumer的代码,最后面的opts参数是可选的配置

type Consumer = *consumer

func NewSimpleMQConsumer(ctx context.Context, redisCmd redis.Cmdable, topicName string, opts ...ConsumerOption) Consumer {
    consumer := &consumer{
        redisCmd:  redisCmd,
        ctx:       ctx,
        topicName: topicName,
    }
    for _, o := range opts {
        o(&consumer.options)
    }
    if consumer.options.RateLimitPeriod == 0 {
        consumer.options.RateLimitPeriod = time.Microsecond * 200
    }
    return consumer
}

读取数据后具体怎么进行处理调用者可以根据自己的业务逻辑进行相应处理
有一个小的interface调用者根据自己的逻辑去实现

type Handler interface {
    HandleMessage(msg *Message)
}

读取数据的逻辑使用一个gorouting实现

func (s *consumer) startGetMessage() {
    go func() {
        ticker := time.NewTicker(s.options.RateLimitPeriod)
        defer func() {
            log.Println("stop get message.")
            ticker.Stop()
        }()
        for {
            select {
            case <-s.ctx.Done():
                log.Printf("context Done msg: %#v \n", s.ctx.Err())
                return
            case <-ticker.C:
                var revBody []byte
                var err error
                if !s.options.UseBLPop {
                    revBody, err = s.redisCmd.LPop(s.topicName).Bytes()
                } else {
                    revs := s.redisCmd.BLPop(time.Second, s.topicName)
                    err = revs.Err()
                    revValues := revs.Val()
                    if len(revValues) >= 2 {
                        revBody = []byte(revValues[1])
                    }
                }
                if err == redis.Nil {
                    continue
                }
                if err != nil {
                    log.Printf("LPOP error: %#v \n", err)
                    continue
                }

                if len(revBody) == 0 {
                    continue
                }
                msg := &Message{}
                json.Unmarshal(revBody, msg)
                if s.handler != nil {
                    s.handler.HandleMessage(msg)
                }
            }
        }
    }()
}

Producer 的实现

Producer还是很简单的就是把数据推送到 reids

type Producer struct {
    redisCmd redis.Cmdable
    _        struct{}
}

func NewProducer(cmd redis.Cmdable) *Producer {
    return &Producer{redisCmd: cmd}
}

func (p *Producer) Publish(topicName string, body []byte) error {
    msg := NewMessage("", body)
    sendData, _ := json.Marshal(msg)
    return p.redisCmd.RPush(topicName, string(sendData)).Err()
}
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
2月前
|
消息中间件 存储 负载均衡
Redis使用ZSET实现消息队列使用总结二
Redis使用ZSET实现消息队列使用总结二
54 0
|
9天前
|
消息中间件 存储 NoSQL
剖析 Redis List 消息队列的三种消费线程模型
Redis 列表(List)是一种简单的字符串列表,它的底层实现是一个双向链表。 生产环境,很多公司都将 Redis 列表应用于轻量级消息队列 。这篇文章,我们聊聊如何使用 List 命令实现消息队列的功能以及剖析消费者线程模型 。
46 20
剖析 Redis List 消息队列的三种消费线程模型
|
3月前
|
消息中间件 负载均衡 NoSQL
Redis系列学习文章分享---第七篇(Redis快速入门之消息队列--List实现消息队列 Pubsub实现消息队列 stream的单消费模式 stream的消费者组模式 基于stream消息队列)
Redis系列学习文章分享---第七篇(Redis快速入门之消息队列--List实现消息队列 Pubsub实现消息队列 stream的单消费模式 stream的消费者组模式 基于stream消息队列)
44 0
|
1月前
|
消息中间件 NoSQL Redis
Redis Stream消息队列之基本语法与使用方式
这篇文章详细介绍了Redis Stream消息队列的基本语法和使用方式,包括消息的添加、读取、删除、修剪以及消费者组的使用和管理,强调了其在消息持久化和主备复制方面的优势。
40 0
|
2月前
|
消息中间件 存储 NoSQL
Redis使用ZSET实现消息队列使用总结一
Redis使用ZSET实现消息队列使用总结一
53 0
|
4月前
|
消息中间件 缓存 NoSQL
Redis stream 用做消息队列完美吗
Redis Stream 是 Redis 5.0 版本中引入的一种新的数据结构,它用于实现简单但功能强大的消息传递模式。 这篇文章,我们聊聊 Redis Stream 基本用法 ,以及如何在 SpringBoot 项目中应用 Redis Stream 。
Redis stream 用做消息队列完美吗
|
4月前
|
消息中间件 监控 NoSQL
使用redis做消息队列
使用redis做消息队列
128 0
|
2月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5天前
|
消息中间件
手撸MQ消息队列——循环数组
队列是一种常用的数据结构,类似于栈,但采用先进先出(FIFO)的原则。生活中常见的排队场景就是队列的应用实例。在数据结构中,队列通常用数组实现,包括入队(队尾插入元素)和出队(队头移除元素)两种基本操作。本文介绍了如何用数组实现队列,包括定义数组长度、维护队头和队尾下标(front 和 tail),并通过取模运算解决下标越界问题。此外,还讨论了队列的空与满状态判断,以及并发和等待机制的实现。通过示例代码展示了队列的基本操作及优化方法,确保多线程环境下的正确性和高效性。
14 0
手撸MQ消息队列——循环数组