在本文中,我们介绍下在kafka中常见的操作命令,下面直接开始:
查看当前服务器中的所有topic
kafka-topics --zookeeper xxxxxx:2181 --list --exclude-internal 说明: exclude-internal:排除kafka内部topic 比如: --exclude-internal --topic "test_.*"
创建topic
kafka-topics --zookeeper xxxxxx:2181 --create --replication-factor --partitions 1 --topic topic_name 说明: --topic 定义topic名 --replication-factor 定义副本数 --partitions 定义分区数
删除topic
kafka-topics --zookeeper xxxxxx:2181 --delete --topic topic_name 备注:需要server.properties中设置delete.topic.enable=true否则只是标记删除
生产者
kafka-console-producer --broker-list xxxxxx:9092 --topic topic_name 可加:--property parse.key=true(有key消息)
消费者
kafka-console-consumer --bootstrap-server xxxxxx:9092 --topic topic_name 注:可选 --from-beginning:会把主题中以往所有的数据都读取出来 --whitelist '.*' :消费所有的topic --property print.key=true:显示key进行消费 --partition 0:指定分区消费 --offset:指定起始偏移量消费
查看某个topic的详情
kafka-topics --zookeeper xxxxxx:2181 --describe --topic topic_name
修改分区数
kafka-topics --zookeeper xxxxxx:2181 --alter --topic topic_name --partitions 6
查看某个消费者组信息
kafka-consumer-groups --bootstrap-server xxxxxx:9092 --describe --group group_name
删除消费者组
kafka-consumer-groups --bootstrap-server xxxxxx:9092 ---delete --group group_name
重置offset
kafka-consumer-groups --bootstrap-server xxxxxx:9092 --group group_name --reset-offsets --all-topics --to-latest --execute
leader重新选举
kafka-leader-election --bootstrap-server xxxxxx:9092 --topic topic_name --election-type PREFERRED --partition 0 指定Topic指定分区用重新PREFERRED:优先副本策略 进行Leader重选举 kafka-leader-election --bootstrap-server xxxxxx:9092 --election-type preferred --all-topic-partitions 所有Topic所有分区用重新PREFERRED:优先副本策略 进行Leader重选举
查询kafka版本信息
kafka-configs --bootstrap-server xxxxxx:9092 --describe --version
topic添加/修改动态配置
kafka-configs --bootstrap-server xxxxxx:9092 --alter --entity-type topics --entity-name topic_name --add-config file.delete.delay.ms=222222,retention.ms=999999
topic删除动态配置
kafka-configs --bootstrap-server xxxxxx:9092 --alter --entity-type topics --entity-name topic_name --delete-config file.delete.delay.ms,retention.ms
备注:
1、这里对于topic增删改的配置的说明如下:
功能说明 | 参数 |
---|---|
选择类型 | --entity-type (topics/clients/users/brokers/broker- loggers) |
类型名称 | --entity-name |
删除配置 | --delete-config k1=v1,k2=v2 |
添加/修改配置 | --add-config k1,k2 |
单次最大消费10条消息(不加参数意为持续消费)
kafka-verifiable-consumer --bootstrap-server xxxxxx:9092 --group group_name --topic topic_name --max-messages 10
删除指定topic的某个分区的消息删除至offset为1024
首先需要准备一个json文件
{ "partitions": [ { "topic": "topic_name", "partition": 0, "offset": 1024 } ], "version": 1 }
然后再进行删除
kafka-delete-records --bootstrap-server xxxxxx:9092 --offset-json-file offset-json-file.json
查询指定topic的磁盘信息
kafka-log-dirs --bootstrap-server xxxxxx:9090 --describe --topic-list topic1,topic2
查询指定broker磁盘信息
kafka-log-dirs --bootstrap-server xxxxxx:9090 --describe --topic-list topic1 --broker-list 0
还没有评论,来说两句吧...