Support arbitrary Kafka properties

Add support for arbitrary Kafka properties via
`spring.kafka.properties.*` and also a `spring.kafka.max.poll.records`
property.

See gh-7672
This commit is contained in:
Gary Russell 2016-12-16 12:23:06 -05:00 committed by Phillip Webb
parent ef671e79e0
commit bdda470305
5 changed files with 179 additions and 11 deletions

View File

@ -71,6 +71,11 @@ public class KafkaProperties {
*/
private String clientId;
/**
* Additional properties used to configure the client.
*/
private Map<String, String> properties = new HashMap<String, String>();
public Consumer getConsumer() {
return this.consumer;
}
@ -107,6 +112,14 @@ public class KafkaProperties {
this.clientId = clientId;
}
public Map<String, String> getProperties() {
return this.properties;
}
public void setProperties(Map<String, String> properties) {
this.properties = properties;
}
private Map<String, Object> buildCommonProperties() {
Map<String, Object> properties = new HashMap<String, Object>();
if (this.bootstrapServers != null) {
@ -135,6 +148,9 @@ public class KafkaProperties {
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
this.ssl.getTruststorePassword());
}
if (this.properties != null && this.properties.size() > 0) {
properties.putAll(this.properties);
}
return properties;
}
@ -240,6 +256,11 @@ public class KafkaProperties {
*/
private Class<?> valueDeserializer = StringDeserializer.class;
/**
* Maximum number of records returned in a single call to poll().
*/
private Integer maxPollRecords;
public Ssl getSsl() {
return this.ssl;
}
@ -332,6 +353,14 @@ public class KafkaProperties {
this.valueDeserializer = valueDeserializer;
}
public Integer getMaxPollRecords() {
return this.maxPollRecords;
}
public void setMaxPollRecords(Integer maxPollRecords) {
this.maxPollRecords = maxPollRecords;
}
public Map<String, Object> buildProperties() {
Map<String, Object> properties = new HashMap<String, Object>();
if (this.autoCommitInterval != null) {
@ -395,6 +424,9 @@ public class KafkaProperties {
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
this.valueDeserializer);
}
if (this.maxPollRecords != null) {
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.maxPollRecords);
}
return properties;
}

View File

@ -61,12 +61,16 @@ public class KafkaAutoConfigurationTests {
@Test
public void consumerProperties() {
load("spring.kafka.bootstrap-servers=foo:1234",
"spring.kafka.properties.foo=bar",
"spring.kafka.properties.baz=qux",
"spring.kafka.properties.foo.bar.baz=qux.fiz.buz",
"spring.kafka.ssl.key-password=p1",
"spring.kafka.ssl.keystore-location=classpath:ksLoc",
"spring.kafka.ssl.keystore-password=p2",
"spring.kafka.ssl.truststore-location=classpath:tsLoc",
"spring.kafka.ssl.truststore-password=p3",
"spring.kafka.consumer.auto-commit-interval=123",
"spring.kafka.consumer.max-poll-records=42",
"spring.kafka.consumer.auto-offset-reset=earliest",
"spring.kafka.consumer.client-id=ccid", // test override common
"spring.kafka.consumer.enable-auto-commit=false",
@ -109,6 +113,11 @@ public class KafkaAutoConfigurationTests {
.isEqualTo(LongDeserializer.class);
assertThat(configs.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG))
.isEqualTo(IntegerDeserializer.class);
assertThat(configs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG))
.isEqualTo(42);
assertThat(configs.get("foo")).isEqualTo("bar");
assertThat(configs.get("baz")).isEqualTo("qux");
assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz");
}
@Test

View File

@ -878,6 +878,7 @@ content into your application; rather pick only the properties that you need.
spring.kafka.consumer.group-id= # Unique string that identifies the consumer group this consumer belongs to.
spring.kafka.consumer.heartbeat-interval= # Expected time in milliseconds between heartbeats to the consumer coordinator.
spring.kafka.consumer.key-deserializer= # Deserializer class for keys.
spring.kafka.consumer.max-poll-messages= # Maximum number of records returned in a single call to poll().
spring.kafka.consumer.value-deserializer= # Deserializer class for values.
spring.kafka.listener.ack-count= # Number of records between offset commits when ackMode is "COUNT" or "COUNT_TIME".
spring.kafka.listener.ack-mode= # Listener AckMode; see the spring-kafka documentation.
@ -893,6 +894,7 @@ content into your application; rather pick only the properties that you need.
spring.kafka.producer.key-serializer= # Serializer class for keys.
spring.kafka.producer.retries= # When greater than zero, enables retrying of failed sends.
spring.kafka.producer.value-serializer= # Serializer class for values.
spring.kafka.properties.*= # Additional properties used to configure the client.
spring.kafka.ssl.key-password= # Password of the private key in the key store file.
spring.kafka.ssl.keystore-location= # Location of the key store file.
spring.kafka.ssl.keystore-password= # Store password for the key store file.

View File

@ -4643,22 +4643,22 @@ auto configuration supports all HIGH importance properties, some selected MEDIUM
and any that do not have a default value.
Only a subset of the properties supported by Kafka are available via the `KafkaProperties`
class. If you wish to configure the producer or consumer with additional properties, you
can override the producer factory and/or consumer factory bean, adding additional
properties, for example:
class. If you wish to configure the producer or consumer with additional properties that
are not directly supported, use the following:
`spring.kafka.properties.foo.bar=baz`
This sets the common `foo.bar` kafka property to `baz`.
These properties will be shared by both the consumer and producer factory beans.
If you wish to customize these components with different properties, such as to use a
different metrics reader for each, you can override the bean definitions, as follows:
[source,java,indent=0]
----
@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(KafkaProperties properties) {
Map<String, Object> producerProperties = properties.buildProducerProperties();
producerProperties.put("some.property", "some.value");
return new DefaultKafkaProducerFactory<Object, Object>(producerProperties);
}
include::{code-examples}/kafka/KafkaSpecialProducerConsumerConfigExample.java[tag=configuration]
----
[[boot-features-restclient]]
== Calling REST services
If you need to call remote REST services from your application, you can use Spring

View File

@ -0,0 +1,125 @@
/*
* Copyright 2016-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.boot.kafka;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
/**
* Example custom kafka configuration beans used when the user wants to
* apply different common properties to the producer and consumer.
*
* @author Gary Russell
* @since 1.5
*
*/
public class KafkaSpecialProducerConsumerConfigExample {
// tag::configuration[]
@Configuration
public static class CustomKafkaBeans {
/**
* Customized ProducerFactory bean.
* @param properties the kafka properties.
* @return the bean.
*/
@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(KafkaProperties properties) {
Map<String, Object> producerProperties = properties.buildProducerProperties();
producerProperties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
MyProducerMetricsReporter.class);
return new DefaultKafkaProducerFactory<Object, Object>(producerProperties);
}
/**
* Customized ConsumerFactory bean.
* @param properties the kafka properties.
* @return the bean.
*/
@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
Map<String, Object> consumererProperties = properties.buildConsumerProperties();
consumererProperties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
MyConsumerMetricsReporter.class);
return new DefaultKafkaConsumerFactory<Object, Object>(consumererProperties);
}
}
// end::configuration[]
public static class MyConsumerMetricsReporter implements MetricsReporter {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public void init(List<KafkaMetric> metrics) {
}
@Override
public void metricChange(KafkaMetric metric) {
}
@Override
public void metricRemoval(KafkaMetric metric) {
}
@Override
public void close() {
}
}
public static class MyProducerMetricsReporter implements MetricsReporter {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public void init(List<KafkaMetric> metrics) {
}
@Override
public void metricChange(KafkaMetric metric) {
}
@Override
public void metricRemoval(KafkaMetric metric) {
}
@Override
public void close() {
}
}
}