热升级,在不停止集群的情况下完成升级
将{{ }}按括号中语义替换成相应的内容。
topic情况
{{ kafka_old_dir }}/bin/kafka-topics.sh --list --zookeeper {{ host }}:{{ zookeeper_port }}
group情况
{{ kafka_old_dir }}/bin/kafka-consumer-groups.sh --bootstrap-server {{ host }}:{{ kafka_port }} --list
各topic信息
{{ kafka_old_dir }}/bin/kafka-topics.sh --describe --zookeeper {{ host }}:{{ zookeeper_port }} --topic {{ topic_name }}
两个consumer group对应offset信息
{{ kafka_old_dir }}/bin/kafka-consumer-groups.sh --bootstrap-server {{ host }}:{{ kafka_port }} --describe --group {{ group_name }}
zoo.cfg、myid文件可以直接按旧版本配置,但需将zoo.cfg中dataDir和dataLogDir修改为新目录(不能与旧版本目录重复)
从某台follower节点开始,逐台进行以下升级操作,最后升级leader:
1、关闭旧zookeeper服务
{{ zookeeper_old_dir }}/bin/zkServer.sh stop
2、启动新zookeeper服务
{{ zookeeper_new_dir }}/bin/zkServer.sh start
3、查看新起zookeeper节点是否接入旧集群
{{ zookeeper_new_dir }}/bin/zkServer.sh status
4、查看数据同步情况,使用zkCli查看是否有brokers、consumers等kafka相关元数据文件
{{ zookeeper_new_dir }}/bin/zkCli.sh
ls /
5、若数据完好,切换为下一台节点,重复操作直至全部切换完成。
zookeeper全部升级完成后,进行Kafka升级,依旧使用逐台升级的方式。首先将新版本kafka部署完成,配置沿袭旧版本,但不要启动。而后进行以下操作:
1、停止某台节点的旧版本kafka
{{ kafka_old_dir }}/bin/kafka-server-stop.sh
2、更改新版本kafka配置文件,使protocol version为旧版本(如1.1.1)
在配置文件{{ kafka_new_dir }}/config/server.properties中加入
inter.broker.protocol.version={{ old.version }}
3、启动新版本kafka,验证不报错且查询数据、生产消费无误
{{ kafka_new_dir }}/bin/kafka-server-start.sh {{ kafka_new_dir }}/config/server.properties
查看是否报错,若无报错,改为后台启动
{{ kafka_new_dir }}/bin/kafka-server-start.sh -daemon {{ kafka_new_dir }}/config/server.properties
使用第1章中的命令进行数据验证
topic情况
group情况
各topic信息
两个consumer group对应offset信息
进行试生产、试消费
创建测试test topic
{{ kafka_new_dir }}/bin/kafka-topics.sh --create --topic test --replication-factor 3 --partitions 3 --zookeeper {{ host }}:{{ zookeeper_port }}
启动两个窗口分别启动生产者和消费者进行生产消费测试,查看生产消费是否正常,先启动consumer再启动producer
{{ kafka_new_dir }}/bin/kafka-console-producer.sh --bootstrap-server {{ host }}:{{ kafka_port }} --topic test
{{ kafka_new_dir }}/bin/kafka-console-consumer.sh --bootstrap-server {{ host }}:{{ kafka_port }} --topic test
4、重复上述操作,依次将每台节点的kafka切换为新版本,完成一轮重启。
5、依次每台节点进行操作:将protocol version改为新版本(如2.5.1)
配置{{ kafka_new_dir }}/config/server.properties
inter.broker.protocol.version={{ new.version }}
然后重启Kafka,完成二轮重启。
6、验证数据无误
依然使用第一章的命令,验证数据无误