有个需求,需要对系统的用户进行全局推送,包含定时推送。 因为怕重复推送了所以打算是待推送的任务发在 redis list 里面。定时任务去队列取未推送的任务。可是每次取多少,怎么取。 还是我要用发布订阅。
1
coffeSlider 2019-05-09 16:10:31 +08:00
取多少,怎么取不看你自己的业务吗?如果 job 是单线程,取数据就直接用 rpoplpush。
|
2
rpdict 2019-05-09 16:44:33 +08:00
用的 redis 的 zset,按时间排序的有序集合,取的时候就取 0~当前时间戳对应的值就行了
|
3
yxjn 2019-05-09 16:52:25 +08:00
redis 队列的缺点是需要自己写很多的异常补偿机制。毕竟 redis 本身不是作为一个完整的队列功能而存在的。
|
5
iyaozhen 2019-05-09 16:56:10 +08:00 via Android
redis 的 pub/sub 严格说不是个队列,是广播,无法并发消费
用 list 就行,左进右出,一条条处理呗,可以多进程。还可以再建个重发队列,失败的丢进去。 |
6
reus 2019-05-09 17:04:40 +08:00
如果 redis 崩了呢?你怎么知道那些发过哪些没发过?
|
12
rpdict 2019-05-09 17:54:58 +08:00
@uoddsa 是一个取舍,就好像数据库加了索引插入数据就会慢一些,但是读取会快很多,看需求是要更快的插入速度还是更准确的发送时间吧,有序集合的好处就是不用遍历所有,无序的好处就是插入快
|
15
strive 2019-05-09 18:39:13 +08:00
可以用个存储过程把要推送的用户和消息放到任务表里面,再把任务表里面数据放到 redis 的 list 里面拿出来处理就可以了
|
16
brickyang 2019-05-09 18:44:47 +08:00 via iPhone
|
17
lovedebug 2019-05-09 18:58:27 +08:00 via Android
这种需求用 azure service bus 更好吧。redis 是有丢消息风险的。用 kafka 都好很多。
|
18
ericliu001 2019-05-09 19:08:25 +08:00
lpoprpush 看下这个命令
|
19
runnerlee 2019-05-09 19:35:27 +08:00 2
laravel 的做法是同时维护三个队列: 主队列 (list), 备份队列 (reserved, zset) , 延时队列 (delayed, zset).
消息从 list 里 lpop 出来之后会根据超时时间再次存放到备份队列里去, 这个操作用 lua 实现: https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/LuaScripts.php#L54 ``` -- Pop the first job off of the queue... local job = redis.call('lpop', KEYS[1]) local reserved = false if(job ~= false) then -- Increment the attempt count and place job on the reserved queue... reserved = cjson.decode(job) reserved['attempts'] = reserved['attempts'] + 1 reserved = cjson.encode(reserved) redis.call('zadd', KEYS[2], ARGV[1], reserved) redis.call('lpop', KEYS[3]) end return {job, reserved} ``` 而在从主队列 pop 之前, 会根据当前时间从备份队列和延时队列两个 zset 中取出消息 rpush 到主队列中. https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/RedisQueue.php#L167 同样也是使用 lua 进行操作 https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/LuaScripts.php#L105 ``` -- Get all of the jobs with an expired "score"... local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1]) -- If we have values in the array, we will remove them from the first queue -- and add them onto the destination queue in chunks of 100, which moves -- all of the appropriate jobs onto the destination queue very safely. if(next(val) ~= nil) then redis.call('zremrangebyrank', KEYS[1], 0, #val - 1) for i = 1, #val, 100 do redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val))) -- Push a notification for every job that was migrated... for j = i, math.min(i+99, #val) do redis.call('rpush', KEYS[3], 1) end end end return val ``` 同时为了避免重复消费, 在消息消费成功后, 会手动从备份队列删除备份消息. https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/Jobs/RedisJob.php#L84 在每次 pop 出消息并进行消费之前, 会注册一个 timeoutHandler, 通过计时器来实现中断超时任务 https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/Worker.php#L111 所以, 当消费过程中发生异常退出或是超时中断后, 会根据重试时间, 从备份队列里面取出备份消息重新消费. |
21
runnerlee 2019-05-09 19:54:43 +08:00 1
忘了补充一个细节,
在用 lua 调用 lpop 之后, 会将消息 json decode 出来然后自增 attempts 字段, 再放到备份队列. https://github.com/laravel/framework/blob/5.8/src/Illuminate/Queue/LuaScripts.php#L62 这样是为了实现最大重试次数, 当失败到配置的最大次数之后, 会把消息保存到 mysql 后从 redis 里丢弃掉. |
22
fishioon 2019-05-09 20:12:32 +08:00
可以考虑下 redis 5.0 中的 stream 结构
|
23
ihipop 2019-05-09 23:56:26 +08:00 via Android
NSQ
|
24
scnace 2019-05-10 00:25:47 +08:00 via Android
di.....disque ?
|
25
zk123 2019-05-10 07:59:15 +08:00 via iPhone
发布订阅模式的数据可靠性不保证,它的数据不保存到快照或者 aof 中,发布即焚,也没有 ack 机制,不适用这样消息队列场景。
List 队列模式比较适合,不过自己要补偿做许多 ack 以及失败机制。建议还是考虑 MQ 或者其他 push。 |
26
zchlwj 2019-05-10 08:54:43 +08:00
redis 消息队列不存盘的哦,这种场景还是考虑 mq 把
|
27
polebug 2019-05-10 09:03:58 +08:00 via Android
我上次写业务 也是用的 zset 慢点无所谓 反正并发推送
|
28
fuxinya 2019-05-10 10:13:41 +08:00 via Android
如果是 spring 项目可以去看看 redisson
|
29
ducklyl 2019-05-10 10:52:47 +08:00
别用 redis 做队列,用 mq 或 rq
|