1
redisson-spring-boot-starter 3.15.1
查看源码后发现,RedissonStreamCommands.xReadGroup 中没有处理 readOptions 的 noack 参数。
预期行为:Redis Stream 手动 ack。实际行为:无论设置成手动还是自动,均自动 ack。重现或测试用例的步骤:
/**
 * Starts a Redis Stream consumer once the application has booted and stops it
 * when the application context closes.
 *
 * <p>The consumer reads stream {@code mystream} as consumer
 * {@code consumer-1-1} of group {@code group-1}, logs each record and then
 * acknowledges it explicitly through {@link StringRedisTemplate}.
 */
@Component
public class StreamConsumerRunner implements ApplicationRunner, ApplicationListener<ContextClosedEvent> {

    static final Logger LOGGER = LoggerFactory.getLogger(StreamConsumerRunner.class);

    @Autowired
    RedisConnectionFactory redisConnectionFactory;

    @Autowired
    ThreadPoolTaskScheduler taskScheduler;

    @Autowired
    StringRedisTemplate stringRedisTemplate;

    /** Listener container created by {@link #run}; stays {@code null} until then. */
    private StreamMessageListenerContainer<String, MapRecord<String, String, String>> streamMessageListenerContainer;

    /**
     * Builds the listener container, registers the group consumer and starts polling.
     *
     * @param args application arguments (unused)
     * @throws Exception declared by {@link ApplicationRunner#run}
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        // Build the container options.
        StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options =
                StreamMessageListenerContainerOptions
                        .builder()
                        // Maximum number of messages fetched per poll.
                        .batchSize(10)
                        // Executor that runs the polling task.
                        .executor(this.taskScheduler)
                        // Handler for errors raised while consuming; log through SLF4J
                        // instead of printing the stack trace to stderr.
                        .errorHandler(t -> LOGGER.error("Redis stream consumer error", t))
                        // Poll timeout; zero is intended to mean "never time out"
                        // (per the original author, a timeout would raise an exception).
                        .pollTimeout(Duration.ZERO)
                        // Serializer for keys and values.
                        .serializer(new StringRedisSerializer())
                        .build();

        // Create the listener container from the options.
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
                StreamMessageListenerContainer.create(this.redisConnectionFactory, options);

        // Register the listener (intended to be manual-acknowledge mode): each
        // record is logged and then acknowledged explicitly.
        container.receive(Consumer.from("group-1", "consumer-1-1"),
                StreamOffset.create("mystream", ReadOffset.lastConsumed()), message -> {
                    LOGGER.info("group-1, {}", message);
                    stringRedisTemplate.opsForStream().acknowledge("group-1", message);
                });

        this.streamMessageListenerContainer = container;

        // Start listening.
        container.start();
    }

    /**
     * Stops the listener container when the application context is closed.
     *
     * @param event the context-closed event
     */
    @Override
    public void onApplicationEvent(ContextClosedEvent event) {
        LOGGER.info("close");
        // The container only exists after run() has executed; guard so that a
        // close event arriving earlier does not fail with a NullPointerException.
        if (this.streamMessageListenerContainer != null) {
            this.streamMessageListenerContainer.stop();
        }
    }
}
Redis 版本
5.0.10
Redisson 版本
redisson-spring-boot-starter 3.15.1
Redisson 配置