[apache/dubbo] After upgrading to 2.7.12, a provider going offline leaves the consumer thread pool Terminated

2024-07-02 721 views
1
Environment
  • Dubbo version: 2.7.12
  • Operating System version: centos
  • Java version: 8
Steps to reproduce this issue
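  1. The API gateway (which converts HTTP to Dubbo via generic invocation) was upgraded from Dubbo 2.7.7 to 2.7.12 to pick up the fixes for issues #7109 and #6959.

  2. The parameter dubbo.consumer.threads=200 is configured.

  3. Four new Dubbo provider nodes were deployed and ran fine. When the four old nodes were deleted, requests from some API gateway nodes to the provider started failing. The full error output is given below.

    An exception '{}' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception:
    org.apache.dubbo.remoting.ExecutionException: class org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler error when process received event .
  4. In the end we had to roll back to 2.7.7. (The problem is reproducible under high concurrency.)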

Actual Result

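The error is raised in AllChannelHandler.received:

ExecutorService executor = getPreferredExecutorService(message);
// The executor obtained here is the same thread pool that DefaultExecutorRepository.getExecutor returns

While new nodes were starting and old nodes were going offline, calls from the API gateway to the Dubbo service failed with the error below, which shows that the thread pool in use is in the Terminated state.

I am tracing the code, and would also like to ask: in which scenarios can this thread pool end up in the Terminated state?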

An exception '{}' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception:
org.apache.dubbo.remoting.ExecutionException: class org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler error when process received event .
    at org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler.received(AllChannelHandler.java:68)
    at org.apache.dubbo.remoting.exchange.support.header.HeartbeatHandler.received(HeartbeatHandler.java:90)
    at org.apache.dubbo.remoting.transport.MultiMessageHandler.received(MultiMessageHandler.java:52)
    at org.apache.dubbo.remoting.transport.AbstractPeer.received(AbstractPeer.java:147)
    at org.apache.dubbo.remoting.transport.netty4.NettyClientHandler.channelRead(NettyClientHandler.java:83)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:648)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:583)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:500)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.RejectedExecutionException: Task org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable@7d7d66e4 rejected from java.util.concurrent.ThreadPoolExecutor@15527cbc[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 93]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler.received(AllChannelHandler.java:62)
    ... 28 common frames omitted

Answers

7

https://github.com/apache/dubbo/blob/e8eeddbf42952e7dd4ba4428548f91006900ea5c/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java#L107

ExecutorService executor = getCallbackExecutor(getUrl(), inv); // on the consumer side this returns the shared thread pool
CompletableFuture<AppResponse> appResponseFuture =
                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);  // the thread pool is passed on to DefaultFuture

https://github.com/apache/dubbo/blob/e8eeddbf42952e7dd4ba4428548f91006900ea5c/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java#L147

ExecutorService futureExecutor = future.getExecutor();
if (futureExecutor != null && !futureExecutor.isTerminated()) {
    futureExecutor.shutdownNow(); // but when the channel closes, the thread pool is shut down too, so later requests can no longer be handled
}

Thread stack:

ts=2021-06-30 10:15:23;thread_name=DubboClientHandler-thread-85;id=180;is_daemon=true;priority=5;TCCL=org.springframework.boot.loader.LaunchedURLClassLoader@7487b142
    @java.util.concurrent.ThreadPoolExecutor.shutdownNow()
        at org.apache.dubbo.remoting.exchange.support.DefaultFuture.closeChannel(DefaultFuture.java:147)
        at org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeHandler.disconnected(HeaderExchangeHandler.java:134)
        at org.apache.dubbo.remoting.transport.AbstractChannelHandlerDelegate.disconnected(AbstractChannelHandlerDelegate.java:48)
        at org.apache.dubbo.remoting.transport.dispatcher.ChannelEventRunnable.run(ChannelEventRunnable.java:73)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at org.apache.dubbo.common.threadlocal.InternalRunnable.run(InternalRunnable.java:41)
        at java.lang.Thread.run(Thread.java:745)

ref: #7109
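
The effect of the two snippets above can be shown with the JDK alone. The following is only a minimal sketch, not Dubbo code: the class name SharedPoolShutdownDemo and its method are hypothetical, and a plain cached thread pool stands in for the shared consumer pool. It shows that once shutdownNow() has been called on a pool that other callers still hold, the next execute() is rejected, which matches the RejectedExecutionException in the issue.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical stand-alone demo; it does not use any Dubbo classes.
class SharedPoolShutdownDemo {

    // Returns true if a task submitted after shutdownNow() is rejected.
    static boolean rejectedAfterShutdown() {
        // Stands in for the consumer-side pool shared by all connections.
        ExecutorService shared = Executors.newCachedThreadPool();

        // Stands in for what closeChannel() does for a future pending on a closed channel.
        shared.shutdownNow();

        try {
            // Stands in for a response arriving later on a different, healthy channel.
            shared.execute(() -> { });
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected after shutdownNow: " + rejectedAfterShutdown());
    }
}
```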

6

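Once the consumer-side Executor is shared, DefaultFuture.closeChannel() indeed should no longer call ExecutorService.shutdownNow(); doing so shuts down an ExecutorService that should not be closed.

I tried to simulate generic invocation, both generic + sync and generic + async, but could never reach futureExecutor.shutdownNow();. During a normal shutdown, the Channels have already been removed from CHANNELS one by one before DefaultFuture.closeChannel() is entered, so the body of the for loop is never executed.

Is there a simplified scenario that reaches futureExecutor.shutdownNow(); quickly, or could you verify with futureExecutor.shutdownNow(); removed?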

2

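I constructed a scenario that reproduces it:

  1. Start 3 providers.
  2. Start 1 consumer and begin invoking the service (with async=true).
  3. kill -9 provider 1. The consumer's global ExecutorService gets closed, and subsequent invocations fall back to SHARED_EXECUTOR.
  4. kill -9 provider 2. SHARED_EXECUTOR gets closed as well.
  5. Subsequent invocations fail with essentially the same error and stack trace as in the issue description.

With kill -9, the Netty connection to the provider is dropped, which triggers AbstractChannelHandlerDelegate.disconnected and in turn futureExecutor.shutdownNow(). If the provider goes offline normally (the shutdown hook first unregisters the service and then exits the process), this does not seem to happen.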
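
The difference between an abrupt kill and a graceful shutdown can be modelled roughly as follows. This is a simplified sketch under stated assumptions, not Dubbo's implementation: CloseChannelModel, Pending, and the string channel names are all hypothetical, and the model only captures the idea that closing a channel shuts down the executor of every request still pending on it. When nothing is in flight, the loop finds no matching entry and the shared pool is left alone.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical model of pending-request bookkeeping; names are illustrative, not Dubbo's.
class CloseChannelModel {

    // A request that is still waiting for its response.
    static final class Pending {
        final String channel;
        final ExecutorService executor;

        Pending(String channel, ExecutorService executor) {
            this.channel = channel;
            this.executor = executor;
        }
    }

    private final Map<Long, Pending> pending = new ConcurrentHashMap<>();

    void request(long id, String channel, ExecutorService executor) {
        pending.put(id, new Pending(channel, executor));
    }

    void response(long id) {
        pending.remove(id);
    }

    // Models the reported behaviour: each request pending on the closed
    // channel is dropped and its executor is shut down.
    void closeChannel(String channel) {
        pending.entrySet().removeIf(entry -> {
            Pending p = entry.getValue();
            if (!p.channel.equals(channel)) {
                return false;
            }
            if (p.executor != null && !p.executor.isTerminated()) {
                p.executor.shutdownNow();
            }
            return true;
        });
    }

    // Returns true if the shared pool is still open after the channel closes.
    static boolean sharedPoolSurvives(boolean requestInFlight) {
        ExecutorService shared = Executors.newCachedThreadPool();
        CloseChannelModel model = new CloseChannelModel();
        model.request(1L, "provider-1", shared);
        if (!requestInFlight) {
            model.response(1L); // graceful case: the call finished before the disconnect
        }
        model.closeChannel("provider-1");
        boolean alive = !shared.isShutdown();
        shared.shutdownNow(); // clean up the demo pool
        return alive;
    }

    public static void main(String[] args) {
        System.out.println("no request in flight, pool open: " + sharedPoolSurvives(false));
        System.out.println("request in flight, pool open: " + sharedPoolSurvives(true));
    }
}
```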

3

In our case it happened when running kubectl delete deployment.

7

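An additional trigger condition: the service is asynchronous, i.e. async=true. Reason: in org.apache.dubbo.rpc.protocol.AbstractInvoker#getCallbackExecutor, if the service is asynchronous the Executor is returned directly, whereas in synchronous mode a wrapping ThreadlessExecutor is returned. ThreadlessExecutor.shutdownNow() does not shut down the Executor it wraps, so synchronous mode is not affected.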
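
A rough illustration of why the wrapper protects the shared pool is sketched below. It is not the real ThreadlessExecutor: PerCallExecutor and callbackExecutor are hypothetical names, and the wrapper only imitates the one property described above, namely that its shutdownNow() leaves the wrapped executor untouched.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo; PerCallExecutor only imitates the behaviour described for ThreadlessExecutor.
class CallbackExecutorDemo {

    // Per-call wrapper: shutting it down marks only the wrapper as closed.
    static final class PerCallExecutor extends AbstractExecutorService {
        private final ExecutorService delegate;
        private volatile boolean closed;

        PerCallExecutor(ExecutorService delegate) {
            this.delegate = delegate;
        }

        @Override public void execute(Runnable task) { delegate.execute(task); }
        @Override public void shutdown() { closed = true; }
        @Override public List<Runnable> shutdownNow() { closed = true; return Collections.emptyList(); }
        @Override public boolean isShutdown() { return closed; }
        @Override public boolean isTerminated() { return closed; }
        @Override public boolean awaitTermination(long timeout, TimeUnit unit) { return closed; }
    }

    // Assumed shape of the branch: async calls get the shared pool itself, sync calls get a wrapper.
    static ExecutorService callbackExecutor(boolean async, ExecutorService shared) {
        return async ? shared : new PerCallExecutor(shared);
    }

    // Returns true if the shared pool is still open after the per-call executor is shut down.
    static boolean sharedStillUsable(boolean async) {
        ExecutorService shared = Executors.newCachedThreadPool();
        ExecutorService perCall = callbackExecutor(async, shared);
        perCall.shutdownNow(); // what happens to a pending future's executor on disconnect
        boolean usable = !shared.isShutdown();
        shared.shutdownNow(); // clean up the demo pool
        return usable;
    }

    public static void main(String[] args) {
        System.out.println("sync, shared pool usable: " + sharedStillUsable(false));
        System.out.println("async, shared pool usable: " + sharedStillUsable(true));
    }
}
```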

9

As a temporary workaround, I overrode ExecutorRepository so that the thread pool is re-created after it has been shut down:


import lombok.extern.slf4j.Slf4j;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.threadpool.ThreadPool;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.threadpool.manager.Ring;
import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NamedThreadFactory;

import java.util.Map;
import java.util.concurrent.*;

import static org.apache.dubbo.common.constants.CommonConstants.*;

/**
 * Upgrading Dubbo to 2.7.12 picks up the fixes for the following bugs:
 * https://github.com/apache/dubbo/pull/7109
 * https://github.com/apache/dubbo/pull/6959
 * <p>
 * However, when old nodes are taken offline during a blue-green release, the global Dubbo
 * thread pool is shut down and requests on the affected node fail:
 * https://github.com/apache/dubbo/issues/8172
 * <p>
 * Temporary workaround: keep the global thread pool, and re-create it after it has been shut down.
 * <p>
 * Copied from org.apache.dubbo.common.threadpool.manager.DefaultExecutorRepository.
 * <p>
 * Consider implementing {@code Lifecycle} to enable executors shutdown when the process stops.
 */
@Slf4j
public class SisyphusExecutorRepository implements ExecutorRepository {

    private int DEFAULT_SCHEDULER_SIZE = Runtime.getRuntime().availableProcessors();

    private final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true));

    private Ring<ScheduledExecutorService> scheduledExecutors = new Ring<>();

    private ScheduledExecutorService serviceExporterExecutor;

    private ConcurrentMap<String, ConcurrentMap<String, ExecutorService>> data = new ConcurrentHashMap<>();

    public SisyphusExecutorRepository() {
        for (int i = 0; i < DEFAULT_SCHEDULER_SIZE; i++) {
            ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-framework-scheduler"));
            scheduledExecutors.addItem(scheduler);
        }
        serviceExporterExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Dubbo-exporter-scheduler"));
    }

    /**
     * Get called when the server or client instance initiating.
     *
     * @param url
     * @return
     */
    @Override
    public synchronized ExecutorService createExecutorIfAbsent(URL url) {
        Map<String, ExecutorService> executors = data.computeIfAbsent(EXECUTOR_SERVICE_COMPONENT_KEY, k -> new ConcurrentHashMap<>());
        //issue-7054:Consumer's executor is sharing globally, key=Integer.MAX_VALUE. Provider's executor is sharing by protocol.

        String consumerKey = getConsumerExecutorKey(url);

        String portKey = CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY)) ? consumerKey : String.valueOf(url.getPort());
        ExecutorService executor = executors.computeIfAbsent(portKey, k -> createExecutor(url));
        // If executor has been shut down, create a new one
        if (executor.isShutdown() || executor.isTerminated()) {
            executors.remove(portKey);
            executor = createExecutor(url);
            executors.put(portKey, executor);
        }
        return executor;
    }

    @Override
    public ExecutorService getExecutor(URL url) {
        Map<String, ExecutorService> executors = data.get(EXECUTOR_SERVICE_COMPONENT_KEY);
        /**
         * It's guaranteed that this method is called after {@link #createExecutorIfAbsent(URL)}, so data should already
         * have Executor instances generated and stored.
         */
        if (executors == null) {
            log.warn("No available executors, this is not expected, framework should call createExecutorIfAbsent first " +
                    "before coming to here.");
            return null;
        }

        String consumerKey = getConsumerExecutorKey(url);
        String portKey = CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY)) ? consumerKey : String.valueOf(url.getPort());

        ExecutorService executor = executors.get(portKey);
        if (executor == null || executor.isShutdown() || executor.isTerminated()) {

            long createStart = System.currentTimeMillis();
            executor = createExecutorIfAbsent(url);
            long createEnd = System.currentTimeMillis();

            log.warn("Executor for {} was missing or shut down; re-created the global executor in {} ms", url, createEnd - createStart);
        }

        return executor;
    }

    @Override
    public void updateThreadpool(URL url, ExecutorService executor) {
        try {
            if (url.hasParameter(THREADS_KEY)
                    && executor instanceof ThreadPoolExecutor && !executor.isShutdown()) {
                ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
                int threads = url.getParameter(THREADS_KEY, 0);
                int max = threadPoolExecutor.getMaximumPoolSize();
                int core = threadPoolExecutor.getCorePoolSize();
                if (threads > 0 && (threads != max || threads != core)) {
                    if (threads < core) {
                        threadPoolExecutor.setCorePoolSize(threads);
                        if (core == max) {
                            threadPoolExecutor.setMaximumPoolSize(threads);
                        }
                    } else {
                        threadPoolExecutor.setMaximumPoolSize(threads);
                        if (core == max) {
                            threadPoolExecutor.setCorePoolSize(threads);
                        }
                    }
                }
            }
        } catch (Throwable t) {
            log.error(t.getMessage(), t);
        }
    }

    @Override
    public ScheduledExecutorService nextScheduledExecutor() {
        return scheduledExecutors.pollItem();
    }

    @Override
    public ScheduledExecutorService getServiceExporterExecutor() {
        return serviceExporterExecutor;
    }

    @Override
    public ExecutorService getSharedExecutor() {
        log.info("using shared executor");
        return SHARED_EXECUTOR;
    }

    @Override
    public void destroyAll() {

        log.info("destroy all");

        long start = System.currentTimeMillis();
        data.values().forEach(executors -> {
            if (executors != null) {
                executors.values().forEach(executor -> {
                    if (executor != null && !executor.isShutdown()) {
                        ExecutorUtil.shutdownNow(executor, 100);
                    }
                });
            }
        });

        data.clear();
        long end = System.currentTimeMillis();

        log.info("destroy all cost :" + (end - start));

    }

    private ExecutorService createExecutor(URL url) {
        return (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
    }

    private String getConsumerExecutorKey(URL url) {
//        return url.getIp() + "-" + url.getPort();
        return "global";
    }

}
1

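I would not recommend calling createExecutorIfAbsent again from DefaultExecutorRepository.getExecutor. When the consumer-side thread pool is first created, org.apache.dubbo.remoting.transport.AbstractClient#initExecutor sets threadpool=cached on the url. Calling it at runtime from DefaultExecutorRepository.getExecutor would instead create the default fixed thread pool (because the SPI annotation on ThreadPool defaults to @SPI(FixedThreadPool.NAME)), not the expected cached consumer-side thread pool.

So I lean towards removing the following code from org.apache.dubbo.remoting.exchange.support.DefaultFuture#closeChannel: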

-                    ExecutorService futureExecutor = future.getExecutor();
-                    if (futureExecutor != null && !futureExecutor.isTerminated()) {
-                        futureExecutor.shutdownNow();
-                    }

A protection could even be added for the fallback SHARED_EXECUTOR in DefaultExecutorRepository.
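
One possible shape for such a protection is sketched below. This is an assumption about how it could be done, not the change that was made in Dubbo: SharedExecutorGuard and protect are hypothetical names, and the sketch uses a JDK dynamic proxy that ignores shutdown() and shutdownNow() while forwarding every other call. The owner keeps the real executor so that it can still be shut down when the process stops.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical guard; not part of Dubbo.
class SharedExecutorGuard {

    // Returns a view of the delegate on which shutdown requests are ignored.
    static ExecutorService protect(ExecutorService delegate) {
        return (ExecutorService) Proxy.newProxyInstance(
                SharedExecutorGuard.class.getClassLoader(),
                new Class<?>[] {ExecutorService.class},
                (proxy, method, args) -> {
                    switch (method.getName()) {
                        case "shutdown":
                            return null; // ignored: only the owner may close the pool
                        case "shutdownNow":
                            return Collections.emptyList(); // ignored, no tasks are drained
                        default:
                            try {
                                return method.invoke(delegate, args);
                            } catch (InvocationTargetException e) {
                                throw e.getCause(); // rethrow the delegate's own exception
                            }
                    }
                });
    }

    // Returns true if the guarded pool is still open after a caller tries to shut it down.
    static boolean survivesShutdownNow() {
        ExecutorService real = Executors.newCachedThreadPool();
        ExecutorService guarded = protect(real);
        guarded.shutdownNow(); // e.g. a per-future shutdown on disconnect
        boolean alive = !guarded.isShutdown();
        real.shutdownNow(); // the owner closes the real pool explicitly
        return alive;
    }

    public static void main(String[] args) {
        System.out.println("guarded pool still open: " + survivesShutdownNow());
    }
}
```

A guard like this only hides the symptom; removing the shutdownNow() call from closeChannel, as suggested above, addresses the cause.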

5

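Yes, dubbo.consumer.threadpool=cached needs to be set along with it. With a global thread pool, it indeed should not be shut down, and removing that shutdown would be best, but DefaultFuture does not seem to offer a direct extension point.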

9

Resolved.