Add configuration property to control auto-startup of listener container

See gh-33082
This commit is contained in:
Francois Rosiere 2022-11-09 08:55:33 +01:00 committed by Moritz Halbritter
parent cde0d5a625
commit eaa7cdf910
3 changed files with 27 additions and 0 deletions

View File

@ -151,6 +151,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
Listener properties = this.properties.getListener();
map.from(properties::getConcurrency).to(factory::setConcurrency);
map.from(properties::isAutoStartup).to(factory::setAutoStartup);
map.from(this.messageConverter).to(factory::setMessageConverter);
map.from(this.recordFilterStrategy).to(factory::setRecordFilterStrategy);
map.from(this.replyTemplate).to(factory::setReplyTemplate);

View File

@ -1012,6 +1012,11 @@ public class KafkaProperties {
*/
private boolean immediateStop = false;
/**
* Whether to auto start the container.
*/
private boolean autoStartup = true;
public Type getType() {
return this.type;
}
@ -1140,6 +1145,14 @@ public class KafkaProperties {
this.immediateStop = immediateStop;
}
public boolean isAutoStartup() {
return this.autoStartup;
}
public void setAutoStartup(boolean autoStartup) {
this.autoStartup = autoStartup;
}
}
public static class Ssl {

View File

@ -40,6 +40,8 @@ import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.test.context.assertj.AssertableApplicationContext;
@ -501,6 +503,7 @@ class KafkaAutoConfigurationTests {
assertThat(containerProperties.isStopImmediate()).isTrue();
assertThat(kafkaListenerContainerFactory).extracting("concurrency").isEqualTo(3);
assertThat(kafkaListenerContainerFactory.isBatchListener()).isTrue();
assertThat(kafkaListenerContainerFactory).hasFieldOrPropertyWithValue("autoStartup", true);
assertThat(context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)).hasSize(1);
KafkaJaasLoginModuleInitializer jaas = context.getBean(KafkaJaasLoginModuleInitializer.class);
assertThat(jaas).hasFieldOrPropertyWithValue("loginModule", "foo");
@ -672,6 +675,16 @@ class KafkaAutoConfigurationTests {
});
}
@ParameterizedTest(name = "{0}")
@ValueSource(booleans = { true, false })
void testConcurrentKafkaListenerContainerFactoryAutoStartup(boolean autoStartup) {
this.contextRunner.withPropertyValues("spring.kafka.listener.auto-startup=" + autoStartup).run((context) -> {
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory = context
.getBean(ConcurrentKafkaListenerContainerFactory.class);
assertThat(kafkaListenerContainerFactory).hasFieldOrPropertyWithValue("autoStartup", autoStartup);
});
}
@Test
void specificSecurityProtocolOverridesCommonSecurityProtocol() {
this.contextRunner.withPropertyValues("spring.kafka.security.protocol=SSL",