Configure suitable TaskExecutor for WebSocket

For WebSocket support the preconfigured ThreadExecutor is set for
ChannelRegistrations similar to the JpaRepositoriesAutoConfiguration.

See gh-39611
This commit is contained in:
BenchmarkingBuffalo 2024-02-17 20:44:48 +01:00 committed by Moritz Halbritter
parent 3dd3fc8f3a
commit 1d820a8994
4 changed files with 75 additions and 14 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2022 the original author or authors.
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -67,6 +67,7 @@ import org.springframework.util.ClassUtils;
* @author Josh Long
* @author Scott Frederick
* @author Stefano Cordio
* @author Lasse Wulff
* @since 1.0.0
* @see EnableJpaRepositories
*/
@ -84,20 +85,14 @@ public class JpaRepositoriesAutoConfiguration {
public EntityManagerFactoryBuilderCustomizer entityManagerFactoryBootstrapExecutorCustomizer(
Map<String, AsyncTaskExecutor> taskExecutors) {
return (builder) -> {
AsyncTaskExecutor bootstrapExecutor = determineBootstrapExecutor(taskExecutors);
AsyncTaskExecutor bootstrapExecutor = TaskExecutionAutoConfiguration
.determineAsyncTaskExecutor(taskExecutors);
if (bootstrapExecutor != null) {
builder.setBootstrapExecutor(bootstrapExecutor);
}
};
}
private AsyncTaskExecutor determineBootstrapExecutor(Map<String, AsyncTaskExecutor> taskExecutors) {
if (taskExecutors.size() == 1) {
return taskExecutors.values().iterator().next();
}
return taskExecutors.get(TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME);
}
private static final class BootstrapExecutorCondition extends AnyNestedCondition {
BootstrapExecutorCondition() {

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2023 the original author or authors.
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,11 +16,14 @@
package org.springframework.boot.autoconfigure.task;
import java.util.Map;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Import;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@ -30,6 +33,7 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
* @author Stephane Nicoll
* @author Camille Vienot
* @author Moritz Halbritter
* @author Lasse Wulff
* @since 2.1.0
*/
@ConditionalOnClass(ThreadPoolTaskExecutor.class)
@ -46,4 +50,11 @@ public class TaskExecutionAutoConfiguration {
*/
public static final String APPLICATION_TASK_EXECUTOR_BEAN_NAME = "applicationTaskExecutor";
public static AsyncTaskExecutor determineAsyncTaskExecutor(Map<String, AsyncTaskExecutor> taskExecutors) {
if (taskExecutors.size() == 1) {
return taskExecutors.values().iterator().next();
}
return taskExecutors.get(APPLICATION_TASK_EXECUTOR_BEAN_NAME);
}
}

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2023 the original author or authors.
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -17,6 +17,7 @@
package org.springframework.boot.autoconfigure.websocket.servlet;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -28,14 +29,17 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication.Type;
import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.messaging.converter.ByteArrayMessageConverter;
import org.springframework.messaging.converter.DefaultContentTypeResolver;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.messaging.simp.config.AbstractMessageBrokerConfiguration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.util.MimeTypeUtils;
import org.springframework.web.socket.config.annotation.DelegatingWebSocketMessageBrokerConfiguration;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@ -44,6 +48,7 @@ import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerCo
* {@link EnableAutoConfiguration Auto-configuration} for WebSocket-based messaging.
*
* @author Andy Wilkinson
* @author Lasse Wulff
* @since 1.3.0
*/
@AutoConfiguration(after = JacksonAutoConfiguration.class)
@ -58,8 +63,12 @@ public class WebSocketMessagingAutoConfiguration {
private final ObjectMapper objectMapper;
WebSocketMessageConverterConfiguration(ObjectMapper objectMapper) {
private final AsyncTaskExecutor executor;
WebSocketMessageConverterConfiguration(ObjectMapper objectMapper,
Map<String, AsyncTaskExecutor> taskExecutors) {
this.objectMapper = objectMapper;
this.executor = TaskExecutionAutoConfiguration.determineAsyncTaskExecutor(taskExecutors);
}
@Override
@ -74,6 +83,16 @@ public class WebSocketMessagingAutoConfiguration {
return false;
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.executor(this.executor);
}
@Override
public void configureClientOutboundChannel(ChannelRegistration registration) {
registration.executor(this.executor);
}
@Bean
static LazyInitializationExcludeFilter eagerStompWebSocketHandlerMapping() {
return (name, definition, type) -> name.equals("stompWebSocketHandlerMapping");

View File

@ -1,5 +1,5 @@
/*
* Copyright 2012-2023 the original author or authors.
* Copyright 2012-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -19,8 +19,10 @@ package org.springframework.boot.autoconfigure.websocket.servlet;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@ -35,6 +37,7 @@ import org.skyscreamer.jsonassert.JSONAssert;
import org.springframework.boot.LazyInitializationBeanFactoryPostProcessor;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration;
import org.springframework.boot.autoconfigure.web.servlet.DispatcherServletAutoConfiguration;
import org.springframework.boot.autoconfigure.web.servlet.ServletWebServerFactoryAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@ -43,10 +46,13 @@ import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactor
import org.springframework.boot.web.servlet.context.AnnotationConfigServletWebServerApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.SimpleMessageConverter;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompFrameHandler;
@ -54,6 +60,7 @@ import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSessionHandler;
import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter;
import org.springframework.security.util.FieldUtils;
import org.springframework.stereotype.Controller;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
@ -75,6 +82,7 @@ import static org.assertj.core.api.Assertions.fail;
* Tests for {@link WebSocketMessagingAutoConfiguration}.
*
* @author Andy Wilkinson
* @author Lasse Wulff
*/
class WebSocketMessagingAutoConfigurationTests {
@ -129,10 +137,38 @@ class WebSocketMessagingAutoConfigurationTests {
}
}
@Test
void predefinedThreadExecutorIsSelectedForInboundChannel() throws Throwable {
AsyncTaskExecutor expectedExecutor = new SimpleAsyncTaskExecutor();
ChannelRegistration registration = new ChannelRegistration();
WebSocketMessagingAutoConfiguration.WebSocketMessageConverterConfiguration configuration = new WebSocketMessagingAutoConfiguration.WebSocketMessageConverterConfiguration(
new ObjectMapper(),
Map.of(TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME, expectedExecutor));
configuration.configureClientInboundChannel(registration);
AsyncTaskExecutor mappedExecutor = (AsyncTaskExecutor) FieldUtils.getFieldValue(registration, "executor");
assertThat(mappedExecutor).isEqualTo(expectedExecutor);
}
@Test
void predefinedThreadExecutorIsSelectedForOutboundChannel() throws Throwable {
AsyncTaskExecutor expectedExecutor = new SimpleAsyncTaskExecutor();
ChannelRegistration registration = new ChannelRegistration();
WebSocketMessagingAutoConfiguration.WebSocketMessageConverterConfiguration configuration = new WebSocketMessagingAutoConfiguration.WebSocketMessageConverterConfiguration(
new ObjectMapper(),
Map.of(TaskExecutionAutoConfiguration.APPLICATION_TASK_EXECUTOR_BEAN_NAME, expectedExecutor));
configuration.configureClientOutboundChannel(registration);
AsyncTaskExecutor mappedExecutor = (AsyncTaskExecutor) FieldUtils.getFieldValue(registration, "executor");
assertThat(mappedExecutor).isEqualTo(expectedExecutor);
}
private List<MessageConverter> getCustomizedConverters() {
List<MessageConverter> customizedConverters = new ArrayList<>();
WebSocketMessagingAutoConfiguration.WebSocketMessageConverterConfiguration configuration = new WebSocketMessagingAutoConfiguration.WebSocketMessageConverterConfiguration(
new ObjectMapper());
new ObjectMapper(), Collections.emptyMap());
configuration.configureMessageConverters(customizedConverters);
return customizedConverters;
}