[spring-projects/spring-boot]为 Reactor Kafka 添加自动配置

2024-06-26 558 views
2

一些说明:

  • 此 PR 解决了https://github.com/spring-projects/spring-boot/issues/29080https://github.com/spring-projects/spring-boot/issues/18804
  • 我将 Reactor Kafka 自动配置放在kafka层次结构下,而不是reactor.kafka因为没有其他组件具有其响应式等效文件夹。当实现所谓的“通用 kafka 自动配置超类”时,这也更容易。当需要实施更改时,这也更容易维护,因为两个自动配置都位于同一位置。
  • 常见的 Kafka 自动配置超类尚未在此 PR 中实现,因此具有其自己的ReactiveKafkaProperties静态类。Properties
  • 此 PR 并未解决使用 Reactor Kafka 时对 Spring Kafka 的需求,即此问题。为了解决这个问题,我们需要提取两个客户端使用的所有公共属性并将它们公开给它们。
  • 由于像 这样的基本字段bootstrapServers在 中定义KafkaPropertiesReactiveKafkaProperties因此用 进行注释@ConditionalOnBean(KafkaProperties.class)
  • 自动配置函数使用带有空检查\大于零检查的ReceiverOptions“s & ”设置器,而不仅仅是构建一个类,因为这个类是内部的而不是公共的。SenderOptionsImmutableReceiverOptions
  • 我一直在创建定制器界面,但我不确定我应该定制哪些类,因为DefaultKafkaReceiver&DefaultKafkaSender被归类为内部类。

schedulerassignListenersrevokeListeners字段没有自动配置schedulerSupplier

  • 由于reactor.kafka.sender.ImmutableSenderOptions不是公开的,因此目前很难传递像 Scheduler 这样的字段,因为它是从配置文件中读取的,作为类型的字段Class<?>,而不能轻易传递给senderOptions.scheduler()
  • assignListeners并且revokeListeners似乎没有必要从 YAML 进行配置,即使我们想这样做,我认为也很难实现。
  • schedulerSupplier对于我来说,自动配置似乎也是不必要的。
  • assignTopicPartitions由于将 YAML 文件中的String& KV 对绑定为类时出现问题,因此我无法绑定。intTopicPartition
  • 我曾想过在没有配置任何主题时抛出一个错误,但是我放弃了这个,因为它可能会损害那些在类路径上有自动配置但不使用它的人。

回答

2

@artembilan 这实际上是一个好主意。我自动配置了选项,基本上是因为我们在这个问题上达成了一致。将自动配置迁移到模板中非常容易。您是否建议仍然保留选项的自动配置(以防人们想要自定义内容)?

6

当然!我将把这个决定留给 @garyrussell,因为看起来他建议的只是自动配置选项。但是如果没有我们已经可以用来向 Kafka 发送或接收的东西,这个功能看起来就不完整。我只是想到的是自动配置KafkaTemplate和一些消费者基础设施……

8

我建议只配置选项,因为这是最大的痛点 - Boot 已经有配置属性的机制,与 spring-kafka 的相同;我认为创建发送方和接收方没有太多好处,因为它们不能重复使用,而且用户只需要再写一行;所以它不会节省很多样板。

在多数情况下

KafkaReceiver.create(options)
    .receive()
    . ...
    .subscribe()

这里的想法是获得最小的自动配置,并随着时间的推移让社区请求更多配置。

这些Reactive....Template都是非常轻量级的包装器,目前并没有增加太多价值;我希望在今年晚些时候致力于实现 react-kafka 和 spring-kafka 之间更紧密的集成;然后自动配置可以进一步发展。

3

我们确实需要将 KafkaProperties 中的等拉到Producer它们Consumer自己的类中,并在两个地方使用它们;这些代表原生的 Kafka 属性。

您想在此 PR 中还是在另一个 PR 中执行此操作?我认为最好在另一个 PR 中完成,因为它会更改KafkaProperties与此 PR 无关的其他类。

另外,@artembilan 和 @garyrussell,我已经解决了你们的问题,欢迎你们再次查看 PR。

6

我们确实需要将 KafkaProperties 中的生产者、消费者等拉到他们自己的类中

这难道不是一个重大的改变吗?因此,这样的修复不能应用于当前的 Spring Boot 2.7.x,只能针对下一个主要版本3.0。是的,考虑到更改的数量,在单独的 PR 中执行此操作确实可能更好。

我还遗漏了什么吗,@garyrussell?

8

为什么这会是一个重大改变?

启动团队不认为KafkaProperties这是一个公共 API;然而,我建议将这些类放在它们自己的文件中,以便它们可以用于 spring-kafka 和 react-kafaka 生产者和消费者属性。

3

@garyrussell 唯一要做的改变就是将KafkaProperties.ConsumerKafkaProperties.Producer移到单独的类中并进行调整,对吗?

我想知道这是否也意味着移动消费者和生产者正在使用的。KafkaProperties.Ssl&属性似乎对于反应式和非反应式来说也是通用的。 我提到的所有内容是否都被移动到单独的公共类中?KafkaProperties.SecurityKafkaProperties.JaasbootstrapServers

2

是的,是的。

但是,我对 Boot 的属性文档了解不够,不知道这样的改变是否会破坏一些东西,所以我们需要 Boot 团队的意见。

7

@garyrussell 我认为这只会破坏 的本机使用KafkaProperties,而这实际上不应该发生。无论如何,您是否希望在另一个 PR 上执行此操作?

8

就像我说的

但是,我对 Boot 的属性文档了解不够,不知道这样的改变是否会破坏一些东西,所以我们需要 Boot 团队的意见。

无论如何,这已经错过了 2.7.x 列车,所以现在有足够的时间来解决这个问题。

0

@garyrussell 好的。我将在接下来的几天内将类别拆分的提交添加到此 PR

7

感谢@almogtavor的PR。以下是一些一般性评论:

  • 我对一些现有属性与本文提出的属性之间明显的重叠或至少相似性感到有些困惑。例如spring.kafka.consumer.auto-commit-intervalspring.reactor.kafka.receiver.commit-intervalspring.kafka.properties.*spring.reactor.kafka.properties.*
  • 鉴于没有spring.reactor.kafka.consumer.properties.*,似乎存在着spring.reactor.kafka.properties.*不平衡spring.reactor.kafka.sender.properties.*
  • 考虑到重用spring.kafka.producer.*spring.kafka.consumer.*属性来配置 Reactor Kafka,是否spring.kafka.reactor有一个更好的前缀,因为它可以使属性名称更紧密地结合在一起?

我认为,退后一步并考虑一下我们到底想要如何配置接收方和发送方选项,会大有裨益。一旦确定了这一点,我们就可以将其映射到属性类和自动配置中。

3

是的,是的。

但是,我对 Boot 的属性文档了解不够,不知道这样的改变是否会破坏一些东西,所以我们需要 Boot 团队的意见。

如果我们决定“共享”消费者/生产者属性,则它们将被移出到单独的类中,然后每个实例变量都Consumer/Producer将用@NestedConfigurationProperty标记,注释处理器应该可以很好地处理它。一个很好的例子是ServerProperties.java

5

感谢@almogtavor的PR。以下是一些一般性评论:

  • 我对一些现有属性与本文提出的属性之间明显的重叠或至少相似性感到有些困惑。例如spring.kafka.consumer.auto-commit-intervalspring.reactor.kafka.receiver.commit-intervalspring.kafka.properties.*spring.reactor.kafka.properties.*
  • 鉴于没有spring.reactor.kafka.consumer.properties.*,似乎存在着spring.reactor.kafka.properties.*不平衡spring.reactor.kafka.sender.properties.*
  • 考虑到重用spring.kafka.producer.*spring.kafka.consumer.*属性来配置 Reactor Kafka,是否spring.kafka.reactor有一个更好的前缀,因为它可以使属性名称更紧密地结合在一起?

我认为,退后一步并考虑一下我们到底想要如何配置接收方和发送方选项,会大有裨益。一旦确定了这一点,我们就可以将其映射到属性类和自动配置中。

@wilkinsona 我将回复我所看到的有关属性重叠的内容,同时退一步思考我们如何配置这个东西......

ℹ️我主要谈论接收者,但发送者也遵循相同的思路...

因此,模式是“ReceiverOptions.properties”映射中的所有属性以及键和值序列化器属性都传递到 KafkaConsumer 中。

public <K, V> Consumer<K, V> createConsumer(ReceiverOptions<K, V> config) {
    return new KafkaConsumer<>(config.consumerProperties(),
                               config.keyDeserializer(),
                               config.valueDeserializer());
}

这些是唯一“真正”重叠的。所有其他选项都在反应控制层之外使用。您可以在ConsumerEventLoopKafkaConsumer中看到用法。

我猜他们选择不保持键/值消费者属性映射条目和第一类键/值 serde 属性同步,因为它们都传递到了 KafkaConsumer 构造函数中。我们可以做同样的事情,或者根据需要同步它们。

我记得我ReactorKafka以前使用时,对于要配置哪些“杠杆”(原生 Kafka 消费者杠杆或 ReactorKafka 杠杆)来获得所需行为感到困惑。ReactorKafka 的参考文档确实谈到了这一点 IIRC。我认为我们应该置身事外,只需让第一类选项属性存在于它们存在的地方,而不是尝试合并/协调,例如使用 spring.kafka.consumer.auto-commit-interval and spring.reactor.kafka.receiver.commit-interval

可能的方向

总结一下我所听到的内容以及我认为的好的方向:

  • 将 RKP 前缀为“spring.kafka.reactor”
  • 不再利用 RKP 中的 KP 类
  • NestedConfigurationProperty在 KP 和 RKP 中将消费者/生产者属性拆分成自己的类别(杠杆)
  • 在RKP中添加buildConsumer / ProducerProeprties以获取常见的道具,例如bootstrap等。
  • 像在接收方/发送方选项中那样直接镜像消费者/生产者的属性(也就是说,不要尝试协调类似的属性)

拆分方法的缺点是,如果有人同时使用普通 Kafka 和反应式 Kafka,则需要多次映射消费者属性。不过,我认为它们之间的“杠杆”会略有不同。

示例 yaml 如下所示:

spring:
  kafka:
    bootstrap-servers: localhost:9092   
    consumer:
      auto-offset-reset: earliest
      auto-commit-interval: 5s
    reactive:
      consumer:
        auto-offset-reset: latest
        auto-commit-interval: 3s

有什么想法吗?

2

感谢您分享您的想法,@onobc。

而不是试图合并/调和例如 spring.kafka.consumer.auto-commit-intervalspring.reactor.kafka.receiver.commit-interval

我对此不太满意。一旦 Boot 显示了这些属性,IDE 就会为它们提供自动完成功能,这会增加两个名称相似的属性给用户造成混淆并因此难以使用的可能性。如果这两个属性具有相同的用途,我认为应该将它们合并为一个属性。如果这两个属性具有不同的用途,我认为应该重命名它们。无论哪种情况,我都认为有必要对 Spring Kafka 和/或 Reactor Kafka 进行一些更改,以便将属性映射到类似的设置上。

3

如果两处房产的用途相同,我认为应该合并为一处房产。如果两处房产的用途不同,我认为应该重新命名

@wilkinsona 很有道理。因此,需要更深入地了解这些“重叠”属性,才能理解每个属性该做什么(合并、重命名、保留等)。我今天会尝试扫描并获取这些属性的列表,以便我们开始讨论。

除了几个值得怀疑的属性之外,其他几点:

  • 将 RKP 前缀为“spring.kafka.reactor”
  • 不再利用 RKP 中的 KP 类
  • NestedConfigurationProperty在 KP 和 RKP 中将消费者/生产者属性拆分成自己的类别(杠杆)
  • 在RKP中添加buildConsumer / ProducerProeprties以获取常见的道具,例如bootstrap等。
  • ~像在接收方/发送方选项中那样直接镜像消费者/生产者的属性(也就是说,不要尝试协调相似的属性)~
1

我认为spring.kafka.reactor作为属性前缀是有意义的。至于其他的,我认为现在说还为时过早。我认为我们需要知道我们希望自动配置提供什么,然后再花更多时间考虑精确的细节。提供的内容应该定义为将自动配置的 bean 以及可用于控制其配置的属性。一旦我们知道了这一点,我们就可以弄清楚如何实现它。

请让我们继续讨论 #29080,而不是在这里。一旦得出结论,我们就可以决定这个 PR 是否大致适用,或者是否需要采用不同的方法。

8

仅对于另一个数据点 - 请参阅https://github.com/spring-projects/spring-boot/issues/17420和关于链接 PR 的讨论。

Kafka 属性的数量非常多;当我们第一次添加自动配置时,我们选择了一个属性子集作为第一类,如下所述:https://docs.spring.io/spring-boot/docs/current/reference/html/messaging.html#messaging.kafka.additional-properties

具体来说,

Spring Boot 自动配置支持所有高重要性属性、一些选定的中和低重要性属性以及任何没有默认值的属性。

根据用户要求,我们随着时间的推移添加了其他内容(例如,该链接 PR 上的隔离级别)。

来自 kafka-clients 类的某种属性的代码生成*Config是理想的。