java 读取kafka指定offset的内容
代码如下,主要是没找到使用kafka自带的命令改怎么查询
查看当前的topic的offset值
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list xxx:xxx --topic
xxx --time -1
java代码如下:
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.
kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer
.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition; import java.util.*; public class
GetKafkaByOffset { public static void main(String[] args) { //从哪个offset开始 String
offsetn= args[1]; //topic的名字 String topicName = args[0]; //读取多少条 String countn=
args[2]; Properties props = new Properties(); // Kafka服务端的主机名和端口号 props.put(
"bootstrap.servers", "xxx:xxx"); // 等待所有副本节点的应答 props.put("acks", "all"); //
消息发送最大尝试次数 props.put("retries", 0); // 一批消息处理大小 props.put("batch.size", 16384);
// 请求延时 props.put("linger.ms", 1); // 发送缓存区内存大小 props.put("buffer.memory",
33554432); //调用返回的记录数 props.put("max.poll.records", Integer.parseInt(countn));
//可以取值为latest(从最新的消息开始消费)或者earliest(从最老的消息开始消费) props.put("auto.offset.reset",
"earliest"); //设置消费者组名称 props.put("group.id", "xxx"); props.put(
"key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<
String, String> consumer = new KafkaConsumer<String, String>(props); Map<
TopicPartition, OffsetAndMetadata> hashMaps = new HashMap<TopicPartition,
OffsetAndMetadata>(); hashMaps.put(new TopicPartition(topicName, 0), new
OffsetAndMetadata(Long.parseLong(offsetn))); consumer.commitSync(hashMaps);
consumer.subscribe(Arrays.asList(topicName)); ConsumerRecords<String, String>
records= consumer.poll(1000); for (ConsumerRecord<String, String> record :
records) { System.out.println(record.toString()); } } }

技术
下载桌面版
GitHub
百度网盘(提取码:draw)
Gitee
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:766591547
关注微信