什么需要消息队列

系统中引入消息队列机制是对系统一个非常大的改善。例如一个web系统中,用户做了某项操作后需要发送邮件通知到用户邮箱中。你可以使用同步方式让用户等待邮件发送完成后反馈给用户,但是这样可能会因为网络的不确定性造成用户长时间的等待从而影响用户体验。

有些场景下是不可能使用同步方式等待完成的,那些需要后台花费大量时间的操作。例如极端例子,一个在线编译系统任务,后台编译完成需要30分钟。这种场景的设计不可能同步等待后在回馈,必须是先反馈用户随后异步处理完成,再等待处理完成后根据情况再此反馈用户与否。

另外适用消息队列的情况是那些系统处理能力有限的情况下,先使用队列机制把任务暂时存放起来,系统再一个个轮流处理掉排队的任务。这样在系统吞吐量不足的情况下也能稳定的处理掉高并发的任务。

消息队列可以用来做排队机制,只要系统需要用到排队机制的地方就可以使用消息队列来作。


使用redis怎么做消息队列

首先redis它的设计是用来做缓存的,但是由于它自身的某种特性使得他可以用来做消息队列。它有几个阻塞式的API可以使用,正是这些阻塞式的API让他有做消息队列的能力。

redis能做消息队列得益于他list对象blpop brpop接口以及Pub/Sub(发布/订阅)的某些接口。他们都是阻塞版的,所以可以用来做消息队列。


Redis实现先进先出队列

Redis实现FIFO很容易,只需要一个List对象从头取数据,从尾部塞数据即可实现。例如lpush存数据,brpop取数据。


Redis实现优先级队列

首先brpop和blpop是支持多list读取的,比如brpop lista listb 0 命令就可以实现先从lista读取数据,读取完lista的数据再去读取listb的数据。

那么我们就可以通过如下方式实现了:

127.0.0.1:6379> lpush a 1
(integer) 1
127.0.0.1:6379> lpush a 2
(integer) 2
127.0.0.1:6379> lpush a 3
(integer) 3
127.0.0.1:6379> lpush b 1
(integer) 1
127.0.0.1:6379> lpush b 2
(integer) 2
127.0.0.1:6379> lpush b 3
(integer) 3
127.0.0.1:6379> brpop a b 0
1) "a"
2) "1"
127.0.0.1:6379> brpop a b 0
1) "a"
2) "2"
127.0.0.1:6379> brpop a b 0
1) "a"
2) "3"
127.0.0.1:6379> brpop a b 0
1) "b"
2) "1"
127.0.0.1:6379> brpop a b 0
1) "b"
2) "2"
127.0.0.1:6379> brpop a b 0
1) "b"
2) "3"
127.0.0.1:6379> brpop a b 0

这种方案我们可以支持不同阶段的优先级队列,例如高中低三个级别或者更多的级别都可以。

 

多优先级问题解决

如果优先级级别很多的情况,假设有个这样的需求,优先级不是简单的高中低或者0-10这些固定的级别。而是类似0-99999这么多级别。那么我们第三种方案将不太合适了。

虽然redis有sorted set这样的可以排序的数据类型,看是很可惜它没有阻塞版的接口。于是我们还是只能使用list类型通过其他方式来完成目的。

 

有个简单的做法我们可以只设置一个队列,并保证它是按照优先级排序号的。然后通过二分查找法查找一个任务合适的位置,并通过 lset 命令插入到相应的位置。 

例如队列里面包含着写优先级的任务[1, 3, 6, 8, 9, 14],当有个优先级为7的任务过来,我们通过自己的二分算法一个个从队列里面取数据出来反和目标数据比对,计算出相应的位置然后插入到指定地点即可。

因为二分查找是比较快的,并且redis本身也都在内存中,理论上速度是可以保证的。但是如果说数据量确实很大的话我们也可以通过一些方式来调优。

把上面的方案结合起来就会很大程度上减少开销。例如数据量十万的队列,它们的优先级也是随机0-十万的区间。我们可以设置 10个或者100个不同的队列,0-一万的优先级任务投放到1号队列,一万-二万的任务投放到2号队列。这样将一个队列按不同等级拆分后它单个队列的数据 就减少许多,这样二分查找匹配的效率也会高一点。但是数据所占的资源基本是不变的,十万数据该占多少内存还是多少。只是系统里面多了一些队列而已。


redis实现定时消息队列

由于Redis排序集合(Sorted Sets)没有实现阻塞功能,所以只能通过程序自己实现。score字段存入时间戳,由于时间戳较长我们用三位数字代替。

127.0.0.1:6379> zadd seta 100 a
(integer) 1
127.0.0.1:6379> zadd seta 200 b
(integer) 1
127.0.0.1:6379> zadd seta 300 c
(integer) 1
127.0.0.1:6379> zadd seta 300 d
(integer) 1

首先我们插入4条数据。

然后我们获取0到当前时间的数据。比如当前时间戳为200,那么我们执行如下命令

127.0.0.1:6379> zrangebyscore seta 0 200 limit 0 1
1) "a"
127.0.0.1:6379> zrem seta a
(integer) 1
127.0.0.1:6379> zrangebyscore seta 0 201 limit 0 1
1) "b"
127.0.0.1:6379> zrem seta b
(integer) 1
127.0.0.1:6379> zrangebyscore seta 0 202 limit 0 1
(empty list or set)

如果取到空数据,阻塞一段时间,然后继续取数据,循环执行即可。

这里我们为什么没有采用zremrangebyscore命令而是采用zrangebyscore和zrem组合,因为zremrangebyscore没有limit参数,可能取到多行数据(例如两个数据socore一样等),由于并发问题可能导致zrem返回0,这样也没事,我们继续取即可。

java代码片段:

public String getData() throws Exception {
    Jedis jedis = getResource();
    while (true) {
        Set<String> seta = jedis.zrangeByScore("seta", 0, System.currentTimeMillis(), 0, 1);
        if (seta != null && seta.size() > 0) {
            String data = seta.toArray(new String[] {})[0];
            Long res = jedis.zrem("seta", data);
            if (res > 0) {
                return data;
            }
        }
        Thread.sleep(1000L);
    }
}


没有登录不能评论