From 001cf879f395aff24f73b91e7874c68e794081d7 Mon Sep 17 00:00:00 2001
From: Carol <947752894@qq.com>
Date: Wed, 27 Nov 2019 14:25:29 +0800
Subject: [PATCH] change result for KafkaUtil from "TopicPartition" to "String"
---
.idea/workspace.xml | 14 ++++++++++----
src/main/java/site/cnkj/utils/KafkaUtil.java | 14 +++++++++-----
2 files changed, 19 insertions(+), 9 deletions(-)
diff --git a/.idea/workspace.xml b/.idea/workspace.xml
index beead86..4792b69 100644
--- a/.idea/workspace.xml
+++ b/.idea/workspace.xml
@@ -3,7 +3,6 @@
-
@@ -324,7 +323,7 @@
1574318981558
-
+
1574320188766
@@ -410,11 +409,18 @@
1574672730578
-
+
+ 1574819212177
+
+
+
+ 1574819212177
+
+
-
+
diff --git a/src/main/java/site/cnkj/utils/KafkaUtil.java b/src/main/java/site/cnkj/utils/KafkaUtil.java
index abbec03..a20d9ab 100644
--- a/src/main/java/site/cnkj/utils/KafkaUtil.java
+++ b/src/main/java/site/cnkj/utils/KafkaUtil.java
@@ -19,11 +19,15 @@ public class KafkaUtil {
* @param partitions Collection partitions
* @return {partition:offset}
*/
- public static Map getConsumerPartitionsOffset(Map properties, Collection partitions) {
+ public static Map getConsumerPartitionsOffset(Map properties, Collection partitions) {
KafkaConsumer consumer = new KafkaConsumer(properties);
+ Map partitionsOffset = new HashMap<>();
try {
Map endOffsets = consumer.endOffsets(partitions);
- return endOffsets;
+ for (TopicPartition topicPartition : endOffsets.keySet()) {
+ partitionsOffset.put(topicPartition.toString(), endOffsets.get(topicPartition));
+ }
+ return partitionsOffset;
} catch (Exception e) {
e.printStackTrace();
return null;
@@ -45,8 +49,8 @@ public class KafkaUtil {
* }
* }
*/
- public static Map> getConsumerTopicPartitionsOffset(Map properties, Set topics){
- Map> topicPartitionMap = new HashMap<>();
+ public static Map> getConsumerTopicPartitionsOffset(Map properties, Set topics){
+ Map> topicPartitionMap = new HashMap<>();
KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
try {
for (String topic : topics) {
@@ -56,7 +60,7 @@ public class KafkaUtil {
TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
topicPartitions.add(topicPartition);
}
- Map topicPartitionsOffset = getConsumerPartitionsOffset(properties, topicPartitions);
+ Map topicPartitionsOffset = getConsumerPartitionsOffset(properties, topicPartitions);
topicPartitionMap.put(topic, topicPartitionsOffset);
}
return topicPartitionMap;