change result for KafkaUtil from "TopicPartition" to "String"

This commit is contained in:
Carol 2019-11-27 14:25:29 +08:00
parent 30f0f0eb80
commit 001cf879f3
2 changed files with 19 additions and 9 deletions

View File

@ -3,7 +3,6 @@
<component name="ChangeListManager">
<list default="true" id="52b7c556-aac6-402c-b840-5f2564f1f5d7" name="Default" comment="">
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/site/cnkj/utils/MongodbUtil.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/site/cnkj/utils/MongodbUtil.java" afterDir="false" />
</list>
<ignored path="$PROJECT_DIR$/target/" />
<option name="EXCLUDED_CONVERTED_TO_IGNORED" value="true" />
@ -324,7 +323,7 @@
<updated>1574318981558</updated>
<workItem from="1574318985781" duration="20983000" />
<workItem from="1574645677739" duration="20147000" />
<workItem from="1574752879588" duration="1821000" />
<workItem from="1574752879588" duration="2458000" />
</task>
<task id="LOCAL-00001" summary="repair some lombok questions">
<created>1574320188766</created>
@ -410,11 +409,18 @@
<option name="project" value="LOCAL" />
<updated>1574672730578</updated>
</task>
<option name="localTasksCounter" value="13" />
<task id="LOCAL-00013" summary="1.0.2">
<created>1574819212177</created>
<option name="number" value="00013" />
<option name="presentableId" value="LOCAL-00013" />
<option name="project" value="LOCAL" />
<updated>1574819212177</updated>
</task>
<option name="localTasksCounter" value="14" />
<servers />
</component>
<component name="TimeTrackingManager">
<option name="totallyTimeSpent" value="42951000" />
<option name="totallyTimeSpent" value="43588000" />
</component>
<component name="ToolWindowManager">
<frame x="-8" y="-8" width="1936" height="1056" extended-state="6" />

View File

@ -19,11 +19,15 @@ public class KafkaUtil {
* @param partitions Collection<TopicPartition> partitions
* @return {partition:offset}
*/
public static Map<TopicPartition, Long> getConsumerPartitionsOffset(Map<String, Object> properties, Collection<TopicPartition> partitions) {
public static Map<String, Long> getConsumerPartitionsOffset(Map<String, Object> properties, Collection<TopicPartition> partitions) {
KafkaConsumer consumer = new KafkaConsumer(properties);
Map<String, Long> partitionsOffset = new HashMap<>();
try {
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
return endOffsets;
for (TopicPartition topicPartition : endOffsets.keySet()) {
partitionsOffset.put(topicPartition.toString(), endOffsets.get(topicPartition));
}
return partitionsOffset;
} catch (Exception e) {
e.printStackTrace();
return null;
@ -45,8 +49,8 @@ public class KafkaUtil {
* }
* }
*/
public static Map<String, Map<TopicPartition, Long>> getConsumerTopicPartitionsOffset(Map<String, Object> properties, Set<String> topics){
Map<String, Map<TopicPartition, Long>> topicPartitionMap = new HashMap<>();
public static Map<String, Map<String, Long>> getConsumerTopicPartitionsOffset(Map<String, Object> properties, Set<String> topics){
Map<String, Map<String, Long>> topicPartitionMap = new HashMap<>();
KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
try {
for (String topic : topics) {
@ -56,7 +60,7 @@ public class KafkaUtil {
TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
topicPartitions.add(topicPartition);
}
Map<TopicPartition, Long> topicPartitionsOffset = getConsumerPartitionsOffset(properties, topicPartitions);
Map<String, Long> topicPartitionsOffset = getConsumerPartitionsOffset(properties, topicPartitions);
topicPartitionMap.put(topic, topicPartitionsOffset);
}
return topicPartitionMap;