Which Component spring-cloud-starter-stream-rocketmq组件
Describe what problem you have encountered 现在遇到了一个进退两难的问题。 场景:有个服务msg-hub需要使用spring-cloud-stream连接三个RocketMQ集群,然后随着日积月累,会核每个集群大概400个Topic产生连接,看到关于和RocketMQ的生产者的线程数达到: 2000多个,在容器内出现OOM。目前排查到的原因如下:
- 当使用spring-cloud-stream的StreamBridge通过send方法发送消息给某个RocketMQ集群的时候,StreamBridge会使用: binder(集群名字) + ":" + Topic 作为维度建立一个MessageChannel对象
- MessageChannel对象是spring-cloud-stream对多类型MQ的一个客户端的封装,向下具体化为RocketMQ的就是RocketMQProducerMessageHandler对象
- RocketMQProducerMessageHandler对象包含一个defaultMQProducer对象(DefaultMQProducer类型)
- 每个defaultMQProducer会创建一个新的MQClientInstance对象,原因是:clientId每次都不一样,这个不一样出在: instanceName这块上
- RocketMQProduceFactory在初始化defaultMQProducer的时候,会执行如下的代码设置instanceName这个属性
producer.setInstanceName( RocketMQUtils.getInstanceName(rpcHook, topic + "|" + UtilAll.getPid())); //以Topic为维度设置instanceName
总体一句话:每个Topic对应一个MessageChannel对象,对应一个defaultMQProducer对象,对应一个新的MQClientInstance对象(这个最大的问题是,会产生10个线程,4个是业务发送线程,6个是其他的配套线程)
Describe what information you have read spring-cloud-stream 和spring-cloud-starter-stream-rocketmq创建MessageChannel,以及对应DefaultMQProducer的创建过程
期望: 不知道这里面是否可以通过一些钩子来决定是否每次都需要创建MQClientInstance对象,也就是说让MQClientInstance对象复用?还望指教