From ec6415f04bf3e3ec47ab9600c744e5199ca85be0 Mon Sep 17 00:00:00 2001 From: Andy Wilkinson Date: Wed, 18 Oct 2023 16:29:29 -0500 Subject: [PATCH] Add SSL bundle support to Apache Kafka auto-configuration Closes gh-37629 Co-authored-by: Scott Frederick --- .../KafkaAnnotationDrivenConfiguration.java | 10 +- .../kafka/KafkaAutoConfiguration.java | 15 +- .../autoconfigure/kafka/KafkaProperties.java | 195 ++++++++++++++---- ...aStreamsAnnotationDrivenConfiguration.java | 6 +- .../kafka/SslBundleSslEngineFactory.java | 95 +++++++++ ...afkaAutoConfigurationIntegrationTests.java | 3 + .../kafka/KafkaAutoConfigurationTests.java | 4 +- .../kafka/KafkaPropertiesTests.java | 59 +++++- 8 files changed, 328 insertions(+), 59 deletions(-) create mode 100644 spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/SslBundleSslEngineFactory.java diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java index f7a103305f0..abbc834f466 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java @@ -23,6 +23,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnThreading; import org.springframework.boot.autoconfigure.thread.Threading; +import org.springframework.boot.ssl.SslBundles; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.task.SimpleAsyncTaskExecutor; @@ -53,6 +54,8 @@ import org.springframework.kafka.transaction.KafkaAwareTransactionManager; * @author Eddú Meléndez * @author Thomas Kåsene * @author Moritz Halbritter + * @author Andy Wilkinson + * @author Scott Frederick */ @Configuration(proxyBeanMethods = false) @ConditionalOnClass(EnableKafka.class) @@ -149,10 +152,11 @@ class KafkaAnnotationDrivenConfiguration { ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ObjectProvider> kafkaConsumerFactory, - ObjectProvider>> kafkaContainerCustomizer) { + ObjectProvider>> kafkaContainerCustomizer, + ObjectProvider sslBundles) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); - configurer.configure(factory, kafkaConsumerFactory - .getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties()))); + configurer.configure(factory, kafkaConsumerFactory.getIfAvailable(() -> new DefaultKafkaConsumerFactory<>( + this.properties.buildConsumerProperties(sslBundles.getIfAvailable())))); kafkaContainerCustomizer.ifAvailable(factory::setContainerCustomizer); return factory; } diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java index 9d73f58cd56..9e2b2f22c8c 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java @@ -35,6 +35,7 @@ import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Retry.Topic; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.context.properties.PropertyMapper; +import org.springframework.boot.ssl.SslBundles; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; import org.springframework.kafka.core.ConsumerFactory; @@ -64,6 +65,8 @@ import org.springframework.retry.backoff.SleepingBackOffPolicy; * @author Moritz Halbritter * @author Andy Wilkinson * @author Phillip Webb + * @author Andy Wilkinson + * @author Scott Frederick * @since 1.5.0 */ @AutoConfiguration @@ -107,8 +110,8 @@ public class KafkaAutoConfiguration { @Bean @ConditionalOnMissingBean(ConsumerFactory.class) public DefaultKafkaConsumerFactory kafkaConsumerFactory(KafkaConnectionDetails connectionDetails, - ObjectProvider customizers) { - Map properties = this.properties.buildConsumerProperties(); + ObjectProvider customizers, ObjectProvider sslBundles) { + Map properties = this.properties.buildConsumerProperties(sslBundles.getIfAvailable()); applyKafkaConnectionDetailsForConsumer(properties, connectionDetails); DefaultKafkaConsumerFactory factory = new DefaultKafkaConsumerFactory<>(properties); customizers.orderedStream().forEach((customizer) -> customizer.customize(factory)); @@ -118,8 +121,8 @@ public class KafkaAutoConfiguration { @Bean @ConditionalOnMissingBean(ProducerFactory.class) public DefaultKafkaProducerFactory kafkaProducerFactory(KafkaConnectionDetails connectionDetails, - ObjectProvider customizers) { - Map properties = this.properties.buildProducerProperties(); + ObjectProvider customizers, ObjectProvider sslBundles) { + Map properties = this.properties.buildProducerProperties(sslBundles.getIfAvailable()); applyKafkaConnectionDetailsForProducer(properties, connectionDetails); DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(properties); String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix(); @@ -155,8 +158,8 @@ public class KafkaAutoConfiguration { @Bean @ConditionalOnMissingBean - public KafkaAdmin kafkaAdmin(KafkaConnectionDetails connectionDetails) { - Map properties = this.properties.buildAdminProperties(); + public KafkaAdmin kafkaAdmin(KafkaConnectionDetails connectionDetails, ObjectProvider sslBundles) { + Map properties = this.properties.buildAdminProperties(sslBundles.getIfAvailable()); applyKafkaConnectionDetailsForAdmin(properties, connectionDetails); KafkaAdmin kafkaAdmin = new KafkaAdmin(properties); KafkaProperties.Admin admin = this.properties.getAdmin(); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index 7227df43f23..a7da43eea30 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -38,6 +38,8 @@ import org.springframework.boot.context.properties.DeprecatedConfigurationProper import org.springframework.boot.context.properties.PropertyMapper; import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException; import org.springframework.boot.convert.DurationUnit; +import org.springframework.boot.ssl.SslBundle; +import org.springframework.boot.ssl.SslBundles; import org.springframework.core.io.Resource; import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer; @@ -55,6 +57,8 @@ import org.springframework.util.unit.DataSize; * @author Artem Bilan * @author Nakul Mishra * @author Tomaz Fernandes + * @author Andy Wilkinson + * @author Scott Frederick * @since 1.5.0 */ @ConfigurationProperties(prefix = "spring.kafka") @@ -157,7 +161,7 @@ public class KafkaProperties { return this.retry; } - private Map buildCommonProperties() { + private Map buildCommonProperties(SslBundles sslBundles) { Map properties = new HashMap<>(); if (this.bootstrapServers != null) { properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers); @@ -165,7 +169,7 @@ public class KafkaProperties { if (this.clientId != null) { properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId); } - properties.putAll(this.ssl.buildProperties()); + properties.putAll(this.ssl.buildProperties(sslBundles)); properties.putAll(this.security.buildProperties()); if (!CollectionUtils.isEmpty(this.properties)) { properties.putAll(this.properties); @@ -177,13 +181,29 @@ public class KafkaProperties { * Create an initial map of consumer properties from the state of this instance. *

* This allows you to add additional properties, if necessary, and override the - * default kafkaConsumerFactory bean. + * default {@code kafkaConsumerFactory} bean. + * @return the consumer properties initialized with the customizations defined on this + * instance + * @deprecated since 3.2.0 for removal in 3.4.0 in favor of + * {@link #buildConsumerProperties(SslBundles)}} + */ + @Deprecated(since = "3.2.0", forRemoval = true) + public Map buildConsumerProperties() { + return buildConsumerProperties(null); + } + + /** + * Create an initial map of consumer properties from the state of this instance. + *

+ * This allows you to add additional properties, if necessary, and override the + * default {@code kafkaConsumerFactory} bean. + * @param sslBundles bundles providing SSL trust material * @return the consumer properties initialized with the customizations defined on this * instance */ - public Map buildConsumerProperties() { - Map properties = buildCommonProperties(); - properties.putAll(this.consumer.buildProperties()); + public Map buildConsumerProperties(SslBundles sslBundles) { + Map properties = buildCommonProperties(sslBundles); + properties.putAll(this.consumer.buildProperties(sslBundles)); return properties; } @@ -191,13 +211,29 @@ public class KafkaProperties { * Create an initial map of producer properties from the state of this instance. *

* This allows you to add additional properties, if necessary, and override the - * default kafkaProducerFactory bean. + * default {@code kafkaProducerFactory} bean. + * @return the producer properties initialized with the customizations defined on this + * instance + * @deprecated since 3.2.0 for removal in 3.4.0 in favor of + * {@link #buildProducerProperties(SslBundles)}} + */ + @Deprecated(since = "3.2.0", forRemoval = true) + public Map buildProducerProperties() { + return buildProducerProperties(null); + } + + /** + * Create an initial map of producer properties from the state of this instance. + *

+ * This allows you to add additional properties, if necessary, and override the + * default {@code kafkaProducerFactory} bean. + * @param sslBundles bundles providing SSL trust material * @return the producer properties initialized with the customizations defined on this * instance */ - public Map buildProducerProperties() { - Map properties = buildCommonProperties(); - properties.putAll(this.producer.buildProperties()); + public Map buildProducerProperties(SslBundles sslBundles) { + Map properties = buildCommonProperties(sslBundles); + properties.putAll(this.producer.buildProperties(sslBundles)); return properties; } @@ -205,13 +241,29 @@ public class KafkaProperties { * Create an initial map of admin properties from the state of this instance. *

* This allows you to add additional properties, if necessary, and override the - * default kafkaAdmin bean. + * default {@code kafkaAdmin} bean. + * @return the admin properties initialized with the customizations defined on this + * instance + * @deprecated since 3.2.0 for removal in 3.4.0 in favor of + * {@link #buildAdminProperties(SslBundles)}} + */ + @Deprecated(since = "3.2.0", forRemoval = true) + public Map buildAdminProperties() { + return buildAdminProperties(null); + } + + /** + * Create an initial map of admin properties from the state of this instance. + *

+ * This allows you to add additional properties, if necessary, and override the + * default {@code kafkaAdmin} bean. + * @param sslBundles bundles providing SSL trust material * @return the admin properties initialized with the customizations defined on this * instance */ - public Map buildAdminProperties() { - Map properties = buildCommonProperties(); - properties.putAll(this.admin.buildProperties()); + public Map buildAdminProperties(SslBundles sslBundles) { + Map properties = buildCommonProperties(sslBundles); + properties.putAll(this.admin.buildProperties(sslBundles)); return properties; } @@ -221,10 +273,25 @@ public class KafkaProperties { * This allows you to add additional properties, if necessary. * @return the streams properties initialized with the customizations defined on this * instance + * @deprecated since 3.2.0 for removal in 3.4.0 in favor of + * {@link #buildStreamsProperties(SslBundles)}} */ + @Deprecated(since = "3.2.0", forRemoval = true) public Map buildStreamsProperties() { - Map properties = buildCommonProperties(); - properties.putAll(this.streams.buildProperties()); + return buildStreamsProperties(null); + } + + /** + * Create an initial map of streams properties from the state of this instance. + *

+ * This allows you to add additional properties, if necessary. + * @param sslBundles bundles providing SSL trust material + * @return the streams properties initialized with the customizations defined on this + * instance + */ + public Map buildStreamsProperties(SslBundles sslBundles) { + Map properties = buildCommonProperties(sslBundles); + properties.putAll(this.streams.buildProperties(sslBundles)); return properties; } @@ -426,7 +493,7 @@ public class KafkaProperties { return this.properties; } - public Map buildProperties() { + public Map buildProperties(SslBundles sslBundles) { Properties properties = new Properties(); PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); map.from(this::getAutoCommitInterval) @@ -451,7 +518,7 @@ public class KafkaProperties { map.from(this::getKeyDeserializer).to(properties.in(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)); map.from(this::getValueDeserializer).to(properties.in(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)); map.from(this::getMaxPollRecords).to(properties.in(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)); - return properties.with(this.ssl, this.security, this.properties); + return properties.with(this.ssl, this.security, this.properties, sslBundles); } } @@ -613,7 +680,7 @@ public class KafkaProperties { return this.properties; } - public Map buildProperties() { + public Map buildProperties(SslBundles sslBundles) { Properties properties = new Properties(); PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); map.from(this::getAcks).to(properties.in(ProducerConfig.ACKS_CONFIG)); @@ -627,7 +694,7 @@ public class KafkaProperties { map.from(this::getKeySerializer).to(properties.in(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)); map.from(this::getRetries).to(properties.in(ProducerConfig.RETRIES_CONFIG)); map.from(this::getValueSerializer).to(properties.in(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)); - return properties.with(this.ssl, this.security, this.properties); + return properties.with(this.ssl, this.security, this.properties, sslBundles); } } @@ -734,11 +801,11 @@ public class KafkaProperties { return this.properties; } - public Map buildProperties() { + public Map buildProperties(SslBundles sslBundles) { Properties properties = new Properties(); PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); map.from(this::getClientId).to(properties.in(ProducerConfig.CLIENT_ID_CONFIG)); - return properties.with(this.ssl, this.security, this.properties); + return properties.with(this.ssl, this.security, this.properties, sslBundles); } } @@ -885,7 +952,7 @@ public class KafkaProperties { return this.properties; } - public Map buildProperties() { + public Map buildProperties(SslBundles sslBundles) { Properties properties = new Properties(); PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); map.from(this::getApplicationId).to(properties.in("application.id")); @@ -899,7 +966,7 @@ public class KafkaProperties { map.from(this::getClientId).to(properties.in(CommonClientConfigs.CLIENT_ID_CONFIG)); map.from(this::getReplicationFactor).to(properties.in("replication.factor")); map.from(this::getStateDir).to(properties.in("state.dir")); - return properties.with(this.ssl, this.security, this.properties); + return properties.with(this.ssl, this.security, this.properties, sslBundles); } } @@ -1198,6 +1265,11 @@ public class KafkaProperties { public static class Ssl { + /** + * Name of the SSL bundle to use. + */ + private String bundle; + /** * Password of the private key in either key store key or key store file. */ @@ -1253,6 +1325,14 @@ public class KafkaProperties { */ private String protocol; + public String getBundle() { + return this.bundle; + } + + public void setBundle(String bundle) { + this.bundle = bundle; + } + public String getKeyPassword() { return this.keyPassword; } @@ -1341,26 +1421,39 @@ public class KafkaProperties { this.protocol = protocol; } + @Deprecated(since = "3.2.0", forRemoval = true) public Map buildProperties() { + return buildProperties(null); + } + + public Map buildProperties(SslBundles sslBundles) { validate(); Properties properties = new Properties(); - PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); - map.from(this::getKeyPassword).to(properties.in(SslConfigs.SSL_KEY_PASSWORD_CONFIG)); - map.from(this::getKeyStoreCertificateChain) - .to(properties.in(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG)); - map.from(this::getKeyStoreKey).to(properties.in(SslConfigs.SSL_KEYSTORE_KEY_CONFIG)); - map.from(this::getKeyStoreLocation) - .as(this::resourceToPath) - .to(properties.in(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)); - map.from(this::getKeyStorePassword).to(properties.in(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)); - map.from(this::getKeyStoreType).to(properties.in(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG)); - map.from(this::getTrustStoreCertificates).to(properties.in(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG)); - map.from(this::getTrustStoreLocation) - .as(this::resourceToPath) - .to(properties.in(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)); - map.from(this::getTrustStorePassword).to(properties.in(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)); - map.from(this::getTrustStoreType).to(properties.in(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)); - map.from(this::getProtocol).to(properties.in(SslConfigs.SSL_PROTOCOL_CONFIG)); + if (getBundle() != null) { + properties.in(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG) + .accept(SslBundleSslEngineFactory.class.getName()); + properties.in(SslBundle.class.getName()).accept(sslBundles.getBundle(getBundle())); + } + else { + PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); + map.from(this::getKeyPassword).to(properties.in(SslConfigs.SSL_KEY_PASSWORD_CONFIG)); + map.from(this::getKeyStoreCertificateChain) + .to(properties.in(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG)); + map.from(this::getKeyStoreKey).to(properties.in(SslConfigs.SSL_KEYSTORE_KEY_CONFIG)); + map.from(this::getKeyStoreLocation) + .as(this::resourceToPath) + .to(properties.in(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG)); + map.from(this::getKeyStorePassword).to(properties.in(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG)); + map.from(this::getKeyStoreType).to(properties.in(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG)); + map.from(this::getTrustStoreCertificates) + .to(properties.in(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG)); + map.from(this::getTrustStoreLocation) + .as(this::resourceToPath) + .to(properties.in(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)); + map.from(this::getTrustStorePassword).to(properties.in(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)); + map.from(this::getTrustStoreType).to(properties.in(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)); + map.from(this::getProtocol).to(properties.in(SslConfigs.SSL_PROTOCOL_CONFIG)); + } return properties; } @@ -1373,6 +1466,22 @@ public class KafkaProperties { entries.put("spring.kafka.ssl.trust-store-certificates", getTrustStoreCertificates()); entries.put("spring.kafka.ssl.trust-store-location", getTrustStoreLocation()); }); + MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleNonNullValuesIn((entries) -> { + entries.put("spring.kafka.ssl.bundle", getBundle()); + entries.put("spring.kafka.ssl.key-store-key", getKeyStoreKey()); + }); + MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleNonNullValuesIn((entries) -> { + entries.put("spring.kafka.ssl.bundle", getBundle()); + entries.put("spring.kafka.ssl.key-store-location", getKeyStoreLocation()); + }); + MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleNonNullValuesIn((entries) -> { + entries.put("spring.kafka.ssl.bundle", getBundle()); + entries.put("spring.kafka.ssl.trust-store-certificates", getTrustStoreCertificates()); + }); + MutuallyExclusiveConfigurationPropertiesException.throwIfMultipleNonNullValuesIn((entries) -> { + entries.put("spring.kafka.ssl.bundle", getBundle()); + entries.put("spring.kafka.ssl.trust-store-location", getTrustStoreLocation()); + }); } private String resourceToPath(Resource resource) { @@ -1628,8 +1737,8 @@ public class KafkaProperties { return (value) -> put(key, value); } - Properties with(Ssl ssl, Security security, Map properties) { - putAll(ssl.buildProperties()); + Properties with(Ssl ssl, Security security, Map properties, SslBundles sslBundles) { + putAll(ssl.buildProperties(sslBundles)); putAll(security.buildProperties()); putAll(properties); return this; diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java index 28e35d3699f..70138432630 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaStreamsAnnotationDrivenConfiguration.java @@ -30,6 +30,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.source.InvalidConfigurationPropertyValueException; +import org.springframework.boot.ssl.SslBundles; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.env.Environment; @@ -46,6 +47,7 @@ import org.springframework.kafka.core.CleanupConfig; * @author Eddú Meléndez * @author Moritz Halbritter * @author Andy Wilkinson + * @author Scott Frederick */ @Configuration(proxyBeanMethods = false) @ConditionalOnClass(StreamsBuilder.class) @@ -61,8 +63,8 @@ class KafkaStreamsAnnotationDrivenConfiguration { @ConditionalOnMissingBean @Bean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) KafkaStreamsConfiguration defaultKafkaStreamsConfig(Environment environment, - KafkaConnectionDetails connectionDetails) { - Map properties = this.properties.buildStreamsProperties(); + KafkaConnectionDetails connectionDetails, ObjectProvider sslBundles) { + Map properties = this.properties.buildStreamsProperties(sslBundles.getIfAvailable()); applyKafkaConnectionDetailsForStreams(properties, connectionDetails); if (this.properties.getStreams().getApplicationId() == null) { String applicationName = environment.getProperty("spring.application.name"); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/SslBundleSslEngineFactory.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/SslBundleSslEngineFactory.java new file mode 100644 index 00000000000..5c5e93ebf56 --- /dev/null +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/SslBundleSslEngineFactory.java @@ -0,0 +1,95 @@ +/* + * Copyright 2012-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.autoconfigure.kafka; + +import java.io.IOException; +import java.security.KeyStore; +import java.util.Map; +import java.util.Set; + +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLParameters; + +import org.apache.kafka.common.security.auth.SslEngineFactory; + +import org.springframework.boot.ssl.SslBundle; + +/** + * An {@link SslEngineFactory} that configures creates an {@link SSLEngine} from an + * {@link SslBundle}. + * + * @author Andy Wilkinson + * @author Scott Frederick + * @since 3.2.0 + */ +public class SslBundleSslEngineFactory implements SslEngineFactory { + + private static final String SSL_BUNDLE_CONFIG_NAME = SslBundle.class.getName(); + + private Map configs; + + private volatile SslBundle sslBundle; + + @Override + public void configure(Map configs) { + this.configs = configs; + this.sslBundle = (SslBundle) configs.get(SSL_BUNDLE_CONFIG_NAME); + } + + @Override + public void close() throws IOException { + + } + + @Override + public SSLEngine createClientSslEngine(String peerHost, int peerPort, String endpointIdentification) { + SSLEngine sslEngine = this.sslBundle.createSslContext().createSSLEngine(peerHost, peerPort); + sslEngine.setUseClientMode(true); + SSLParameters sslParams = sslEngine.getSSLParameters(); + sslParams.setEndpointIdentificationAlgorithm(endpointIdentification); + sslEngine.setSSLParameters(sslParams); + return sslEngine; + } + + @Override + public SSLEngine createServerSslEngine(String peerHost, int peerPort) { + SSLEngine sslEngine = this.sslBundle.createSslContext().createSSLEngine(peerHost, peerPort); + sslEngine.setUseClientMode(false); + return sslEngine; + } + + @Override + public boolean shouldBeRebuilt(Map nextConfigs) { + return !nextConfigs.equals(this.configs); + } + + @Override + public Set reconfigurableConfigs() { + return Set.of(SSL_BUNDLE_CONFIG_NAME); + } + + @Override + public KeyStore keystore() { + return this.sslBundle.getStores().getKeyStore(); + } + + @Override + public KeyStore truststore() { + return this.sslBundle.getStores().getTrustStore(); + } + +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java index 2b6d6e849a1..8b0668ffe8d 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationIntegrationTests.java @@ -33,6 +33,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; +import org.springframework.boot.autoconfigure.ssl.SslAutoConfiguration; import org.springframework.boot.test.util.TestPropertyValues; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; @@ -58,6 +59,7 @@ import static org.assertj.core.api.Assertions.assertThat; * @author Gary Russell * @author Stephane Nicoll * @author Tomaz Fernandes + * @author Andy Wilkinson */ @DisabledOnOs(OS.WINDOWS) @EmbeddedKafka(topics = KafkaAutoConfigurationIntegrationTests.TEST_TOPIC) @@ -133,6 +135,7 @@ class KafkaAutoConfigurationIntegrationTests { private AnnotationConfigApplicationContext doLoad(Class[] configs, String... environment) { AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext(); applicationContext.register(configs); + applicationContext.register(SslAutoConfiguration.class); applicationContext.register(KafkaAutoConfiguration.class); TestPropertyValues.of(environment).applyTo(applicationContext); applicationContext.refresh(); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index 516db167450..2b8c48d3e02 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -46,6 +46,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.autoconfigure.ssl.SslAutoConfiguration; import org.springframework.boot.test.context.assertj.AssertableApplicationContext; import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.boot.test.context.runner.ContextConsumer; @@ -107,11 +108,12 @@ import static org.mockito.Mockito.never; * @author Moritz Halbritter * @author Andy Wilkinson * @author Phillip Webb + * @author Scott Frederick */ class KafkaAutoConfigurationTests { private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() - .withConfiguration(AutoConfigurations.of(KafkaAutoConfiguration.class)); + .withConfiguration(AutoConfigurations.of(KafkaAutoConfiguration.class, SslAutoConfiguration.class)); @Test void consumerProperties() { diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaPropertiesTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaPropertiesTests.java index 8ee0486d857..dbd53fd59d8 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaPropertiesTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaPropertiesTests.java @@ -27,6 +27,8 @@ import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Cleanup; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.IsolationLevel; import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener; import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException; +import org.springframework.boot.ssl.DefaultSslBundleRegistry; +import org.springframework.boot.ssl.SslBundle; import org.springframework.core.io.ClassPathResource; import org.springframework.kafka.core.CleanupConfig; import org.springframework.kafka.core.KafkaAdmin; @@ -34,16 +36,19 @@ import org.springframework.kafka.listener.ContainerProperties; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.Mockito.mock; /** * Tests for {@link KafkaProperties}. * * @author Stephane Nicoll * @author Madhura Bhave + * @author Scott Frederick */ class KafkaPropertiesTests { - @SuppressWarnings("rawtypes") + private final SslBundle sslBundle = mock(SslBundle.class); + @Test void isolationLevelEnumConsistentWithKafkaVersion() { org.apache.kafka.common.IsolationLevel[] original = org.apache.kafka.common.IsolationLevel.values(); @@ -75,20 +80,30 @@ class KafkaPropertiesTests { properties.getSsl().setKeyStoreKey("-----BEGINkey"); properties.getSsl().setTrustStoreCertificates("-----BEGINtrust"); properties.getSsl().setKeyStoreCertificateChain("-----BEGINchain"); - Map consumerProperties = properties.buildConsumerProperties(); + Map consumerProperties = properties.buildConsumerProperties(null); assertThat(consumerProperties).containsEntry(SslConfigs.SSL_KEYSTORE_KEY_CONFIG, "-----BEGINkey"); assertThat(consumerProperties).containsEntry(SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG, "-----BEGINtrust"); assertThat(consumerProperties).containsEntry(SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG, "-----BEGINchain"); } + @Test + void sslBundleConfiguration() { + KafkaProperties properties = new KafkaProperties(); + properties.getSsl().setBundle("myBundle"); + Map consumerProperties = properties + .buildConsumerProperties(new DefaultSslBundleRegistry("myBundle", this.sslBundle)); + assertThat(consumerProperties).containsEntry(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, + SslBundleSslEngineFactory.class.getName()); + } + @Test void sslPropertiesWhenKeyStoreLocationAndKeySetShouldThrowException() { KafkaProperties properties = new KafkaProperties(); properties.getSsl().setKeyStoreKey("-----BEGIN"); properties.getSsl().setKeyStoreLocation(new ClassPathResource("ksLoc")); assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class) - .isThrownBy(properties::buildConsumerProperties); + .isThrownBy(() -> properties.buildConsumerProperties(null)); } @Test @@ -97,7 +112,43 @@ class KafkaPropertiesTests { properties.getSsl().setTrustStoreLocation(new ClassPathResource("tsLoc")); properties.getSsl().setTrustStoreCertificates("-----BEGIN"); assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class) - .isThrownBy(properties::buildConsumerProperties); + .isThrownBy(() -> properties.buildConsumerProperties(null)); + } + + @Test + void sslPropertiesWhenKeyStoreLocationAndBundleSetShouldThrowException() { + KafkaProperties properties = new KafkaProperties(); + properties.getSsl().setBundle("myBundle"); + properties.getSsl().setKeyStoreLocation(new ClassPathResource("ksLoc")); + assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class).isThrownBy( + () -> properties.buildConsumerProperties(new DefaultSslBundleRegistry("myBundle", this.sslBundle))); + } + + @Test + void sslPropertiesWhenKeyStoreKeyAndBundleSetShouldThrowException() { + KafkaProperties properties = new KafkaProperties(); + properties.getSsl().setBundle("myBundle"); + properties.getSsl().setKeyStoreKey("-----BEGIN"); + assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class).isThrownBy( + () -> properties.buildConsumerProperties(new DefaultSslBundleRegistry("myBundle", this.sslBundle))); + } + + @Test + void sslPropertiesWhenTrustStoreLocationAndBundleSetShouldThrowException() { + KafkaProperties properties = new KafkaProperties(); + properties.getSsl().setBundle("myBundle"); + properties.getSsl().setTrustStoreLocation(new ClassPathResource("tsLoc")); + assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class).isThrownBy( + () -> properties.buildConsumerProperties(new DefaultSslBundleRegistry("myBundle", this.sslBundle))); + } + + @Test + void sslPropertiesWhenTrustStoreCertificatesAndBundleSetShouldThrowException() { + KafkaProperties properties = new KafkaProperties(); + properties.getSsl().setBundle("myBundle"); + properties.getSsl().setTrustStoreCertificates("-----BEGIN"); + assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class).isThrownBy( + () -> properties.buildConsumerProperties(new DefaultSslBundleRegistry("myBundle", this.sslBundle))); } @Test