add new method to get offset from kafka

This commit is contained in:
Carol 2019-12-05 10:32:15 +08:00
parent 001cf879f3
commit 3ab73c5000
2 changed files with 192 additions and 67 deletions

View File

@ -3,6 +3,7 @@
<component name="ChangeListManager">
<list default="true" id="52b7c556-aac6-402c-b840-5f2564f1f5d7" name="Default" comment="">
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/site/cnkj/utils/KafkaUtil.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/site/cnkj/utils/KafkaUtil.java" afterDir="false" />
</list>
<ignored path="$PROJECT_DIR$/target/" />
<option name="EXCLUDED_CONVERTED_TO_IGNORED" value="true" />
@ -17,7 +18,7 @@
<file leaf-file-name="MongodbConfig.java" pinned="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/src/main/java/site/cnkj/utils/config/MongodbConfig.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="760">
<state relative-caret-position="620">
<caret line="38" column="29" selection-start-line="38" selection-start-column="18" selection-end-line="38" selection-end-column="29" />
<folding>
<element signature="imports" expanded="true" />
@ -26,10 +27,10 @@
</provider>
</entry>
</file>
<file leaf-file-name="MongodbUtil.java" pinned="false" current-in-tab="true">
<file leaf-file-name="MongodbUtil.java" pinned="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/src/main/java/site/cnkj/utils/MongodbUtil.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="300">
<state relative-caret-position="2200">
<caret line="120" column="49" selection-start-line="120" selection-start-column="49" selection-end-line="120" selection-end-column="49" />
<folding>
<element signature="imports" expanded="true" />
@ -38,6 +39,50 @@
</provider>
</entry>
</file>
<file leaf-file-name="KafkaUtil.java" pinned="false" current-in-tab="true">
<entry file="file://$PROJECT_DIR$/src/main/java/site/cnkj/utils/KafkaUtil.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="160">
<caret line="12" lean-forward="true" selection-start-line="12" selection-end-line="105" selection-end-column="5" />
</state>
</provider>
</entry>
</file>
<file leaf-file-name="KafkaConsumer.java" pinned="false" current-in-tab="false">
<entry file="jar://$MAVEN_REPOSITORY$/org/apache/kafka/kafka-clients/1.1.1/kafka-clients-1.1.1-sources.jar!/org/apache/kafka/clients/consumer/KafkaConsumer.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="-2941">
<caret line="652" column="40" selection-start-line="652" selection-start-column="37" selection-end-line="652" selection-end-column="40" />
<folding>
<element signature="e#37529#37530#0" expanded="true" />
<element signature="e#37570#37571#0" expanded="true" />
</folding>
</state>
</provider>
</entry>
</file>
<file leaf-file-name="ConsumerConfig.java" pinned="false" current-in-tab="false">
<entry file="jar://$MAVEN_REPOSITORY$/org/apache/kafka/kafka-clients/1.1.1/kafka-clients-1.1.1-sources.jar!/org/apache/kafka/clients/consumer/ConsumerConfig.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="299">
<caret line="258" column="47" selection-start-line="258" selection-start-column="47" selection-end-line="258" selection-end-column="47" />
<folding>
<element signature="e#29655#29656#0" expanded="true" />
<element signature="e#29691#29692#0" expanded="true" />
</folding>
</state>
</provider>
</entry>
</file>
<file leaf-file-name="CommonClientConfigs.java" pinned="false" current-in-tab="false">
<entry file="jar://$MAVEN_REPOSITORY$/org/apache/kafka/kafka-clients/1.1.1/kafka-clients-1.1.1-sources.jar!/org/apache/kafka/clients/CommonClientConfigs.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="597">
<caret line="52" column="60" selection-start-line="52" selection-start-column="51" selection-end-line="52" selection-end-column="60" />
</state>
</provider>
</entry>
</file>
<file leaf-file-name="README.md" pinned="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/README.md">
<provider selected="true" editor-type-id="split-provider[text-editor;markdown-preview-editor]">
@ -52,12 +97,12 @@
</file>
<file leaf-file-name="pom.xml" pinned="false" current-in-tab="false">
<entry file="file://$PROJECT_DIR$/pom.xml">
<provider editor-type-id="MavenHelperPluginDependencyAnalyzer" />
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="920">
<caret line="46" selection-start-line="45" selection-start-column="21" selection-end-line="46" />
</state>
</provider>
<provider editor-type-id="MavenHelperPluginDependencyAnalyzer" />
</entry>
</file>
</leaf>
@ -77,6 +122,9 @@
<find>listRemove</find>
<find>public long rightPush</find>
<find>MongoClient</find>
<find>group</find>
<find>get</find>
<find>BOOTSTRAP_SERVERS_CONFIG</find>
</findStrings>
</component>
<component name="Git.Settings">
@ -97,6 +145,7 @@
<option value="$PROJECT_DIR$/README.md" />
<option value="$PROJECT_DIR$/pom.xml" />
<option value="$PROJECT_DIR$/src/main/java/site/cnkj/utils/MongodbUtil.java" />
<option value="$PROJECT_DIR$/src/main/java/site/cnkj/utils/KafkaUtil.java" />
</list>
</option>
</component>
@ -153,9 +202,6 @@
<foldersAlwaysOnTop value="true" />
</navigator>
<panes>
<pane id="AndroidView" />
<pane id="PackagesPane" />
<pane id="Scope" />
<pane id="ProjectPane">
<subPane>
<expand>
@ -211,6 +257,9 @@
<select />
</subPane>
</pane>
<pane id="Scope" />
<pane id="AndroidView" />
<pane id="PackagesPane" />
</panes>
</component>
<component name="PropertiesComponent">
@ -252,7 +301,7 @@
</list>
</option>
</component>
<component name="RunManager">
<component name="RunManager" selected="Application.KafkaUtil">
<configuration name="HttpCommonUtil" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
<option name="MAIN_CLASS_NAME" value="site.cnkj.utils.HttpCommonUtil" />
<module name="CommonUtil" />
@ -264,6 +313,17 @@
</pattern>
</extension>
</configuration>
<configuration name="KafkaUtil" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
<option name="MAIN_CLASS_NAME" value="site.cnkj.utils.KafkaUtil" />
<module name="CommonUtil" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<extension name="coverage">
<pattern>
<option name="PATTERN" value="site.cnkj.utils.*" />
<option name="ENABLED" value="true" />
</pattern>
</extension>
</configuration>
<configuration default="true" type="Application" factoryName="Application">
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
</configuration>
@ -305,8 +365,13 @@
<properties />
<listeners />
</configuration>
<list>
<item itemvalue="Application.HttpCommonUtil" />
<item itemvalue="Application.KafkaUtil" />
</list>
<recent_temporary>
<list>
<item itemvalue="Application.KafkaUtil" />
<item itemvalue="Application.HttpCommonUtil" />
</list>
</recent_temporary>
@ -324,6 +389,8 @@
<workItem from="1574318985781" duration="20983000" />
<workItem from="1574645677739" duration="20147000" />
<workItem from="1574752879588" duration="2458000" />
<workItem from="1574835832018" duration="761000" />
<workItem from="1575510784503" duration="2111000" />
</task>
<task id="LOCAL-00001" summary="repair some lombok questions">
<created>1574320188766</created>
@ -416,14 +483,22 @@
<option name="project" value="LOCAL" />
<updated>1574819212177</updated>
</task>
<option name="localTasksCounter" value="14" />
<task id="LOCAL-00014" summary="change result for KafkaUtil from &quot;TopicPartition&quot; to &quot;String&quot;">
<created>1574835930667</created>
<option name="number" value="00014" />
<option name="presentableId" value="LOCAL-00014" />
<option name="project" value="LOCAL" />
<updated>1574835930667</updated>
</task>
<option name="localTasksCounter" value="15" />
<servers />
</component>
<component name="TimeTrackingManager">
<option name="totallyTimeSpent" value="43588000" />
<option name="totallyTimeSpent" value="46460000" />
</component>
<component name="ToolWindowManager">
<frame x="-8" y="-8" width="1936" height="1056" extended-state="6" />
<editor active="true" />
<layout>
<window_info anchor="right" id="Palette" order="3" />
<window_info anchor="bottom" id="Event Log" order="7" side_tool="true" />
@ -438,21 +513,21 @@
<window_info id="UI Designer" order="2" />
<window_info anchor="bottom" id="Debug" order="3" weight="0.4" />
<window_info anchor="bottom" id="TODO" order="6" />
<window_info anchor="bottom" id="Messages" order="7" weight="0.3293348" />
<window_info anchor="right" id="Palette&#9;" order="3" />
<window_info id="Image Layers" order="2" />
<window_info anchor="bottom" id="Java Enterprise" order="7" />
<window_info anchor="right" id="Capture Analysis" order="3" />
<window_info anchor="bottom" id="Version Control" order="7" visible="true" weight="0.3293348" />
<window_info anchor="bottom" id="Run" order="2" weight="0.519084" />
<window_info anchor="bottom" id="Version Control" order="7" weight="0.32897604" />
<window_info anchor="bottom" id="Run" order="2" weight="0.44056708" />
<window_info anchor="bottom" id="Spring" order="7" />
<window_info anchor="bottom" id="Terminal" order="7" weight="0.3293348" />
<window_info active="true" content_ui="combo" id="Project" order="0" visible="true" weight="0.20628998" />
<window_info content_ui="combo" id="Project" order="0" weight="0.20628998" />
<window_info anchor="right" id="Bean Validation" order="3" />
<window_info id="Web" order="2" side_tool="true" />
<window_info anchor="bottom" id="SonarLint" order="7" weight="0.3293348" />
<window_info anchor="right" id="Theme Preview" order="3" />
<window_info id="Favorites" order="2" side_tool="true" />
<window_info anchor="bottom" id="Messages" order="7" weight="0.3293348" />
<window_info anchor="bottom" id="Inspection" order="5" weight="0.4" />
<window_info anchor="right" id="Commander" internal_type="SLIDING" order="0" type="SLIDING" weight="0.4" />
<window_info anchor="bottom" id="Cvs" order="4" weight="0.25" />
@ -541,59 +616,11 @@
<MESSAGE value="change README" />
<MESSAGE value="new MongoDBUtil" />
<MESSAGE value="1.0.2" />
<option name="LAST_COMMIT_MESSAGE" value="1.0.2" />
<MESSAGE value="change result for KafkaUtil from &quot;TopicPartition&quot; to &quot;String&quot;" />
<option name="LAST_COMMIT_MESSAGE" value="change result for KafkaUtil from &quot;TopicPartition&quot; to &quot;String&quot;" />
<option name="OPTIMIZE_IMPORTS_BEFORE_PROJECT_COMMIT" value="true" />
</component>
<component name="editorHistoryManager">
<entry file="file://$PROJECT_DIR$/src/main/java/site/cnkj/utils/HttpCommonUtil.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="448">
<caret line="82" column="18" lean-forward="true" selection-start-line="82" selection-start-column="18" selection-end-line="82" selection-end-column="18" />
</state>
</provider>
</entry>
<entry file="jar://$APPLICATION_HOME_DIR$/plugins/maven/lib/maven.jar!/schemas/maven-4.0.0.xsd">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="-652">
<caret line="1209" column="12" selection-start-line="1209" selection-start-column="12" selection-end-line="1209" selection-end-column="12" />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/src/main/java/site/cnkj/utils/DateUtil.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="188">
<caret line="443" column="17" lean-forward="true" selection-start-line="443" selection-start-column="17" selection-end-line="443" selection-end-column="17" />
</state>
</provider>
</entry>
<entry file="jar://$MAVEN_REPOSITORY$/org/springframework/data/spring-data-redis/2.1.5.RELEASE/spring-data-redis-2.1.5.RELEASE-sources.jar!/org/springframework/data/redis/core/SetOperations.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="297">
<caret line="52" column="9" selection-start-line="52" selection-start-column="9" selection-end-line="52" selection-end-column="9" />
</state>
</provider>
</entry>
<entry file="jar://$MAVEN_REPOSITORY$/org/springframework/data/spring-data-redis/2.1.5.RELEASE/spring-data-redis-2.1.5.RELEASE-sources.jar!/org/springframework/data/redis/connection/jedis/JedisClusterStringCommands.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="437">
<caret line="222" column="81" selection-start-line="222" selection-start-column="81" selection-end-line="222" selection-end-column="81" />
</state>
</provider>
</entry>
<entry file="jar://$MAVEN_REPOSITORY$/org/springframework/data/spring-data-redis/2.1.5.RELEASE/spring-data-redis-2.1.5.RELEASE-sources.jar!/org/springframework/data/redis/connection/jedis/JedisStringCommands.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="297">
<caret line="304" column="19" selection-start-line="304" selection-start-column="19" selection-end-line="304" selection-end-column="19" />
</state>
</provider>
</entry>
<entry file="jar://$MAVEN_REPOSITORY$/org/springframework/data/spring-data-redis/2.1.5.RELEASE/spring-data-redis-2.1.5.RELEASE-sources.jar!/org/springframework/data/redis/connection/DefaultedRedisConnection.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="309">
<caret line="301" column="35" lean-forward="true" selection-start-line="301" selection-start-column="35" selection-end-line="301" selection-end-column="35" />
</state>
</provider>
</entry>
<entry file="jar://$MAVEN_REPOSITORY$/org/springframework/data/spring-data-redis/2.1.5.RELEASE/spring-data-redis-2.1.5.RELEASE-sources.jar!/org/springframework/data/redis/connection/DefaultStringRedisConnection.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="289">
@ -869,7 +896,7 @@
</entry>
<entry file="file://$PROJECT_DIR$/src/main/java/site/cnkj/utils/config/MongodbConfig.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="760">
<state relative-caret-position="620">
<caret line="38" column="29" selection-start-line="38" selection-start-column="18" selection-end-line="38" selection-end-column="29" />
<folding>
<element signature="imports" expanded="true" />
@ -895,16 +922,16 @@
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/pom.xml">
<provider editor-type-id="MavenHelperPluginDependencyAnalyzer" />
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="920">
<caret line="46" selection-start-line="45" selection-start-column="21" selection-end-line="46" />
</state>
</provider>
<provider editor-type-id="MavenHelperPluginDependencyAnalyzer" />
</entry>
<entry file="file://$PROJECT_DIR$/src/main/java/site/cnkj/utils/MongodbUtil.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="300">
<state relative-caret-position="2200">
<caret line="120" column="49" selection-start-line="120" selection-start-column="49" selection-end-line="120" selection-end-column="49" />
<folding>
<element signature="imports" expanded="true" />
@ -912,6 +939,71 @@
</state>
</provider>
</entry>
<entry file="jar://C:/Program Files/Java/jdk1.8.0_171/src.zip!/java/util/Map.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="245">
<caret line="128" column="17" selection-start-line="128" selection-start-column="17" selection-end-line="128" selection-end-column="17" />
</state>
</provider>
</entry>
<entry file="jar://$MAVEN_REPOSITORY$/org/apache/kafka/kafka-clients/1.1.1/kafka-clients-1.1.1-sources.jar!/org/apache/kafka/common/config/AbstractConfig.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="289">
<caret line="84" column="34" selection-start-line="84" selection-start-column="34" selection-end-line="84" selection-end-column="34" />
<folding>
<element signature="e#2932#2933#0" expanded="true" />
<element signature="e#2981#2982#0" expanded="true" />
</folding>
</state>
</provider>
</entry>
<entry file="jar://$MAVEN_REPOSITORY$/org/apache/kafka/kafka-clients/1.1.1/kafka-clients-1.1.1-sources.jar!/org/apache/kafka/clients/consumer/KafkaConsumer.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="-2941">
<caret line="652" column="40" selection-start-line="652" selection-start-column="37" selection-end-line="652" selection-end-column="40" />
<folding>
<element signature="e#37529#37530#0" expanded="true" />
<element signature="e#37570#37571#0" expanded="true" />
</folding>
</state>
</provider>
</entry>
<entry file="jar://C:/Program Files/Java/jdk1.8.0_171/src.zip!/java/util/Collections.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="297">
<caret line="4787" column="30" selection-start-line="4787" selection-start-column="30" selection-end-line="4787" selection-end-column="30" />
<folding>
<element signature="e#186747#186748#0" expanded="true" />
<element signature="e#186792#186793#0" expanded="true" />
</folding>
</state>
</provider>
</entry>
<entry file="jar://$MAVEN_REPOSITORY$/org/apache/kafka/kafka-clients/1.1.1/kafka-clients-1.1.1-sources.jar!/org/apache/kafka/clients/consumer/ConsumerConfig.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="299">
<caret line="258" column="47" selection-start-line="258" selection-start-column="47" selection-end-line="258" selection-end-column="47" />
<folding>
<element signature="e#29655#29656#0" expanded="true" />
<element signature="e#29691#29692#0" expanded="true" />
</folding>
</state>
</provider>
</entry>
<entry file="jar://$MAVEN_REPOSITORY$/org/apache/kafka/kafka-clients/1.1.1/kafka-clients-1.1.1-sources.jar!/org/apache/kafka/clients/CommonClientConfigs.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="597">
<caret line="52" column="60" selection-start-line="52" selection-start-column="51" selection-end-line="52" selection-end-column="60" />
</state>
</provider>
</entry>
<entry file="file://$PROJECT_DIR$/src/main/java/site/cnkj/utils/KafkaUtil.java">
<provider selected="true" editor-type-id="text-editor">
<state relative-caret-position="160">
<caret line="12" lean-forward="true" selection-start-line="12" selection-end-line="105" selection-end-column="5" />
</state>
</provider>
</entry>
</component>
<component name="masterDetails">
<states>

View File

@ -11,6 +11,12 @@ import java.util.*;
*/
public class KafkaUtil {
private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
private static final String GROUP_ID = "group.id";
private static final String KEY_DESERIALIZER = "key.deserializer";
private static final String VALUE_DESERIALIZER = "value.deserializer";
private static final String STRING_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
/**
* 获取当前topic下的全部分区的偏移量信息
@ -72,4 +78,31 @@ public class KafkaUtil {
}
}
/**
* 获取指定集群上指定groupId的topic的每个分区的当前偏移量
*
* @param bootstrapServers 消费地址
* @param groupId id
* @param topics topic集合
* @return {
* topic:
* {
* partitionInfo:offset
* }
* }
*/
public static Map<String, Map<String, Long>> getConsumerOffset(List<String> bootstrapServers, String groupId, List<String> topics){
try {
Map<String, Object> properties = new HashMap<>();
properties.put(BOOTSTRAP_SERVERS, bootstrapServers);
properties.put(GROUP_ID, groupId);
properties.put(KEY_DESERIALIZER, STRING_DESERIALIZER);
properties.put(VALUE_DESERIALIZER, STRING_DESERIALIZER);
return getConsumerTopicPartitionsOffset(properties, new HashSet<String>(topics));
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}