[redisson]RBlockingQueue延时队列到期不能poll到值。是不是我的使用方式有问题。

2024-07-18 216 views
8

用offer的形式放入延时队列,再用poll的形式取值。现在出现的现象是,有时候到过期时间,但是没有取到值,需要再次往里offer的时候才会取到值值。 //存的代码 RBlockingQueue blockingFairQueue = redisson.getBlockingQueue(task.getQueueName()); RDelayedQueue delayedQueue = redisson.getDelayedQueue(blockingFairQueue); delayedQueue.offer(task, task.getDelayTime(), task.getTimeUnit() == null ? TimeUnit.MINUTES : task.getTimeUnit()); delayedQueue.destroy();

//取的代码: while (true){ RBlockingQueue blockingFairQueue = redisUtils.getRedisson().getBlockingQueue(queueName); task = blockingFairQueue.poll(1,TimeUnit.MINUTES); }

//配置的集群模式 config.useClusterServers().setScanInterval(2000).addNodeAddress(newNodes.toArray(new String[0])) .setSubscriptionConnectionMinimumIdleSize(subscriptionConnectionMinimumIdleSize) .setSubscriptionConnectionPoolSize(subscriptionConnectionPoolSize) .setSubscriptionsPerConnection(subscriptionsPerConnection) .setClientName(clientName) .setRetryAttempts(retryAttempts) .setRetryInterval(retryInterval) .setTimeout(timeout) .setConnectTimeout(connectTimeout) .setIdleConnectionTimeout(idleConnectionTimeout) .setPassword(password) .setMasterConnectionPoolSize(masterConnectionPoolSize) .setMasterConnectionMinimumIdleSize(masterConnectionMinimumIdleSize) .setSlaveConnectionPoolSize(slaveConnectionPoolSize) .setSlaveConnectionMinimumIdleSize(slaveConnectionMinimumIdleSize);

//单点模式配置 config.useSingleServer().setAddress(address) .setConnectionMinimumIdleSize(connectionMinimumIdleSize) .setConnectionPoolSize(connectionPoolSize) .setDatabase(database) .setDnsMonitoringInterval(dnsMonitoringInterval) .setSubscriptionConnectionMinimumIdleSize(subscriptionConnectionMinimumIdleSize) .setSubscriptionConnectionPoolSize(subscriptionConnectionPoolSize) .setSubscriptionsPerConnection(subscriptionsPerConnection) .setClientName(clientName) .setRetryAttempts(retryAttempts) .setRetryInterval(retryInterval) .setTimeout(timeout) .setConnectTimeout(connectTimeout) .setIdleConnectionTimeout(idleConnectionTimeout) .setPassword(password);

redisson版本:3.11.4

回答

3

如何避免此类问题的发生呢?降低延迟时间吗?业务延时时间为30到90分钟不等的。现在数据的使用量不是很大。经常会发生这样的问题。

5

@LionsArmstrong 我也遇到了相同的问题,运行中会有丢消息的情况,现在解决了吗?能分享一下吗

0

遇到同样的问题在 2.15.2 版本中,我这里和 Springmvc 整合时遇到的问题,Junit 单元测试的时候就没有问题,在使用 Springmvc 中使用 @PostConstruct 注解为消费者开启一个 Thread 线程时,生产者发送单个消息后,消费者有时候能消费到,有时候消费不到,但是当生产者连续发送多个消息,消费者就一定能够消费到。

3

使用这样的方式添加元素试试看,我这样就正常了 queue.offer(task, 5, TimeUnit.SECONDS); Thread.sleep(1); queue.destroy();

7

@zcs100 @1186792881 @ManaStuDent 可以尝试一下,以下方式。基本可以解决延时获取不到数据的问题。 RBlockingQueue blockingFairQueue = redisUtils.getRedisson().getBlockingQueue(queueName); redisUtils.getRedisson().getDelayedQueue(blockingFairQueue); task = blockingFairQueue.poll(1,TimeUnit.MINUTES);

8

@LionsArmstrong 老哥, 是说中间要加一个getDelayedQueue()的无用代码才可以避免? 这是什么原理啊 ?

3

我现在是这么解决的?

    @PostConstruct
    public void init() {
        // 1秒钟刷新延迟队列一次
        final Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                refresh();
            }
        }, 1000, 1000);
    }

    /**
     * 重启刷新延迟队列
     */
    private void refresh() {
        RDelayedQueue<String> imageSuitRetryQueue = redissonClient.getDelayedQueue(redissonClient.getBlockingQueue("key"));
        imageSuitRetryQueue.destroy();
    }
7

出现这个问题现象的原理是什么?是发送端延迟没有发送,还是接受方的问题?

2

@astorage

redis里面有数据,接收方监听断了。但是还没有抛出任何异常。就是一直都在监听,但是接收不到数据。等发送端再发送新的数据的时候,接受方就能接受之前的数据。但是时间已经超时了。

5

delayedQueue.destroy(); 注释掉这句,应该就没问题了