[redisson]redis流消费自动ack问题

2024-07-18 718 views
1

redisson-spring-boot-starter 3.15.1
查看源码,在 RedissonStreamCommands.xReadGroup 中,没有处理readOptions的noack参数

预期行为 redis Stream 手动 ack 实际行为 无论设置成手动还是自动,均自动ack 重现或测试用例的步骤

@Component
public class StreamConsumerRunner implements ApplicationRunner, ApplicationListener<ContextClosedEvent> {

    static final Logger LOGGER = LoggerFactory.getLogger(StreamConsumerRunner.class);

    @Autowired
    RedisConnectionFactory redisConnectionFactory;

    @Autowired
    ThreadPoolTaskScheduler taskScheduler;

    @Autowired
    StringRedisTemplate stringRedisTemplate;

    private StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer;

    @Override
    public void run(ApplicationArguments args) throws Exception {

        // 创建配置对象
        StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> streamMessageListenerContainerOptions = StreamMessageListenerContainerOptions
            .builder()
            // 一次性最多拉取多少条消息
            .batchSize(10)
            // 执行消息轮询的执行器
            .executor(this.taskScheduler)
            // 消息消费异常的handler
            .errorHandler(new ErrorHandler() {
                @Override
                public void handleError(Throwable t) {
                    // throw new RuntimeException(t);
                    t.printStackTrace();
                }
            })
            // 超时时间,设置为0,表示不超时(超时后会抛出异常)
            .pollTimeout(Duration.ZERO)
            // 序列化器
            .serializer(new StringRedisSerializer())
            .build();

        // 根据配置对象创建监听容器对象
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer = StreamMessageListenerContainer
            .create(this.redisConnectionFactory, streamMessageListenerContainerOptions);

        // 使用监听容器对象开始监听消费(使用的是手动确认方式)
        streamMessageListenerContainer.receive(Consumer.from("group-1", "consumer-1-1"),
            StreamOffset.create("mystream", ReadOffset.lastConsumed()), message -> {
                LOGGER.info("group-1, {}", message);
                 stringRedisTemplate.opsForStream().acknowledge("group-1", message);
            });

        this.streamMessageListenerContainer = streamMessageListenerContainer;
        // 启动监听
        this.streamMessageListenerContainer.start();
    }

    @Override
    public void onApplicationEvent(ContextClosedEvent event) {
        LOGGER.info("close");
        this.streamMessageListenerContainer.stop();
    }
}

Redis 版本 5.0.10 Redisson 版本 redisson-spring-boot-starter 3.15.1
Redisson 配置

回答

8

已修复!感谢报告

7

@mrniko RedissonStreamCommands.xRead 方法是不是也需要把 noack 参数处理一下?

1

xread redis 命令不支持 ack

6

好,谢谢。

8

嗨好。 有问题问一下。 手动删除掉streamkey(模拟redis key被系统自动逐出)后就报出异常。 手动创建流密钥和组后应用不会自动重新自动开始消费。 怎么处理啊