[redisson]服务器重启后,delayQueue take数据会阻塞

2024-07-18 50 views
3

服务器重启后,延迟队列正常take数据执行

服务器重启后,延迟队列take数据阻塞,不执行,必须等到下一个内容offer时,队列才会把阻塞的消息全部处理掉

1、向延迟队列offer数据 2、重启服务器,等待步骤1 offer的数据到期 3、等到超过到期时间,再向延迟队列offer数据

Redis version

3.2.100

Redisson version

3.11.5

Redisson configuration

spring: redis: enabled: true database: 0

password:
host: 127.0.0.1
port: 6379
lettuce:
  pool:
    max-active: 100 
    max-idle: 100 
    min-idle: 50 
    max-wait: 6000ms 
timeout: 1000ms

回答

2

我也遇到了这样类似的问题。现象是:第一条数据offer失败之后,会和第二条数据一起offer到队列里。然后监听到两条数据同时处理了。这样就满足不了准时延时的业务需求了。期望可以提高脚本后台线程的实时性。还是说在配置里面可以通过配置来提高实时性。希望大神解答!!!跪拜~~~

8
@Async
@Override
public <T> void pushToDelayQueue(String key, T data) {
    RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingDeque(key);
    RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
    delayedQueue.offer(data, EXPIRE_TIME_MILLIS, TimeUnit.MILLISECONDS);
}

@Async
@Override
public void takeOfDelayQueue(String key) {
    log.info("======延迟队列:[{}]-消费初始化======", key);
    RBlockingQueue<String> blockingFairQueue = redissonClient.getBlockingDeque(key);
    while (true) {
        try {
           String data = blockingFairQueue.take();
           log.info("======延迟队列:[{}]-获取到数据:{}======", key, data);
        } catch (InterruptedException e) {
            log.error("延迟队列处理异常,异常信息:{}", e.getMessage());
        }
    }
}

就是简单的延迟队列添加和消费

8

DelayedQueue initialized during getDelayedQueue method invocation. As workaround, after server restart you need to invoke this method again. You don't need to add any element in this case.

6

谢谢,了解了,不过我个人觉得在getDelayedQueue中初始化会有些问题呢,不知道以后是否会考虑单独给一个初始化方法?

9

Could you clarify on problems you expect?

6

你们是怎么搞的呢,反正我是在中间加了一句

@Async @Override public void takeOfDelayQueue(String key) { log.info("======延迟队列:[{}]-消费初始化======", key); RBlockingQueue blockingFairQueue = redissonClient.getBlockingDeque(key); // 加了这个 redissonClient.getDelayedQueue(blockingFairQueue); while (true) { try { String data = blockingFairQueue.take(); log.info("======延迟队列:[{}]-获取到数据:{}======", key, data); } catch (InterruptedException e) { log.error("延迟队列处理异常,异常信息:{}", e.getMessage()); } } }

8

加了这句感觉还是有问题 生产时不时还是有这种现象

1

你这个代码有问题while(true){ queue.take()} 在网络波动时直接空轮询或者报错死掉,希望改进一下,我现在遇到的问题是,如何在关闭服务的时候优雅的关闭队列,现在是直接中断,关闭服务直接报上万行的中断异常。

5

请教一下大佬,现在找到了更优异的关闭方案了吗?我目前也是在spring的bean的销毁方法里面去出发监听线程的中断,然后监听线程里出现中断异常就退出