[spring-projects/spring-boot]启用虚拟线程后,自动配置使用它们的 AsyncTaskExecutor

2024-05-08 360 views
2

目前,TaskExecutionAutoConfiguration自动配置ThreadPoolTaskExecutor使用TaskExecutorBuilder.我们应该考虑提供一个选项来自动配置SimpleAsyncTaskExecutor使用虚拟线程的 a。在 Framework 中进行这些更改之后,SimpleAsyncTaskExecutor可以通过调用配置为使用虚拟线程setVirtualThreads(true)

回答

2

您好,请问这个问题可以用于工作吗?如果是这样,我想问是否有特定的方法来检查spring-boot中的虚拟线程可用性。谢谢。

2

感谢您的建议,但我认为这不是正确的方法。 JEP有一个部分明确指出不应池化虚拟线程。因此,我认为当启用虚拟线程ThreadPoolTaskExecutor与虚拟线程工厂一起使用时,引导的默认配置会令人困惑。

如果您认为这awaitTermination很有用,您可能会建议将其作为 Spring 框架团队的增强功能。

1

好吧,看起来很有道理。谢谢!

9

我已经测试了 3.2.0-M1 中的虚拟线程支持,并且缺少对 TaskScheduler 的支持。在调度中也没有发现虚拟线程的问题。

计划任务是否计划使用虚拟线程?

1

目前,我使用这个 bean 定义作为任务调度的解决方法:

    bean<ConcurrentTaskScheduler>(name = "taskScheduler", isLazyInit = true) {
        ConcurrentTaskScheduler(Executors.newScheduledThreadPool(0, Thread.ofVirtual().factory()))
    }
7

正如JEP中所述,虚拟线程不应被池化,因此我们认为将它们用于任务调度是没有意义的。

框架团队的建议/评论:

SchedulingConfigurer:一个ThreadPoolTaskScheduler配置,可以使用共享 beansetThreadFactory或从共享bean 中new VirtualThreadTaskExecutor().getVirtualThreadFactory()获取。但是,它仍然会池化其线程(默认为 1),因此一开始就不是虚拟线程的惯用法。默认情况下,最好将其保留为一个公平调度的平台线程。ThreadFactoryVirtualThreadTaskExecutor

1

是的,我知道 - 目前这只是一个解决方法。

TaskScheduler如果没有线程池(例如SimpleAsyncTaskExecutor异步任务),我找不到合适的实现。

所以我的问题是:

计划任务是否计划使用虚拟线程? (虚拟线程兼容TaskScheduler实现)

谢谢你!

5

计划任务是否计划使用虚拟线程?

不。正如我上面所说,框架团队认为使用单个公平调度的平台线程是最佳选择。

如果您不同意这一点并认为基于虚拟线程的实现TaskScheduler会很有用,请提出Spring Framework 问题

1

看来我错过了一些东西......异步和计划任务在执行意义上有何不同?为什么异步任务在虚拟线程上执行,而调度任务却没有在虚拟线程上执行?

我期望的是在虚拟线程上执行计划任务spring.threads.virtual.enabled=true。就像异步任务的行为一样......

4

根本区别在于,默认情况下,异步执行是使用多线程执行的,而计划任务执行是使用单线程执行的。因此,异步执行非常适合虚拟线程的使用——多个平台线程池可以用非池化虚拟线程替换——但计划任务执行则不然。用单个虚拟线程替换单个公平调度的平台线程没有什么意义。

7

感谢您如此详细的解释!

所以计划任务的默认并发度有所不同。不幸的是,我已经有 17 年没有调度程序并发 = 1 的项目了。也没有这样的默认值(Quartz、JEE、Quarkus、Micronaut 等)

我尝试提交问题并针对框架提供 PR。

再次感谢

3

@vladimirfx 虽然 Spring Framework 问题跟踪器中的票证将不胜感激,但我不认为我们TaskScheduler为此实现和维护完全自定义。我们可以为ThreadFactory上的虚拟线程设置提供一个方便的开箱即用选项ThreadPoolTaskScheduler,这是您想要的吗?或者也许是一个单独的VirtualThreadTaskScheduler类在内部执行等效操作,但在类名中听起来不像线程池? ;-)

也就是说,针对此类场景考虑的常见安排是ThreadPoolTaskScheduler使用单个调度程序线程,然后每个计划任务移交给其自己的线程,如以下 StackOverflow 答案所示: https://stackoverflow.com/questions/76587253 /how-to-use-virtual-threads-with-scheduledexecutorservice 与 Spring 注释等效的是一个@Scheduled @Async方法,这将分派到已经具有当前引导安排的虚拟线程执行器。

为了避免@Async在每个@Scheduled方法上都进行这样的声明,我们还可以添加这样一个选项,ThreadPoolTaskScheduler以便它将每个回调分派到给定的单独的回调TaskExecutor,该单独的回调可以指向 Boot.txt 中的虚拟回调。这将使用单个平台线程进行调度,并为每个任务使用一个虚拟线程进行回调。您是否认为这比Executors.newScheduledThreadPool(0, Thread.ofVirtual().factory())样式设置更可取,或者您是否更愿意依赖于常规计划线程池中的任务的序列化执行?

5

顺便说一句,Executors.newScheduledThreadPool(0, Thread.ofVirtual().factory())这里真的有帮助吗?如果通常在任何时间点都安排了某些事情,那么总是至少有 1 个线程来进行调度。如果该线程是虚拟的,那么它无论如何都不会占用任何重要的资源。从这个角度来看,Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory())听起来是更合适的安排。另外,您表示调度程序需要更高的并发级别,所以我认为 0 并不表示一开始?

我想知道基于虚拟线程的我们实际上需要哪些配置选项TaskScheduler。它是否与 on 大致相同ThreadPoolTaskScheduler,或者是否存在在虚拟线程设置中绝对没有意义的选项,或者在虚拟线程设置中始终有意义的选项,在这种情况下,一个不同的VirtualThreadTaskScheduler可能是更好的表示。

一般来说,我们的偏好仍然是@Scheduled @Async类似的执行模型,每个计划的回调有一个虚拟线程。在这种情况下,调度程序将始终基于单个调度程序线程,因此作为一个不同的TaskScheduler变体(具有硬编码的单个调度程序线程和预定义的委托执行程序)实际上可能会更好,而不是将该选项烘焙到ThreadPoolTaskScheduler。此外,关闭行为也会有所不同,因为计划的回调在非托管线程中运行,而无需ScheduledExecutorService在关闭时等待它们。情况VirtualThreadTaskExecutor也是如此,它也具有与ThreadPoolTaskExecutor.

5

或者也许是一个单独的VirtualThreadTaskScheduler类在内部执行等效操作,但在类名中听起来不像线程池? ;-)

我现在正在制作这个变体的原型。具有可配置的并发级别(通过信号量实现)。我什至保留默认并发= 1 ?

1 个用于调度的虚拟线程比一个平台更好,因为大多数时候这样的线程都在等待。等待虚拟线程比平台线程更流畅。

从这个角度来看,Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory())听起来是更合适的安排

0表示不需要线程缓存。是的,1 更合适,但在语义上没有任何改变。

感谢您指出关闭语义差异 - 我已尝试克服它。

4

在相关说明中,我将介绍一个SimpleAsyncTaskSchedulerhttps://github.com/spring-projects/spring-framework/issues/30956),它是我的 6.1 M2 执行器/调度程序修订版的后续版本。它继承了setVirtualThreads(true)功能SimpleAsyncTaskExecutor(及其可配置的并发限制),并且 - 当以这种方式配置时 - 使用单个虚拟线程进行调度,并为每个计划任务执行使用一个单独的虚拟线程。这实际上@Scheduled @Async与我上面提到的执行模型类似。我今晚会提交这个,因为这通常是我们认为值得拥有的一个变体。

这仍然为基于池的虚拟线程调度程序留下了空间。因此,如果您正在那里进行自定义实现,那么它与虚拟 ThreadFactory 设置有何不同,而虚拟 ThreadFactory 设置又与您上面的设置ThreadPoolTaskScheduler类似?ConcurrentTaskScheduler(Executors.newScheduledThreadPool(0, Thread.ofVirtual().factory()))附带说明一下,ThreadPoolTaskScheduler还附带了 CRaC 所需的暂停/恢复/关闭生命周期集成。

最后但并非最不重要的一点是,存在一个问题:对于 Boot 的虚拟线程属性来说,什么是好的默认调度程序设置。总是可以选择提供自定义执行程序/调度程序......但是,默认选择本身就很困难。

5

@jhoeller 在 Spring Boot 中使用虚拟线程时如何实现优雅关闭?它似乎既不支持VirtualThreadTaskExecutor也不SimpleAsyncTaskExecutor支持SmartLifecycle或提供类似的功能ExecutorConfigurationSupport#shutdown

抱歉,如果我遗漏了一些明显的东西。

9

一般执行器不一定需要支持生命周期集成,只要任务的提交者可以控制所提交的任务,为其设置活动或关闭信号并等待它们在关闭时完成即可。例如,我们的 JMS DefaultMessageListenerContainer 及其异步调用程序就是这种情况,基于 Future 的提交通常也是这种情况,其中调用者通过句柄与任务交互Future。特别是对于常见的框架提交的任务,不需要在执行器本身中跟踪它们的生命周期。但是,对于自定义编程提交,如果原始提交者不保留哪些任务仍在运行,则可能不清楚哪些任务仍在运行。

对于非池化执行器,只有通过保留执行器内的所有活动任务并等待它们完成,才能由执行器控制关闭所有任务。在某种程度上,这违背了虚拟线程的本质,或者至少违背了它们的惯用用法。如果需要这样的受控关闭,并且可能还需要结合并发限制,那么我实际上会考虑ThreadPoolTaskExecutor使用虚拟 ThreadFactory 进行设置。跟踪每个单独提交的任务的开销可能会超过虚拟线程的池开销,因此我宁愿使用ThreadPoolTaskExecutor池化虚拟线程来实现此类目的,同样使用ThreadPoolTaskScheduler.

在生命周期管理方面,调度程序有点不同:由于它有一个调度程序线程不断触发周期性任务执行,因此它绝对需要参与受控关闭,并且还应该参与上下文范围的暂停/恢复需要暂停其触发器的步骤。因此,新的SimpleAsyncTaskScheduler实现不是以这种方式管理各个任务的执行,而是仅管理其单个调度程序线程。SmartLifecycle

正如上面所暗示的,这里有多种策略的空间。只是引导设置的默认选择很困难:对于经典线程池,问题是根据通常阻塞的线程数量来使用哪个池限制。对于虚拟线程执行器来说,问题在于执行器控制的生命周期管理对于给定的应用程序有多重要。如果常见的做法是使用带有虚拟线程的线程池作为这两者之间的折衷方案,我不会感到惊讶。

2

太好了,谢谢您的详细解释!

0

回顾一下,在和中提供一个waitForTasksToCompleteOnShutdown选项看起来很简单,跟踪每个 Runnable 的执行线程以及关闭时的一些中断/通知信号,以便我们可以等待活动线程集为空。这确实会带来一些开销,因此默认情况下不会打开,但在涉及非托管任务提交的情况下,似乎值得提供作为受控关闭的选择加入标志。SimpleAsyncTaskExecutorSimpleAsyncTaskScheduler

2

我一直在寻找类似的东西。如果我们实现这样的跟踪,我们就有效地复制了 ScheduledExecutorService 的功能。

那么它会比这更好:

Executors.newScheduledThreadPool(0, Thread.ofVirtual().factory())

我尝试实现正确的关闭行为,得出的结论是我从 JRE 重新实现了池调度程序...

那么我们需要一些新的东西吗?使用池大小为 0 的池调度程序并没有什么问题。

0

taskTerminationTimeout有一个“打开”SimpleAsyncTaskExecutor和“现在”形式的优雅关闭选项SimpleAsyncTaskScheduler。当设置为 >0 时,这将导致对每个执行线程进行任务跟踪,我们还使用该线程在关闭时中断正在运行的线程。有一个与ThreadPoolTaskScheduler关闭行为兼容性的集成测试,到目前为止看起来非常有希望。

至于需要新的东西,从我现在的立场来看,我确实看到了这两个选择:要么SimpleAsyncTaskScheduler风格,要么ThreadPoolTaskScheduler风格。包装ConcurrentTaskScheduler创建Executors的池实际上是 的变体ThreadPoolTaskScheduler。在这种情况下,池大小是固定的,但SimpleAsyncTaskScheduler允许计划任务执行的动态并发。

2

您对这个变体有何看法(对不起 Kotlin):

            bean<ConcurrentTaskScheduler>(name = "taskScheduler", isLazyInit = true) {
                val concurrencyLevel = env.getProperty<Int>("spring.task.scheduling.concurrency") ?: 1
                val executor = ScheduledThreadPoolExecutor(1, Thread.ofVirtual().factory()).apply {
                    this.maximumPoolSize = concurrencyLevel
                }
                ConcurrentTaskScheduler(executor)
            }
  1. 仅使用虚拟线程
  2. 保留并发级别
  3. 保留关闭语义
  4. 不要池化“执行”线程
4

在 a 上设置最大池大小ScheduledThreadPoolExecutor并不会真正产生动态缩放效果,因为调度程序充当具有无界队列的固定大小池,在同一线程上混合触发器协调和实际计划任务执行。相比之下,SimpleAsyncTaskScheduler具有完全不同的扩展行为,其中始终单个调度程序线程在单独的工作线程上启动任意数量的并发计划任务执行,只是可能受到并发限制的限制。

附带说明一下,ConcurrentTaskScheduler不提供与 Spring 上下文相同程度的生命周期集成,特别是从 6.1 开始。它缺少暂停/恢复支持以及ThreadPoolTaskScheduler现在附带的正常关闭信号。因此,我通常会推荐专门ThreadPoolTaskScheduler为您的目的配置的容器,而不是自定义ScheduledThreadPoolExecutor实例的包装器。

7

根本区别在于,默认情况下,异步执行是使用多线程执行的,而计划任务执行是使用单线程执行的。因此,异步执行非常适合虚拟线程的使用——多个平台线程池可以用非池化虚拟线程替换——但计划任务执行则不然。用单个虚拟线程替换单个公平调度的平台线程没有什么意义。

当我在计划任务中执行异步任务时,我发现异步任务没有使用虚拟线程来执行。当我需要执行高频周期性任务时,如何为其启用虚拟线程?

7

@129duckflew 如果不了解更多信息就很难说,例如您正在使用的 Spring Boot 版本、您正在使用的 Java 版本、您如何配置应用程序等。关于 Stack Overflow 的一个问题,可重现的示例是提供此信息并获得帮助的好方法。

4

@129duckflew 如果不了解更多信息就很难说,例如您正在使用的 Spring Boot 版本、您正在使用的 Java 版本、您如何配置应用程序等。关于 Stack Overflow 的一个问题,可重现的示例是提供此信息并获得帮助的好方法。

我使用的是Java21 SpringBoot2.7.3。我使用java.util.Timer创建了一个定时任务,然后在这个定时任务中调用了带有@Async注解的异步方法。然后我将SpringBoot的异步任务处理器配置为虚拟的。 Thread Excutors,最后我在标有@Async的代码中打印了Thread.currentThread.isVirtual(),发现输出了False。示例代码如下

 @PostConstruct
 public void pushStatus() {
        ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
        Timer timer = new Timer();
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                countWebSocketsMap.forEach((k, v) -> {
                    executor.submit(() -> pushCount(k, v));
                });
            }
        }, 0, 500);
 }
@Async
public void pushCount(String sessionId, WebSocketSession session) {
     log.info("{}",Thread.currentThread().isVirtual());
}
@EnableAsync
@Configuration
@ConditionalOnProperty(
        value = "spring.thread-executor",
        havingValue = "virtual"
)
@Slf4j
public class AsyncConfig   {
    @Bean
    public AsyncTaskExecutor applicationTaskExecutor() {
        return new TaskExecutorAdapter(Executors.newVirtualThreadPerTaskExecutor());
    }

    @Bean
    public TomcatProtocolHandlerCustomizer<?> protocolHandlerVirtualThreadExecutorCustomizer() {
        return protocolHandler -> {
            protocolHandler.setExecutor(Executors.newVirtualThreadPerTaskExecutor());
        };
    }
}

我指定了 SpringBoot 配置:

spring:
  thread-executor: virtual
1

谢谢,但这不是提供此信息的正确位置。正如我上面所说,Stack Overflow 是首选地方。由于您使用的是 Spring Boot 2.7,这也与此问题无关,因为虚拟线程支持是 Spring Boot 3.2 中的新增功能。如果您需要有关自己的虚拟线程相关设置的帮助,请在 Stack Overflow 上提问。