[alibaba/spring-cloud-alibaba]spring-cloud-starter-stream-rocketmq当多集群,然后大量Topic的问题

2024-10-14 833 views
3

Which Component spring-cloud-starter-stream-rocketmq组件

Describe what problem you have encountered 现在遇到了一个进退两难的问题。 场景:有个服务msg-hub需要使用spring-cloud-stream连接三个RocketMQ集群,然后随着日积月累,会核每个集群大概400个Topic产生连接,看到关于和RocketMQ的生产者的线程数达到: 2000多个,在容器内出现OOM。目前排查到的原因如下:

  1. 当使用spring-cloud-stream的StreamBridge通过send方法发送消息给某个RocketMQ集群的时候,StreamBridge会使用: binder(集群名字) + ":" + Topic 作为维度建立一个MessageChannel对象
  2. MessageChannel对象是spring-cloud-stream对多类型MQ的一个客户端的封装,向下具体化为RocketMQ的就是RocketMQProducerMessageHandler对象
  3. RocketMQProducerMessageHandler对象包含一个defaultMQProducer对象(DefaultMQProducer类型)
  4. 每个defaultMQProducer会创建一个新的MQClientInstance对象,原因是:clientId每次都不一样,这个不一样出在: instanceName这块上
  5. RocketMQProduceFactory在初始化defaultMQProducer的时候,会执行如下的代码设置instanceName这个属性
    producer.setInstanceName(
                RocketMQUtils.getInstanceName(rpcHook, topic + "|" + UtilAll.getPid()));  //以Topic为维度设置instanceName

总体一句话:每个Topic对应一个MessageChannel对象,对应一个defaultMQProducer对象,对应一个新的MQClientInstance对象(这个最大的问题是,会产生10个线程,4个是业务发送线程,6个是其他的配套线程)

Describe what information you have read spring-cloud-stream 和spring-cloud-starter-stream-rocketmq创建MessageChannel,以及对应DefaultMQProducer的创建过程

期望: 不知道这里面是否可以通过一些钩子来决定是否每次都需要创建MQClientInstance对象,也就是说让MQClientInstance对象复用?还望指教

回答

3

看了下spring-cloud-stream的设计思想是以Topic为维度,但是在现行版本当中(包括最新版本的v4.1.0)还是有个bug。就是channelCahce超过dynamic-destination-cache-size的时候删除的问题。就是它放到channelCache和bindingService的key不一样,一个是以binder(MQ集群的名字) + ":" + Topic作为key,另一个以Topic作为key,导致了,一个能清理,另外一个真正能关闭的无法执行。

就算是可以执行关闭,但是每次都新建MQClientIntance的动作很耗时,无法做到复用,会影响性能

1

@fangjian0423 请教下,这个问题怎么解决。现在有个服务的线程已经超过了1700个线程了

2

@chickenlj 有空帮忙看下这个问题,多谢

4

alibaba的应用全都有这个问题,乱开线程,丝毫不考虑cs切换成本。 nacos随便起一个实例,就开了1000多线程,也不知道他要干啥。

6

目前找到了一个方案,就是使用bytebuddy这个依赖jar,拦截ProduerFacotry构造Instance对象的方法,修改传入的topic信息,可以根据自己的需要进行修改。也是不得已而为之的下策!

1

找到一种方案来处理,使用类覆盖的形式来处理

7

看了下,在新版本已经解决这个问题了:

<spring-cloud-alibaba.version>2021.0.6.0</spring-cloud-alibaba.version>
2

但是在升级的时候,要注意,我们碰到一个spring-cloud-stream的问题,就是:处理byte的时候有点问题