Add SSL bundle support to Apache Kafka auto-configuration

Closes gh-37629
Co-authored-by: Scott Frederick <sfrederick@vmware.com>
This commit is contained in:
Andy Wilkinson 2023-10-18 16:29:29 -05:00 committed by Scott Frederick
parent af2e363252
commit ec6415f04b
8 changed files with 328 additions and 59 deletions

View File

@ -23,6 +23,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnThreading;
import org.springframework.boot.autoconfigure.thread.Threading;
import org.springframework.boot.ssl.SslBundles;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
@ -53,6 +54,8 @@ import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
* @author Eddú Meléndez
* @author Thomas Kåsene
* @author Moritz Halbritter
* @author Andy Wilkinson
* @author Scott Frederick
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(EnableKafka.class)
@ -149,10 +152,11 @@ class KafkaAnnotationDrivenConfiguration {
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory,
ObjectProvider<ContainerCustomizer<Object, Object, ConcurrentMessageListenerContainer<Object, Object>>> kafkaContainerCustomizer) {
ObjectProvider<ContainerCustomizer<Object, Object, ConcurrentMessageListenerContainer<Object, Object>>> kafkaContainerCustomizer,
ObjectProvider<SslBundles> sslBundles) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory
.getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
configurer.configure(factory, kafkaConsumerFactory.getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(
this.properties.buildConsumerProperties(sslBundles.getIfAvailable()))));
kafkaContainerCustomizer.ifAvailable(factory::setContainerCustomizer);
return factory;
}

View File

@ -35,6 +35,7 @@ import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Retry.Topic;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.boot.ssl.SslBundles;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.core.ConsumerFactory;
@ -64,6 +65,8 @@ import org.springframework.retry.backoff.SleepingBackOffPolicy;
* @author Moritz Halbritter
* @author Andy Wilkinson
* @author Phillip Webb
* @author Andy Wilkinson
* @author Scott Frederick
* @since 1.5.0
*/
@AutoConfiguration
@ -107,8 +110,8 @@ public class KafkaAutoConfiguration {
@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public DefaultKafkaConsumerFactory<?, ?> kafkaConsumerFactory(KafkaConnectionDetails connectionDetails,
ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
Map<String, Object> properties = this.properties.buildConsumerProperties();
ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers, ObjectProvider<SslBundles> sslBundles) {
Map<String, Object> properties = this.properties.buildConsumerProperties(sslBundles.getIfAvailable());
applyKafkaConnectionDetailsForConsumer(properties, connectionDetails);
DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<>(properties);
customizers.orderedStream().forEach((customizer) -> customizer.customize(factory));
@ -118,8 +121,8 @@ public class KafkaAutoConfiguration {
@Bean
@ConditionalOnMissingBean(ProducerFactory.class)
public DefaultKafkaProducerFactory<?, ?> kafkaProducerFactory(KafkaConnectionDetails connectionDetails,
ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
Map<String, Object> properties = this.properties.buildProducerProperties();
ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers, ObjectProvider<SslBundles> sslBundles) {
Map<String, Object> properties = this.properties.buildProducerProperties(sslBundles.getIfAvailable());
applyKafkaConnectionDetailsForProducer(properties, connectionDetails);
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(properties);
String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
@ -155,8 +158,8 @@ public class KafkaAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public KafkaAdmin kafkaAdmin(KafkaConnectionDetails connectionDetails) {
Map<String, Object> properties = this.properties.buildAdminProperties();
public KafkaAdmin kafkaAdmin(KafkaConnectionDetails connectionDetails, ObjectProvider<SslBundles> sslBundles) {
Map<String, Object> properties = this.properties.buildAdminProperties(sslBundles.getIfAvailable());
applyKafkaConnectionDetailsForAdmin(properties, connectionDetails);
KafkaAdmin kafkaAdmin = new KafkaAdmin(properties);
KafkaProperties.Admin admin = this.properties.getAdmin();

View File

@ -38,6 +38,8 @@ import org.springframework.boot.context.properties.DeprecatedConfigurationProper
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException;
import org.springframework.boot.convert.DurationUnit;
import org.springframework.boot.ssl.SslBundle;
import org.springframework.boot.ssl.SslBundles;
import org.springframework.core.io.Resource;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
@ -55,6 +57,8 @@ import org.springframework.util.unit.DataSize;
* @author Artem Bilan
* @author Nakul Mishra
* @author Tomaz Fernandes
* @author Andy Wilkinson
* @author Scott Frederick
* @since 1.5.0
*/
@ConfigurationProperties(prefix = "spring.kafka")
@ -157,7 +161,7 @@ public class KafkaProperties {
return this.retry;
}
private Map<String, Object> buildCommonProperties() {
private Map<String, Object> buildCommonProperties(SslBundles sslBundles) {
Map<String, Object> properties = new HashMap<>();
if (this.bootstrapServers != null) {
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
@ -165,7 +169,7 @@ public class KafkaProperties {
if (this.clientId != null) {
properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
}
properties.putAll(this.ssl.buildProperties());
properties.putAll(this.ssl.buildProperties(sslBundles));
properties.putAll(this.security.buildProperties());
if (!CollectionUtils.isEmpty(this.properties)) {
properties.putAll(this.properties);
@ -177,13 +181,29 @@ public class KafkaProperties {
* Create an initial map of consumer properties from the state of this instance.
* <p>
* This allows you to add additional properties, if necessary, and override the
* default kafkaConsumerFactory bean.
* default {@code kafkaConsumerFactory} bean.
* @return the consumer properties initialized with the customizations defined on this
* instance
* @deprecated since 3.2.0 for removal in 3.4.0 in favor of
* {@link #buildConsumerProperties(SslBundles)}}
*/
@Deprecated(since = "3.2.0", forRemoval = true)
public Map<String, Object> buildConsumerProperties() {
return buildConsumerProperties(null);
}
/**
* Create an initial map of consumer properties from the state of this instance.
* <p>
* This allows you to add additional properties, if necessary, and override the
* default {@code kafkaConsumerFactory} bean.
* @param sslBundles bundles providing SSL trust material
* @return the consumer properties initialized with the customizations defined on this
* instance
*/
public Map<String, Object> buildConsumerProperties() {
Map<String, Object> properties = buildCommonProperties();
properties.putAll(this.consumer.buildProperties());
public Map<String, Object> buildConsumerProperties(SslBundles sslBundles) {
Map<String, Object> properties = buildCommonProperties(sslBundles);
properties.putAll(this.consumer.buildProperties(sslBundles));
return properties;
}
@ -191,13 +211,29 @@ public class KafkaProperties {
* Create an initial map of producer properties from the state of this instance.
* <p>
* This allows you to add additional properties, if necessary, and override the
* default kafkaProducerFactory bean.
* default {@code kafkaProducerFactory} bean.
* @return the producer properties initialized with the customizations defined on this
* instance
* @deprecated since 3.2.0 for removal in 3.4.0 in favor of
* {@link #buildProducerProperties(SslBundles)}}
*/
@Deprecated(since = "3.2.0", forRemoval = true)
public Map<String, Object> buildProducerProperties() {
return buildProducerProperties(null);
}
/**
* Create an initial map of producer properties from the state of this instance.
* <p>
* This allows you to add additional properties, if necessary, and override the
* default {@code kafkaProducerFactory} bean.
* @param sslBundles bundles providing SSL trust material
* @return the producer properties initialized with the customizations defined on this
* instance
*/
public Map<String, Object> buildProducerProperties() {
Map<String, Object> properties = buildCommonProperties();
properties.putAll(this.producer.buildProperties());
public Map<String, Object> buildProducerProperties(SslBundles sslBundles) {
Map<String, Object> properties = buildCommonProperties(sslBundles);
properties.putAll(this.producer.buildProperties(sslBundles));
return properties;
}
@ -205,13 +241,29 @@ public class KafkaProperties {
* Create an initial map of admin properties from the state of this instance.
* <p>
* This allows you to add additional properties, if necessary, and override the
* default kafkaAdmin bean.
* default {@code kafkaAdmin} bean.
* @return the admin properties initialized with the customizations defined on this
* instance
* @deprecated since 3.2.0 for removal in 3.4.0 in favor of
* {@link #buildAdminProperties(SslBundles)}}
*/
@Deprecated(since = "3.2.0", forRemoval = true)
public Map<String, Object> buildAdminProperties() {
return buildAdminProperties(null);
}
/**
* Create an initial map of admin properties from the state of this instance.
* <p>
* This allows you to add additional properties, if necessary, and override the
* default {@code kafkaAdmin} bean.
* @param sslBundles bundles providing SSL trust material
* @return the admin properties initialized with the customizations defined on this
* instance
*/
public Map<String, Object> buildAdminProperties() {
Map<String, Object> properties = buildCommonProperties();
properties.putAll(this.admin.buildProperties());
public Map<String, Object> buildAdminProperties(SslBundles sslBundles) {
Map<String, Object> properties = buildCommonProperties(sslBundles);
properties.putAll(this.admin.buildProperties(sslBundles));
return properties;
}
@ -221,10 +273,25 @@ public class KafkaProperties {
* This allows you to add additional properties, if necessary.
* @return the streams properties initialized with the customizations defined on this
* instance
* @deprecated since 3.2.0 for removal in 3.4.0 in favor of
* {@link #buildStreamsProperties(SslBundles)}}
*/
@Deprecated(since = "3.2.0", forRemoval = true)
public Map<String, Object> buildStreamsProperties() {
Map<String, Object> properties = buildCommonProperties();
properties.putAll(this.streams.buildProperties());
return buildStreamsProperties(null);
}
/**
* Create an initial map of streams properties from the state of this instance.
* <p>
* This allows you to add additional properties, if necessary.
* @param sslBundles bundles providing SSL trust material
* @return the streams properties initialized with the customizations defined on this
* instance
*/
public Map<String, Object> buildStreamsProperties(SslBundles sslBundles) {
Map<String, Object> properties = buildCommonProperties(sslBundles);
properties.putAll(this.streams.buildProperties(sslBundles));
return properties;
}
@ -426,7 +493,7 @@ public class KafkaProperties {
return this.properties;
}
public Map<String, Object> buildProperties() {
public Map<String, Object> buildProperties(SslBundles sslBundles) {
Properties properties = new Properties();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(this::getAutoCommitInterval)
@ -451,7 +518,7 @@ public class KafkaProperties {
map.from(this::getKeyDeserializer).to(properties.in(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
map.from(this::getValueDeserializer).to(properties.in(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
map.from(this::getMaxPollRecords).to(properties.in(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
return properties.with(this.ssl, this.security, this.properties);
return properties.with(this.ssl, this.security, this.properties, sslBundles);
}
}
@ -613,7 +680,7 @@ public class KafkaProperties {
return this.properties;
}
public Map<String, Object> buildProperties() {
public Map<String, Object> buildProperties(SslBundles sslBundles) {
Properties properties = new Properties();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(this::getAcks).to(properties.in(ProducerConfig.ACKS_CONFIG));
@ -627,7 +694,7 @@ public class KafkaProperties {
map.from(this::getKeySerializer).to(properties.in(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
map.from(this::getRetries).to(properties.in(ProducerConfig.RETRIES_CONFIG));
map.from(this::getValueSerializer).to(properties.in(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
return properties.with(this.ssl, this.security, this.properties);
return properties.with(this.ssl, this.security, this.properties, sslBundles);
}
}
@ -734,11 +801,11 @@ public class KafkaProperties {
return this.properties;
}
public Map<String, Object> buildProperties() {
public Map<String, Object> buildProperties(SslBundles sslBundles) {
Properties properties = new Properties();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(this::getClientId).to(properties.in(ProducerConfig.CLIENT_ID_CONFIG));
return properties.with(this.ssl, this.security, this.properties);
return properties.with(this.ssl, this.security, this.properties, sslBundles);
}
}
@ -885,7 +952,7 @@ public class KafkaProperties {
return this.properties;
}
public Map<String, Object> buildProperties() {
public Map<String, Object> buildProperties(SslBundles sslBundles) {
Properties properties = new Properties();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(this::getApplicationId).to(properties.in("application.id"));
@ -899,7 +966,7 @@ public class KafkaProperties {
map.from(this::getClientId).to(properties.in(CommonClientConfigs.CLIENT_ID_CONFIG));
map.from(this::getReplicationFactor).to(properties.in("replication.factor"));
map.from(this::getStateDir).to(properties.in("state.dir"));
return properties.with(this.ssl, this.security, this.properties);
return properties.with(this.ssl, this.security, this.properties, sslBundles);
}
}
@ -1198,6 +1265,11 @@ public class KafkaProperties {
public static class Ssl {
/**
* Name of the SSL bundle to use.
*/
private String bundle;
/**
* Password of the private key in either key store key or key store file.
*/
@ -1253,6 +1325,14 @@ public class KafkaProperties {
*/
private String protocol;
public String getBundle() {
return this.bundle;
}
public void setBundle(String bundle) {
this.bundle = bundle;
}
public String getKeyPassword() {
return this.keyPassword;
}
@ -1341,26 +1421,39 @@ public class KafkaProperties {
this.protocol = protocol;
}
@Deprecated(since = "3.2.0", forRemoval = true)
public Map<String, Object> buildProperties() {
return buildProperties(null);
}
public Map<String, Object> buildProperties(SslBundles sslBundles) {
validate();
Properties properties = new Properties();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(this::getKeyPassword).to(properties.in(SslConfigs.SSL_KEY_PASSWORD_CONFIG));
map.from(this::getKeyStoreCertificateChain)
.to(properties.in(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG));
map.from(this::getKeyStoreKey).to(properties.in(SslConfigs.SSL_KEYSTORE_KEY_CONFIG));
map.from(this::getKeyStoreLocation)
.as(this::resourceToPath)
.to(properties.in(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG));
map.from(this::getKeyStorePassword).to(properties.in(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG));
map.from(this::getKeyStoreType).to(properties.in(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG));
map.from(this::getTrustStoreCertificates).to(properties.in(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG));
map.from(this::getTrustStoreLocation)
.as(this::resourceToPath)
.to(properties.in(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
map.from(this::getTrustStorePassword).to(properties.in(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
map.from(this::getTrustStoreType).to(properties.in(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG));
map.from(this::getProtocol).to(properties.in(SslConfigs.SSL_PROTOCOL_CONFIG));
if (getBundle() != null) {
properties.in(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG)
.accept(SslBundleSslEngineFactory.class.getName());
properties.in(SslBundle.class.getName()).accept(sslBundles.getBundle(getBundle()));
}
else {
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(this::getKeyPassword).to(properties.in(SslConfigs.SSL_KEY_PASSWORD_CONFIG));
map.from(this::getKeyStoreCertificateChain)
.to(properties.in(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG));
map.from(this::getKeyStoreKey).to(properties.in(SslConfigs.SSL_KEYSTORE_KEY_CONFIG));
map.from(this::getKeyStoreLocation)
.as(this::resourceToPath)
.to(properties.in(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG));
map.from(this::getKeyStorePassword).to(properties.in(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG));
map.from(this::getKeyStoreType).to(properties.in(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG));
map.from(this::getTrustStoreCertificates)
.to(properties.in(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG));
map.from(this::getTrustStoreLocation)
.as(this::resourceToPath)
.to(properties.in(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
map.from(this::getTrustStorePassword).to(properties.in(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
map.from(this::getTrustStoreType).to(properties.in(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG));
map.from(this::getProtocol).to(properties.in(SslConfigs.SSL_PROTOCOL_CONFIG));
}
return properties;
}
@ -1373,6 +1466,22 @@ public class KafkaProperties {
entries.put("spring.kafka.ssl.trust-store-certificates", getTrustStoreCertificates());
entries.put("spring.kafka.ssl.trust-store-location", getTrustStoreLocation());
});
MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleNonNullValuesIn((entries) -> {
entries.put("spring.kafka.ssl.bundle", getBundle());
entries.put("spring.kafka.ssl.key-store-key", getKeyStoreKey());
});
MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleNonNullValuesIn((entries) -> {
entries.put("spring.kafka.ssl.bundle", getBundle());
entries.put("spring.kafka.ssl.key-store-location", getKeyStoreLocation());
});
MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleNonNullValuesIn((entries) -> {
entries.put("spring.kafka.ssl.bundle", getBundle());
entries.put("spring.kafka.ssl.trust-store-certificates", getTrustStoreCertificates());
});
MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleNonNullValuesIn((entries) -> {
entries.put("spring.kafka.ssl.bundle", getBundle());
entries.put("spring.kafka.ssl.trust-store-location", getTrustStoreLocation());
});
}
private String resourceToPath(Resource resource) {
@ -1628,8 +1737,8 @@ public class KafkaProperties {
return (value) -> put(key, value);
}
Properties with(Ssl ssl, Security security, Map<String, String> properties) {
putAll(ssl.buildProperties());
Properties with(Ssl ssl, Security security, Map<String, String> properties, SslBundles sslBundles) {
putAll(ssl.buildProperties(sslBundles));
putAll(security.buildProperties());
putAll(properties);
return this;

View File

@ -30,6 +30,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.source.InvalidConfigurationPropertyValueException;
import org.springframework.boot.ssl.SslBundles;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
@ -46,6 +47,7 @@ import org.springframework.kafka.core.CleanupConfig;
* @author Eddú Meléndez
* @author Moritz Halbritter
* @author Andy Wilkinson
* @author Scott Frederick
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(StreamsBuilder.class)
@ -61,8 +63,8 @@ class KafkaStreamsAnnotationDrivenConfiguration {
@ConditionalOnMissingBean
@Bean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
KafkaStreamsConfiguration defaultKafkaStreamsConfig(Environment environment,
KafkaConnectionDetails connectionDetails) {
Map<String, Object> properties = this.properties.buildStreamsProperties();
KafkaConnectionDetails connectionDetails, ObjectProvider<SslBundles> sslBundles) {
Map<String, Object> properties = this.properties.buildStreamsProperties(sslBundles.getIfAvailable());
applyKafkaConnectionDetailsForStreams(properties, connectionDetails);
if (this.properties.getStreams().getApplicationId() == null) {
String applicationName = environment.getProperty("spring.application.name");

View File

@ -0,0 +1,95 @@
/*
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.autoconfigure.kafka;
import java.io.IOException;
import java.security.KeyStore;
import java.util.Map;
import java.util.Set;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
import org.apache.kafka.common.security.auth.SslEngineFactory;
import org.springframework.boot.ssl.SslBundle;
/**
* An {@link SslEngineFactory} that configures creates an {@link SSLEngine} from an
* {@link SslBundle}.
*
* @author Andy Wilkinson
* @author Scott Frederick
* @since 3.2.0
*/
public class SslBundleSslEngineFactory implements SslEngineFactory {
private static final String SSL_BUNDLE_CONFIG_NAME = SslBundle.class.getName();
private Map<String, ?> configs;
private volatile SslBundle sslBundle;
@Override
public void configure(Map<String, ?> configs) {
this.configs = configs;
this.sslBundle = (SslBundle) configs.get(SSL_BUNDLE_CONFIG_NAME);
}
@Override
public void close() throws IOException {
}
@Override
public SSLEngine createClientSslEngine(String peerHost, int peerPort, String endpointIdentification) {
SSLEngine sslEngine = this.sslBundle.createSslContext().createSSLEngine(peerHost, peerPort);
sslEngine.setUseClientMode(true);
SSLParameters sslParams = sslEngine.getSSLParameters();
sslParams.setEndpointIdentificationAlgorithm(endpointIdentification);
sslEngine.setSSLParameters(sslParams);
return sslEngine;
}
@Override
public SSLEngine createServerSslEngine(String peerHost, int peerPort) {
SSLEngine sslEngine = this.sslBundle.createSslContext().createSSLEngine(peerHost, peerPort);
sslEngine.setUseClientMode(false);
return sslEngine;
}
@Override
public boolean shouldBeRebuilt(Map<String, Object> nextConfigs) {
return !nextConfigs.equals(this.configs);
}
@Override
public Set<String> reconfigurableConfigs() {
return Set.of(SSL_BUNDLE_CONFIG_NAME);
}
@Override
public KeyStore keystore() {
return this.sslBundle.getStores().getKeyStore();
}
@Override
public KeyStore truststore() {
return this.sslBundle.getStores().getTrustStore();
}
}

View File

@ -33,6 +33,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.springframework.boot.autoconfigure.ssl.SslAutoConfiguration;
import org.springframework.boot.test.util.TestPropertyValues;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
@ -58,6 +59,7 @@ import static org.assertj.core.api.Assertions.assertThat;
* @author Gary Russell
* @author Stephane Nicoll
* @author Tomaz Fernandes
* @author Andy Wilkinson
*/
@DisabledOnOs(OS.WINDOWS)
@EmbeddedKafka(topics = KafkaAutoConfigurationIntegrationTests.TEST_TOPIC)
@ -133,6 +135,7 @@ class KafkaAutoConfigurationIntegrationTests {
private AnnotationConfigApplicationContext doLoad(Class<?>[] configs, String... environment) {
AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext();
applicationContext.register(configs);
applicationContext.register(SslAutoConfiguration.class);
applicationContext.register(KafkaAutoConfiguration.class);
TestPropertyValues.of(environment).applyTo(applicationContext);
applicationContext.refresh();

View File

@ -46,6 +46,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.autoconfigure.ssl.SslAutoConfiguration;
import org.springframework.boot.test.context.assertj.AssertableApplicationContext;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.boot.test.context.runner.ContextConsumer;
@ -107,11 +108,12 @@ import static org.mockito.Mockito.never;
* @author Moritz Halbritter
* @author Andy Wilkinson
* @author Phillip Webb
* @author Scott Frederick
*/
class KafkaAutoConfigurationTests {
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withConfiguration(AutoConfigurations.of(KafkaAutoConfiguration.class));
.withConfiguration(AutoConfigurations.of(KafkaAutoConfiguration.class, SslAutoConfiguration.class));
@Test
void consumerProperties() {

View File

@ -27,6 +27,8 @@ import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Cleanup;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.IsolationLevel;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener;
import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException;
import org.springframework.boot.ssl.DefaultSslBundleRegistry;
import org.springframework.boot.ssl.SslBundle;
import org.springframework.core.io.ClassPathResource;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.kafka.core.KafkaAdmin;
@ -34,16 +36,19 @@ import org.springframework.kafka.listener.ContainerProperties;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.Mockito.mock;
/**
* Tests for {@link KafkaProperties}.
*
* @author Stephane Nicoll
* @author Madhura Bhave
* @author Scott Frederick
*/
class KafkaPropertiesTests {
@SuppressWarnings("rawtypes")
private final SslBundle sslBundle = mock(SslBundle.class);
@Test
void isolationLevelEnumConsistentWithKafkaVersion() {
org.apache.kafka.common.IsolationLevel[] original = org.apache.kafka.common.IsolationLevel.values();
@ -75,20 +80,30 @@ class KafkaPropertiesTests {
properties.getSsl().setKeyStoreKey("-----BEGINkey");
properties.getSsl().setTrustStoreCertificates("-----BEGINtrust");
properties.getSsl().setKeyStoreCertificateChain("-----BEGINchain");
Map<String, Object> consumerProperties = properties.buildConsumerProperties();
Map<String, Object> consumerProperties = properties.buildConsumerProperties(null);
assertThat(consumerProperties).containsEntry(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "-----BEGINkey");
assertThat(consumerProperties).containsEntry(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, "-----BEGINtrust");
assertThat(consumerProperties).containsEntry(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG,
"-----BEGINchain");
}
@Test
void sslBundleConfiguration() {
KafkaProperties properties = new KafkaProperties();
properties.getSsl().setBundle("myBundle");
Map<String, Object> consumerProperties = properties
.buildConsumerProperties(new DefaultSslBundleRegistry("myBundle", this.sslBundle));
assertThat(consumerProperties).containsEntry(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG,
SslBundleSslEngineFactory.class.getName());
}
@Test
void sslPropertiesWhenKeyStoreLocationAndKeySetShouldThrowException() {
KafkaProperties properties = new KafkaProperties();
properties.getSsl().setKeyStoreKey("-----BEGIN");
properties.getSsl().setKeyStoreLocation(new ClassPathResource("ksLoc"));
assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class)
.isThrownBy(properties::buildConsumerProperties);
.isThrownBy(() -> properties.buildConsumerProperties(null));
}
@Test
@ -97,7 +112,43 @@ class KafkaPropertiesTests {
properties.getSsl().setTrustStoreLocation(new ClassPathResource("tsLoc"));
properties.getSsl().setTrustStoreCertificates("-----BEGIN");
assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class)
.isThrownBy(properties::buildConsumerProperties);
.isThrownBy(() -> properties.buildConsumerProperties(null));
}
@Test
void sslPropertiesWhenKeyStoreLocationAndBundleSetShouldThrowException() {
KafkaProperties properties = new KafkaProperties();
properties.getSsl().setBundle("myBundle");
properties.getSsl().setKeyStoreLocation(new ClassPathResource("ksLoc"));
assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class).isThrownBy(
() -> properties.buildConsumerProperties(new DefaultSslBundleRegistry("myBundle", this.sslBundle)));
}
@Test
void sslPropertiesWhenKeyStoreKeyAndBundleSetShouldThrowException() {
KafkaProperties properties = new KafkaProperties();
properties.getSsl().setBundle("myBundle");
properties.getSsl().setKeyStoreKey("-----BEGIN");
assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class).isThrownBy(
() -> properties.buildConsumerProperties(new DefaultSslBundleRegistry("myBundle", this.sslBundle)));
}
@Test
void sslPropertiesWhenTrustStoreLocationAndBundleSetShouldThrowException() {
KafkaProperties properties = new KafkaProperties();
properties.getSsl().setBundle("myBundle");
properties.getSsl().setTrustStoreLocation(new ClassPathResource("tsLoc"));
assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class).isThrownBy(
() -> properties.buildConsumerProperties(new DefaultSslBundleRegistry("myBundle", this.sslBundle)));
}
@Test
void sslPropertiesWhenTrustStoreCertificatesAndBundleSetShouldThrowException() {
KafkaProperties properties = new KafkaProperties();
properties.getSsl().setBundle("myBundle");
properties.getSsl().setTrustStoreCertificates("-----BEGIN");
assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class).isThrownBy(
() -> properties.buildConsumerProperties(new DefaultSslBundleRegistry("myBundle", this.sslBundle)));
}
@Test