Polish "Set virtual thread names for RabbitMQ and Pulsar"

See gh-39958
This commit is contained in:
Moritz Halbritter 2024-03-18 11:24:17 +01:00
parent ecda754116
commit 09652cb282
4 changed files with 22 additions and 47 deletions

View File

@ -47,16 +47,6 @@ import org.springframework.core.task.VirtualThreadTaskExecutor;
@ConditionalOnClass(EnableRabbit.class)
class RabbitAnnotationDrivenConfiguration {
/**
* Default Name of the thread created for simple rabbit listener.
*/
public static final String THREADNAME_RABBIT_SIMPLE = "rabbit-simple-";
/**
* Default Name of the thread created for direct rabbit listener.
*/
public static final String THREADNAME_RABBIT_DIRECT = "rabbit-direct-";
private final ObjectProvider<MessageConverter> messageConverter;
private final ObjectProvider<MessageRecoverer> messageRecoverer;
@ -86,7 +76,7 @@ class RabbitAnnotationDrivenConfiguration {
@ConditionalOnThreading(Threading.VIRTUAL)
SimpleRabbitListenerContainerFactoryConfigurer simpleRabbitListenerContainerFactoryConfigurerVirtualThreads() {
SimpleRabbitListenerContainerFactoryConfigurer configurer = simpleListenerConfigurer();
configurer.setTaskExecutor(new VirtualThreadTaskExecutor(THREADNAME_RABBIT_SIMPLE));
configurer.setTaskExecutor(new VirtualThreadTaskExecutor("rabbit-simple-"));
return configurer;
}
@ -115,7 +105,7 @@ class RabbitAnnotationDrivenConfiguration {
@ConditionalOnThreading(Threading.VIRTUAL)
DirectRabbitListenerContainerFactoryConfigurer directRabbitListenerContainerFactoryConfigurerVirtualThreads() {
DirectRabbitListenerContainerFactoryConfigurer configurer = directListenerConfigurer();
configurer.setTaskExecutor(new VirtualThreadTaskExecutor(THREADNAME_RABBIT_DIRECT));
configurer.setTaskExecutor(new VirtualThreadTaskExecutor("rabbit-direct-"));
return configurer;
}

View File

@ -73,19 +73,9 @@ import org.springframework.pulsar.reader.PulsarReaderContainerProperties;
@Import(PulsarConfiguration.class)
public class PulsarAutoConfiguration {
/**
* Default Name of the thread created for pulsar consumer.
*/
public static final String THREADNAME_PULSAR_CONSUMER = "pulsar-consumer-";
private final PulsarProperties properties;
/**
* Default Name of the thread created for pulsar task executor.
*/
public static final String THREADNAME_PULSAR_TASKEXECUTOR = "pulsar-taskexecutor-";
private PulsarProperties properties;
private PulsarPropertiesMapper propertiesMapper;
private final PulsarPropertiesMapper propertiesMapper;
PulsarAutoConfiguration(PulsarProperties properties) {
this.properties = properties;
@ -168,7 +158,7 @@ public class PulsarAutoConfiguration {
containerProperties.setSchemaResolver(schemaResolver);
containerProperties.setTopicResolver(topicResolver);
if (Threading.VIRTUAL.isActive(environment)) {
containerProperties.setConsumerTaskExecutor(new VirtualThreadTaskExecutor(THREADNAME_PULSAR_CONSUMER));
containerProperties.setConsumerTaskExecutor(new VirtualThreadTaskExecutor("pulsar-consumer-"));
}
this.propertiesMapper.customizeContainerProperties(containerProperties);
return new ConcurrentPulsarListenerContainerFactory<>(pulsarConsumerFactory, containerProperties);
@ -199,8 +189,7 @@ public class PulsarAutoConfiguration {
PulsarReaderContainerProperties readerContainerProperties = new PulsarReaderContainerProperties();
readerContainerProperties.setSchemaResolver(schemaResolver);
if (Threading.VIRTUAL.isActive(environment)) {
readerContainerProperties
.setReaderTaskExecutor(new VirtualThreadTaskExecutor(THREADNAME_PULSAR_TASKEXECUTOR));
readerContainerProperties.setReaderTaskExecutor(new VirtualThreadTaskExecutor("pulsar-reader-"));
}
this.propertiesMapper.customizeReaderContainerProperties(readerContainerProperties);
return new DefaultPulsarReaderContainerFactory<>(pulsarReaderFactory, readerContainerProperties);

View File

@ -552,11 +552,10 @@ class RabbitAutoConfigurationTests {
.getBean("rabbitListenerContainerFactory", SimpleRabbitListenerContainerFactory.class);
assertThat(rabbitListenerContainerFactory).extracting("taskExecutor")
.isInstanceOf(VirtualThreadTaskExecutor.class);
final var taskExecutor = ReflectionTestUtils.getField(rabbitListenerContainerFactory, "taskExecutor");
final var virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
final var threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName())
.containsPattern(RabbitAnnotationDrivenConfiguration.THREADNAME_RABBIT_SIMPLE + "[0-9]*");
Object taskExecutor = ReflectionTestUtils.getField(rabbitListenerContainerFactory, "taskExecutor");
Object virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
Thread threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName()).containsPattern("rabbit-simple-[0-9]+");
});
}
@ -570,11 +569,10 @@ class RabbitAutoConfigurationTests {
DirectRabbitListenerContainerFactoryConfigurer.class);
assertThat(rabbitListenerContainerFactory).extracting("taskExecutor")
.isInstanceOf(VirtualThreadTaskExecutor.class);
final var taskExecutor = ReflectionTestUtils.getField(rabbitListenerContainerFactory, "taskExecutor");
final var virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
final var threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName())
.containsPattern(RabbitAnnotationDrivenConfiguration.THREADNAME_RABBIT_DIRECT + "[0-9]*");
Object taskExecutor = ReflectionTestUtils.getField(rabbitListenerContainerFactory, "taskExecutor");
Object virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
Thread threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName()).containsPattern("rabbit-direct-[0-9]+");
});
}

View File

@ -508,11 +508,10 @@ class PulsarAutoConfigurationTests {
.getBean(ConcurrentPulsarListenerContainerFactory.class);
assertThat(factory.getContainerProperties().getConsumerTaskExecutor())
.isInstanceOf(VirtualThreadTaskExecutor.class);
final var taskExecutor = factory.getContainerProperties().getConsumerTaskExecutor();
final var virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
final var threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName())
.containsPattern(PulsarAutoConfiguration.THREADNAME_PULSAR_CONSUMER + "[0-9]*");
Object taskExecutor = factory.getContainerProperties().getConsumerTaskExecutor();
Object virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
Thread threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName()).containsPattern("pulsar-consumer-[0-9]+");
});
}
@ -568,11 +567,10 @@ class PulsarAutoConfigurationTests {
.getBean(DefaultPulsarReaderContainerFactory.class);
assertThat(factory.getContainerProperties().getReaderTaskExecutor())
.isInstanceOf(VirtualThreadTaskExecutor.class);
final var taskExecutor = factory.getContainerProperties().getReaderTaskExecutor();
final var virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
final var threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName())
.containsPattern(PulsarAutoConfiguration.THREADNAME_PULSAR_TASKEXECUTOR + "[0-9]*");
Object taskExecutor = factory.getContainerProperties().getReaderTaskExecutor();
Object virtualThread = ReflectionTestUtils.getField(taskExecutor, "virtualThreadFactory");
Thread threadCreated = ((ThreadFactory) virtualThread).newThread(mock(Runnable.class));
assertThat(threadCreated.getName()).containsPattern("pulsar-reader-[0-9]+");
});
}