-
启动kafka.
a. 要启动kafka,必须先启动zookeeper:
# 启动zookeeperzkServer.sh start# 查看zookeeper启动状态zkServer.sh status复制代码
b. 启动kafka:
./bin/kafka-server-start.sh config/server.properties复制代码
-
在kafka中创建topic:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test复制代码
创建一个分区,且分区里面分配了一个副本,的名叫 test的topic。
-
查看kafka中拥有的所有的topic主题:
./bin/kafka-topics.sh --list --zookeeper localhost:2181复制代码
localhost表示当前的机子节点创建topic,也可以是kafka集群中其它节点的ip地址
-
查看topic主题的详细信息:
./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test复制代码
如下是topic的详情信息:
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0复制代码
-
删除某个topic:
./bin/kafka-topics.sh --zookeeper 192.168.241.20:2181 --delete --topic test复制代码
如果kafka的配置文件config/server.properties中,delete.topic.enable=false,这个配置为false,那么删除topic 的命令,就只是通过zookeeper对topic进行标记为marked for deletion而已没有真正的删除,如果是true则是真正的删除。
-
创建一个Producer来对kafka进行消息发送:
./bin/kafka-console-producer.sh --broker-list master:9092 --topic test复制代码
在kafka集群列表的master:9092中为kafka集群消息队列创建一个控制台类型的消息生产者producer,消息的主题 类型是test
-
创建一个consumer消息消费者来消费kafka队列中的消息:
./bin/kafka-console-consumer.sh --zookeeper master:2181 --topic test --from-beginning复制代码
在zookeeper的master节点中创建一个kafka队列的控制台消息消费者,消费的topic主题类型是test,从topic的 partition头开始消费,即offset为0
./bin/kafka-console-consumer.sh --zookeeper master:2181 --topic test --consumer-property group.id=group_test --from-beginning复制代码
在zookeeper的master节点中创建一个kafka队列的控制台消息消费者,消费的topic主题类型是test,且有一个 消费组为group_test。
-
查看kafka队列中topic的partition的offset信息:
./bin/kafka-consumer-offset-checker.sh --zookeeper master:2181 --topic test --group group_test broker-info复制代码
通过kafka-consumer-offset-checker.sh脚本命令查看topic主题test的group_test消费分组的kafka broker 节点的offset相关信息
Group Topic Pid Offset logSize Lag Ownergroup_hyb_tmp hyb 0 15 15 0 group_hyb_tmp_master-1543144080872-eb93ebf0-0BROKER INFO2 -> 192.168.241.22:9092复制代码
以上就是查看的offset相关信息:
a. Group: 消费者的消费分组id
b. Topic: kafka消息主题
c. Pid: kafka的消息主题的partition id号
d. Offset: partition中维护的消息偏移量首地址索引号大小
e. logSize: kafka队列消息日志的总大小
f. Lag: 消息数据积压个数,如果Lag长时间存在,说明Producer生产消息到kafka队列中长时间未被消费,说明消费速度慢,数据存在积压问题。
g. BROKER INFO: kafka的broker节点信息,2表示节点的myid,后面ip为当前处理消息日志的节点
-
在zookeeper的客户端中查看offset信息:
# 启动zookeeper的客户端./bin/zkCli.sh复制代码
通过
ls /
可查看根目录都有哪些文件夹:[zk: localhost:2181(CONNECTED) 22] ls /[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, config]复制代码
offset信息在consumer中:
ls /consumer[zk: localhost:2181(CONNECTED) 23] ls /consumers[group_hyb, group_hyb_tmp]复制代码
消费者信息中是消费组
[zk: localhost:2181(CONNECTED) 24] ls /consumers/group_hyb_tmp[ids, owners, offsets][zk: localhost:2181(CONNECTED) 25] ls /consumers/group_hyb_tmp/offsets[hyb][zk: localhost:2181(CONNECTED) 26] ls /consumers/group_hyb_tmp/offsets/hyb[0][zk: localhost:2181(CONNECTED) 27] ls /consumers/group_hyb_tmp/offsets/hyb/0[]复制代码
上面可以看到,offsets消费组里面,offset里是topic,topic里面是partition,partition里就没文件夹了。 可以通过以下命令来查看具体offset信息:
get /consumers/group_hyb_tmp/offsets/hyb/0
具体的信息是:16cZxid = 0x4000000a4ctime = Sun Nov 25 19:09:00 CST 2018mZxid = 0x4000002d5mtime = Sun Nov 25 19:44:00 CST 2018pZxid = 0x4000000a4cversion = 0dataVersion = 2aclVersion = 0ephemeralOwner = 0x0dataLength = 2numChildren = 0复制代码
第一行的16,就是offset的大小信息。