一、概述
本文以 kafka_2.5.1
版本为例,描述的常用命令,在 3.x
版本上命令有所不同。
二、主要命令
kafka启动命令:nohup bin/kafka-server-start.sh config/server.properties &
kafka停止命令:bin/kafka-server-stop.sh
2.1 Topic 相关
- 创建名为
test
的Topic
:bin/kafka-topics.sh -zookeeper localhost:2181 --create --partitions 5 --replication-factor 1 --topic test
- –partitions:分区数
- –replication-factor:副本数
- 查询
Topic
列表:bin/kafka-topics.sh --list --zookeeper localhost:2181
- 删除名为
test
的Topic:bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic tes
t- 配置
delete.topic.enable
为true
,这样才能删除topic
- 配置
- 查询
Topic
的信息:bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
- 如果未指定
topic
则输出所有topic
的信息
- 如果未指定
2.2 消息相关
- 生产者发送消息:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
- 消费者查询消息:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --group t1
- –from-beginning:表示从头开始接收数据–group:指定消费者组
- 查询名为
test
的Topic
的消息:bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -2
- –time-1 表示要获取指定
topic
所有分区当前的最大位移(历史总消息数),–time-2 表示获取当前最早位移(被消费的消息数),两个命令的输出结果相减便可得到所有分区当前的消息总数。 - 输出示例:test:0:3
- 第一个数字0表示分区,第二个数字3表示偏移量。
- –time-1 表示要获取指定