From 89e1d9536345275d78ce4176cc8e25b36a28b035 Mon Sep 17 00:00:00 2001 From: xinhc Date: Thu, 11 Jan 2018 11:00:00 +0800 Subject: [PATCH 1/2] Add Kafka sample See gh-11597 --- spring-boot-samples/README.adoc | 3 + spring-boot-samples/pom.xml | 1 + .../spring-boot-sample-kafka/pom.xml | 53 ++++++++++++++++ .../src/main/java/sample/kafka/Consumer.java | 28 +++++++++ .../src/main/java/sample/kafka/Producer.java | 32 ++++++++++ .../sample/kafka/SampleKafkaApplication.java | 27 ++++++++ .../main/java/sample/kafka/SampleMessage.java | 53 ++++++++++++++++ .../src/main/resources/application.properties | 5 ++ .../kafka/SampleKafkaApplicationTests.java | 61 +++++++++++++++++++ 9 files changed, 263 insertions(+) create mode 100644 spring-boot-samples/spring-boot-sample-kafka/pom.xml create mode 100644 spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/Consumer.java create mode 100644 spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/Producer.java create mode 100644 spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/SampleKafkaApplication.java create mode 100644 spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/SampleMessage.java create mode 100644 spring-boot-samples/spring-boot-sample-kafka/src/main/resources/application.properties create mode 100644 spring-boot-samples/spring-boot-sample-kafka/src/test/java/sample/kafka/SampleKafkaApplicationTests.java diff --git a/spring-boot-samples/README.adoc b/spring-boot-samples/README.adoc index ab6b03a7348..0e48ede4f86 100644 --- a/spring-boot-samples/README.adoc +++ b/spring-boot-samples/README.adoc @@ -230,3 +230,6 @@ The following sample applications are provided: | link:spring-boot-sample-xml[spring-boot-sample-xml] | Example show how Spring Boot can be mixed with traditional XML configuration (we generally recommend using Java `@Configuration` whenever possible + +| link:spring-boot-sample-kafka[spring-boot-sample-kafka] +| consumer and producer using Apache Kafka diff --git a/spring-boot-samples/pom.xml b/spring-boot-samples/pom.xml index 51f5dbed353..83792c9881b 100644 --- a/spring-boot-samples/pom.xml +++ b/spring-boot-samples/pom.xml @@ -97,6 +97,7 @@ spring-boot-sample-websocket-undertow spring-boot-sample-webservices spring-boot-sample-xml + spring-boot-sample-kafka diff --git a/spring-boot-samples/spring-boot-sample-kafka/pom.xml b/spring-boot-samples/spring-boot-sample-kafka/pom.xml new file mode 100644 index 00000000000..901631faa6e --- /dev/null +++ b/spring-boot-samples/spring-boot-sample-kafka/pom.xml @@ -0,0 +1,53 @@ + + + 4.0.0 + + + spring-boot-samples + org.springframework.boot + ${revision} + + spring-boot-sample-kafka + Spring Boot Kafka Sample + Spring Boot Kafka Sample + + + ${basedir}/../.. + + + + + + org.springframework.boot + spring-boot-starter + + + org.springframework.kafka + spring-kafka + + + org.springframework.boot + spring-boot-starter-json + + + + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.kafka + spring-kafka-test + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + diff --git a/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/Consumer.java b/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/Consumer.java new file mode 100644 index 00000000000..4e7e2a8b3d3 --- /dev/null +++ b/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/Consumer.java @@ -0,0 +1,28 @@ +/* + * Copyright 2012-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sample.kafka; + +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +public class Consumer { + + @KafkaListener(topics = "myTopic") + public void processMessage(SampleMessage message) { + System.out.println("consumer has received message : [" + message + "]"); + } +} \ No newline at end of file diff --git a/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/Producer.java b/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/Producer.java new file mode 100644 index 00000000000..89c50e3533a --- /dev/null +++ b/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/Producer.java @@ -0,0 +1,32 @@ +/* + * Copyright 2012-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sample.kafka; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +@Component +public class Producer { + + @Autowired + private KafkaTemplate kafkaTemplate; + + public void send(SampleMessage message) { + kafkaTemplate.send("myTopic", message); + System.out.println("producer has sent message."); + } +} \ No newline at end of file diff --git a/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/SampleKafkaApplication.java b/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/SampleKafkaApplication.java new file mode 100644 index 00000000000..9769c06acea --- /dev/null +++ b/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/SampleKafkaApplication.java @@ -0,0 +1,27 @@ +/* + * Copyright 2012-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sample.kafka; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class SampleKafkaApplication { + + public static void main(String[] args) { + SpringApplication.run(SampleKafkaApplication.class, args); + } +} diff --git a/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/SampleMessage.java b/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/SampleMessage.java new file mode 100644 index 00000000000..c0845fb491f --- /dev/null +++ b/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/SampleMessage.java @@ -0,0 +1,53 @@ +/* + * Copyright 2012-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sample.kafka; + +public class SampleMessage { + private Integer id; + private String message; + + public Integer getId() { + return id; + } + + public void setId(Integer id) { + this.id = id; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + public SampleMessage() { + } + + public SampleMessage(Integer id, String message) { + this.id = id; + this.message = message; + } + + @Override + public String toString() { + return "SampleMessage{" + + "id=" + id + + ", message='" + message + '\'' + + '}'; + } +} diff --git a/spring-boot-samples/spring-boot-sample-kafka/src/main/resources/application.properties b/spring-boot-samples/spring-boot-sample-kafka/src/main/resources/application.properties new file mode 100644 index 00000000000..588dc54c56f --- /dev/null +++ b/spring-boot-samples/spring-boot-sample-kafka/src/main/resources/application.properties @@ -0,0 +1,5 @@ +spring.kafka.bootstrap-servers=localhost:9092 +spring.kafka.consumer.group-id=myGroup +spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer +spring.kafka.Producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer +spring.kafka.consumer.properties.spring.json.trusted.packages=sample.kafka \ No newline at end of file diff --git a/spring-boot-samples/spring-boot-sample-kafka/src/test/java/sample/kafka/SampleKafkaApplicationTests.java b/spring-boot-samples/spring-boot-sample-kafka/src/test/java/sample/kafka/SampleKafkaApplicationTests.java new file mode 100644 index 00000000000..3f910199466 --- /dev/null +++ b/spring-boot-samples/spring-boot-sample-kafka/src/test/java/sample/kafka/SampleKafkaApplicationTests.java @@ -0,0 +1,61 @@ +/* + * Copyright 2012-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sample.kafka; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.rule.OutputCapture; +import org.springframework.kafka.test.rule.KafkaEmbedded; +import org.springframework.test.context.junit4.SpringRunner; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Integration tests for demo application. + * + * @author hcxin + */ +@RunWith(SpringRunner.class) +@SpringBootTest +public class SampleKafkaApplicationTests { + + @Rule + public OutputCapture outputCapture = new OutputCapture(); + + @Autowired + private Producer producer; + + @Test + public void sendSimpleMessage() throws Exception { + initKafkaEmbedded(); + SampleMessage message = new SampleMessage(1, "Test message"); + producer.send(message); + Thread.sleep(1000L); + assertThat(this.outputCapture.toString().contains("Test message")).isTrue(); + } + + public void initKafkaEmbedded() throws Exception { + KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true); + embeddedKafka.setKafkaPorts(9092); + embeddedKafka.afterPropertiesSet(); + //Need 10s, waiting for the Kafka server start. + Thread.sleep(10000L); + + } +} From 07fa8bcf75d62b0fef989a80675fe7e701f70bd2 Mon Sep 17 00:00:00 2001 From: Stephane Nicoll Date: Thu, 8 Feb 2018 12:37:56 +0100 Subject: [PATCH 2/2] Polish "Add Kafka sample" Closes gh-11597 --- spring-boot-samples/README.adoc | 6 +-- spring-boot-samples/pom.xml | 2 +- .../spring-boot-sample-kafka/pom.xml | 7 +-- .../src/main/java/sample/kafka/Consumer.java | 7 +-- .../src/main/java/sample/kafka/Producer.java | 13 +++-- .../sample/kafka/SampleKafkaApplication.java | 9 ++++ .../main/java/sample/kafka/SampleMessage.java | 46 +++++++++--------- .../src/main/resources/application.properties | 5 +- .../kafka/SampleKafkaApplicationTests.java | 48 ++++++++++++------- 9 files changed, 84 insertions(+), 59 deletions(-) diff --git a/spring-boot-samples/README.adoc b/spring-boot-samples/README.adoc index 0e48ede4f86..c2d27ec1ad4 100644 --- a/spring-boot-samples/README.adoc +++ b/spring-boot-samples/README.adoc @@ -119,6 +119,9 @@ The following sample applications are provided: | link:spring-boot-sample-junit-jupiter[spring-boot-sample-junit-jupiter] | Demonstrates JUnit Jupiter-based testing +| link:spring-boot-sample-kafka[spring-boot-sample-kafka] +| consumer and producer using Apache Kafka + | link:spring-boot-sample-liquibase[spring-boot-sample-liquibase] | Database migrations with Liquibase @@ -230,6 +233,3 @@ The following sample applications are provided: | link:spring-boot-sample-xml[spring-boot-sample-xml] | Example show how Spring Boot can be mixed with traditional XML configuration (we generally recommend using Java `@Configuration` whenever possible - -| link:spring-boot-sample-kafka[spring-boot-sample-kafka] -| consumer and producer using Apache Kafka diff --git a/spring-boot-samples/pom.xml b/spring-boot-samples/pom.xml index 83792c9881b..e3a90ee1cc9 100644 --- a/spring-boot-samples/pom.xml +++ b/spring-boot-samples/pom.xml @@ -57,6 +57,7 @@ spring-boot-sample-jta-narayana spring-boot-sample-jta-jndi spring-boot-sample-junit-jupiter + spring-boot-sample-kafka spring-boot-sample-liquibase spring-boot-sample-logback spring-boot-sample-oauth2-client @@ -97,7 +98,6 @@ spring-boot-sample-websocket-undertow spring-boot-sample-webservices spring-boot-sample-xml - spring-boot-sample-kafka diff --git a/spring-boot-samples/spring-boot-sample-kafka/pom.xml b/spring-boot-samples/spring-boot-sample-kafka/pom.xml index 901631faa6e..e0d3ca27171 100644 --- a/spring-boot-samples/spring-boot-sample-kafka/pom.xml +++ b/spring-boot-samples/spring-boot-sample-kafka/pom.xml @@ -20,16 +20,12 @@ org.springframework.boot - spring-boot-starter + spring-boot-starter-json org.springframework.kafka spring-kafka - - org.springframework.boot - spring-boot-starter-json - org.springframework.boot @@ -39,6 +35,7 @@ org.springframework.kafka spring-kafka-test + test diff --git a/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/Consumer.java b/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/Consumer.java index 4e7e2a8b3d3..a18227b40a4 100644 --- a/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/Consumer.java +++ b/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/Consumer.java @@ -19,10 +19,11 @@ import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component -public class Consumer { +class Consumer { - @KafkaListener(topics = "myTopic") + @KafkaListener(topics = "testTopic") public void processMessage(SampleMessage message) { - System.out.println("consumer has received message : [" + message + "]"); + System.out.println("Received sample message [" + message + "]"); } + } \ No newline at end of file diff --git a/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/Producer.java b/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/Producer.java index 89c50e3533a..3297bb4780c 100644 --- a/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/Producer.java +++ b/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/Producer.java @@ -15,18 +15,21 @@ */ package sample.kafka; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; @Component public class Producer { - @Autowired - private KafkaTemplate kafkaTemplate; + private final KafkaTemplate kafkaTemplate; + + Producer(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } public void send(SampleMessage message) { - kafkaTemplate.send("myTopic", message); - System.out.println("producer has sent message."); + this.kafkaTemplate.send("testTopic", message); + System.out.println("Sent sample message [" + message + "]"); } + } \ No newline at end of file diff --git a/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/SampleKafkaApplication.java b/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/SampleKafkaApplication.java index 9769c06acea..00e677b1f6b 100644 --- a/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/SampleKafkaApplication.java +++ b/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/SampleKafkaApplication.java @@ -15,8 +15,11 @@ */ package sample.kafka; +import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.context.annotation.Bean; @SpringBootApplication public class SampleKafkaApplication { @@ -24,4 +27,10 @@ public class SampleKafkaApplication { public static void main(String[] args) { SpringApplication.run(SampleKafkaApplication.class, args); } + + @Bean + public ApplicationRunner runner(Producer producer) { + return args -> producer.send(new SampleMessage(1, "A simple test message")); + } + } diff --git a/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/SampleMessage.java b/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/SampleMessage.java index c0845fb491f..d11cd9a4f91 100644 --- a/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/SampleMessage.java +++ b/spring-boot-samples/spring-boot-sample-kafka/src/main/java/sample/kafka/SampleMessage.java @@ -15,39 +15,37 @@ */ package sample.kafka; -public class SampleMessage { - private Integer id; - private String message; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; - public Integer getId() { - return id; +public class SampleMessage { + + private final Integer id; + + private final String message; + + @JsonCreator + public SampleMessage(@JsonProperty("id") Integer id, + @JsonProperty("message") String message) { + this.id = id; + this.message = message; } - public void setId(Integer id) { - this.id = id; + public Integer getId() { + return this.id; } public String getMessage() { - return message; - } - - public void setMessage(String message) { - this.message = message; - } - - public SampleMessage() { - } - - public SampleMessage(Integer id, String message) { - this.id = id; - this.message = message; + return this.message; } @Override public String toString() { - return "SampleMessage{" + - "id=" + id + - ", message='" + message + '\'' + - '}'; + final StringBuilder sb = new StringBuilder("SampleMessage{"); + sb.append("id=").append(this.id); + sb.append(", message='").append(this.message).append('\''); + sb.append('}'); + return sb.toString(); } + } diff --git a/spring-boot-samples/spring-boot-sample-kafka/src/main/resources/application.properties b/spring-boot-samples/spring-boot-sample-kafka/src/main/resources/application.properties index 588dc54c56f..97dbed42b9b 100644 --- a/spring-boot-samples/spring-boot-sample-kafka/src/main/resources/application.properties +++ b/spring-boot-samples/spring-boot-sample-kafka/src/main/resources/application.properties @@ -1,5 +1,6 @@ spring.kafka.bootstrap-servers=localhost:9092 -spring.kafka.consumer.group-id=myGroup +spring.kafka.consumer.group-id=testGroup +spring.kafka.consumer.auto-offset-reset=earliest spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer -spring.kafka.Producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer +spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer spring.kafka.consumer.properties.spring.json.trusted.packages=sample.kafka \ No newline at end of file diff --git a/spring-boot-samples/spring-boot-sample-kafka/src/test/java/sample/kafka/SampleKafkaApplicationTests.java b/spring-boot-samples/spring-boot-sample-kafka/src/test/java/sample/kafka/SampleKafkaApplicationTests.java index 3f910199466..c6364d6d7d5 100644 --- a/spring-boot-samples/spring-boot-sample-kafka/src/test/java/sample/kafka/SampleKafkaApplicationTests.java +++ b/spring-boot-samples/spring-boot-sample-kafka/src/test/java/sample/kafka/SampleKafkaApplicationTests.java @@ -15,13 +15,20 @@ */ package sample.kafka; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; import org.springframework.boot.test.rule.OutputCapture; -import org.springframework.kafka.test.rule.KafkaEmbedded; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit4.SpringRunner; import static org.assertj.core.api.Assertions.assertThat; @@ -30,32 +37,41 @@ import static org.assertj.core.api.Assertions.assertThat; * Integration tests for demo application. * * @author hcxin + * @author Gary Russell + * @author Stephane Nicoll */ @RunWith(SpringRunner.class) @SpringBootTest +@TestPropertySource(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}") +@EmbeddedKafka public class SampleKafkaApplicationTests { + private static final CountDownLatch latch = new CountDownLatch(1); + @Rule public OutputCapture outputCapture = new OutputCapture(); - @Autowired - private Producer producer; - @Test - public void sendSimpleMessage() throws Exception { - initKafkaEmbedded(); - SampleMessage message = new SampleMessage(1, "Test message"); - producer.send(message); - Thread.sleep(1000L); - assertThat(this.outputCapture.toString().contains("Test message")).isTrue(); + public void testVanillaExchange() throws Exception { + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(this.outputCapture.toString().contains("A simple test message")) + .isTrue(); } - public void initKafkaEmbedded() throws Exception { - KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true); - embeddedKafka.setKafkaPorts(9092); - embeddedKafka.afterPropertiesSet(); - //Need 10s, waiting for the Kafka server start. - Thread.sleep(10000L); + @TestConfiguration + public static class Config { + + @Bean + public Consumer consumer() { + return new Consumer() { + @Override + public void processMessage(SampleMessage message) { + super.processMessage(message); + latch.countDown(); + } + }; + } } + }