Kafka滚动升级

1.前提准备

  • 安装前确保集群已经完成基础环境配置
  • 旧版本为:zookeeper3.4.14 kafka1.1.1,新版本为:zookeeper3.5.19 kafka2.5.1

2.升级方式

热升级,在不停止集群的情况下完成升级

3.升级流程

将{{ }}按括号中语义替换成相应的内容。

1.保存升级前状态,记录topic、offset等信息以作为对照,Kafka停止生产消费,但无需停止集群。

topic情况

{{ kafka_old_dir }}/bin/kafka-topics.sh --list --zookeeper {{ host }}:{{ zookeeper_port }}

1

group情况

{{ kafka_old_dir }}/bin/kafka-consumer-groups.sh --bootstrap-server {{ host }}:{{ kafka_port }} --list

2

各topic信息

{{ kafka_old_dir }}/bin/kafka-topics.sh --describe --zookeeper {{ host }}:{{ zookeeper_port }} --topic {{ topic_name }}

5

6

7

两个consumer group对应offset信息

{{ kafka_old_dir }}/bin/kafka-consumer-groups.sh --bootstrap-server {{ host }}:{{ kafka_port }} --describe --group {{ group_name }}

8

9

2.部署zookeeper新集群,更改生效配置,但不要启动

zoo.cfg、myid文件可以直接按旧版本配置,但需将zoo.cfg中dataDirdataLogDir修改为新目录(不能与旧版本目录重复)

20

3.逐台进行zookeeper升级

从某台follower节点开始,逐台进行以下升级操作,最后升级leader:

1、关闭旧zookeeper服务

{{ zookeeper_old_dir }}/bin/zkServer.sh stop

2、启动新zookeeper服务

{{ zookeeper_new_dir }}/bin/zkServer.sh start

3、查看新起zookeeper节点是否接入旧集群

{{ zookeeper_new_dir }}/bin/zkServer.sh status

21

4、查看数据同步情况,使用zkCli查看是否有brokers、consumers等kafka相关元数据文件

{{ zookeeper_new_dir }}/bin/zkCli.sh
ls /

22

5、若数据完好,切换为下一台节点,重复操作直至全部切换完成。

4.逐台进行Kafka升级

zookeeper全部升级完成后,进行Kafka升级,依旧使用逐台升级的方式。首先将新版本kafka部署完成,配置沿袭旧版本,但不要启动。而后进行以下操作:

1、停止某台节点的旧版本kafka

{{ kafka_old_dir }}/bin/kafka-server-stop.sh

2、更改新版本kafka配置文件,使protocol version为旧版本(如1.1.1)

在配置文件{{ kafka_new_dir }}/config/server.properties中加入

inter.broker.protocol.version={{ old.version }}

23

3、启动新版本kafka,验证不报错且查询数据、生产消费无误

{{ kafka_new_dir }}/bin/kafka-server-start.sh {{ kafka_new_dir }}/config/server.properties

查看是否报错,若无报错,改为后台启动

{{ kafka_new_dir }}/bin/kafka-server-start.sh -daemon {{ kafka_new_dir }}/config/server.properties

使用第1章中的命令进行数据验证

topic情况

12

group情况

13

各topic信息

16

两个consumer group对应offset信息

14

15

进行试生产、试消费

创建测试test topic

{{ kafka_new_dir }}/bin/kafka-topics.sh --create --topic test --replication-factor 3 --partitions 3 --zookeeper {{ host }}:{{ zookeeper_port }}

启动两个窗口分别启动生产者和消费者进行生产消费测试,查看生产消费是否正常,先启动consumer再启动producer

{{ kafka_new_dir }}/bin/kafka-console-producer.sh --bootstrap-server {{ host }}:{{ kafka_port }} --topic test

24

{{ kafka_new_dir }}/bin/kafka-console-consumer.sh --bootstrap-server {{ host }}:{{ kafka_port }} --topic test

25

4、重复上述操作,依次将每台节点的kafka切换为新版本,完成一轮重启。

5、依次每台节点进行操作:将protocol version改为新版本(如2.5.1)

配置{{ kafka_new_dir }}/config/server.properties

inter.broker.protocol.version={{ new.version }}

26

然后重启Kafka,完成二轮重启。

6、验证数据无误

依然使用第一章的命令,验证数据无误

5.验证数据与升级前无偏差后,完成升级