2.12-2.5.1
,2.12
表示Scala的版本,2.5.1
是Kafka软件的主版本。Kafka安装依赖于Java环境,和操作系统版本没有直接关系,所以不对系统发行版进行要求。注意生产环境Kafka服务个数应该大于或等于2个,以提供基本的容错性和高可用能力。
node1
、node2
、node3
。/opt/kafka_2.12-2.5.1
/data/kafka-logs
node1
的ip为10.0.1.3
、node2
的ip为10.0.1.4
、node3
的ip为10.0.1.5
node1:2181,node2:2181,node3:2181
由于Kafka所有节点配置除advertised.listeners
配置项外均相同,所以先在集群中1个节点进行配置,然后将安装文件同步至其他节点后再单独修改这1个配置项即可,因此先在node1
上进行配置,首先解压安装包生成安装目录:
# 在node1上执行
tar -xvzf kafka_2.12-2.5.1.tar.gz -C /opt
cd /opt/kafka_2.12-2.5.1
这样就将Kafka安装到既定的目录:/opt/kafka_2.12-2.5.1
下面。
然后在node1
上编辑配置文件,相对Kafka安装目录为:config/server.properties
,重点对以下配置项修改其余未提及项均默认:
# broker.id=0
broker.id.generation.enable=true
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://{{ broker_ip }}:9092
log.dirs=/data/kafka-logs
num.partitions=3
default.replication.factor=2
num.replica.fetchers=4
offsets.topic.replication.factor=3
log.retention.hours=168
log.segment.bytes=1073741824
zookeeper.connect={{ zookeeper_hosts }}/kafka
broker.id: 正常情况下每个Kafka节点需要配置唯一的broker.id
,就和ZooKeeper的myid类似可以确定当前Kafka在整个Broker中的标识,默认是从0开始自增,比如这里有3个节点,应该分别配置为0,1,2,为了简化配置将broker.id
注释掉,由ZooKeeper自动生成。
broker.id.generation.enable: 这个配置项表示是否开启自动生成broker.id
,默认是true
,但是如果broker.id
配置就以配置的为准,所以我们需要将broker.id
配置项注释掉,这个配置项主要是强调用来自动生成broker.id
,防止被失误修改。那么自动生成的broker.id
是按照配置项reserved.broker.max.id
为基础来递增的,这个配置值默认是1000,所以自动生成的broker.id
是从1001开始的,比如我们这里3个节点就分别是1001,1002,1003这样,之所以这样是为了避免集群中有的配置了broker.id
有的配置了自动生成,从而引起的冲突,正常需要避免这种配置方式。
listeners: 配置Kafka服务监听的地址和端口,默认情况下地址不配置会返回当前的主机名,端口号默认是9092
advertised.listeners: 这个配置默认是注释的,含义是告诉生产者或者消费者主机和端口,如果不设置会使用listeners
配置的值,如果listeners
没有设置同样会自动返回当前的主机名,这里强烈建议配置上实际对外访问的ip,之所以这么写是因为默认情况下会返回主机名,如果此时消费者端没有配置主机就会导致消息无法发送出去,advertised.listeners
和listeners
正常可以配置不同的值,从而适应更多的网络场景。
log.dirs: 配置Kafka使用的数据目录,可以支持配置多个目录并按照逗号,
分割,这样Kafka会将数据均匀分布在多个目录中,从而实现软件级别的容错,这种情况下不需要做RAID也可以。
num.partitions: 配置自动创建的topic分区个数,如果是手动创建分区此配置不生效,例如消费者或生产者操作topic时,如果topic不存在,Kafka Broker会自动创建,这个时候就会使用该配置中的值,建议这个值和我们要部署的节点个数一致,以保证吞吐,如果保持默认值1,数据都会在1个节点上而且吞吐上不去,就会出现问题。
default.replication.factor: 默认创建topic的副本个数,建议除单机部署之外至少配置为2,这样即使任何1个节点故障,也可以正常消费数据,保证数据容错以及服务高可用,这个和上面的num.partitions
一样,是自动创建topic时使用的设置。
num.replica.fetchers: 在Kafka同一个分区的多个副本之间,也存在leader和follower的关系,所有的follower都会从leader实时拷贝消息,消息fetcher的线程数由当前参数控制,默认为1,建议在生产环境中调大从而提升复制的吞吐,当前修改为4
offsets.topic.replication.factor: 正常Kafka消费的偏移量等元数据是保存在Kafka的topic中,偏移状态存储在__consumer_offsets
中,事务状态存储在__transaction_state
,这些元数据占用的空间不大,当前参数用来指定元数据topic的复制因子,对于消费者启动时会先读取元数据中的消费者组的偏移状态,如果此时存在节点故障,无法读取到完整的元数据,那么即使数据可用,也是无法消费的,所以这个值在单机情况下设置为1,如果是两个节点要设置为2,如果是3个节点以上至少要配置为3,从而确保较高的可用性。当前部署了3个节点,所以设置为3
log.retention.hours:数据保留的时间,单位是小时,默认是168小时,也就是7天,超过7天的消息会被删除。
log.segment.bytes: 日志分段的单元大小,超过该大小后会重新写入新的文件,默认为1G。
zookeeper.connect: 配置ZooKeeper的主机列表以及Kafka配置存放的znode,不建议直接使用/
,方便区分不同的组件。
在node1
上,将上面配置中的{{ broker_ip }}
替换为当前节点的实际ip,将{{ zookeeper_hosts }}
替换为实际的ZooKeeper集群,就得到下面的配置:
# broker.id=0
broker.id.generation.enable=true
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://10.0.1.3:9092
log.dirs=/data/kafka-logs
num.partitions=3
default.replication.factor=2
num.replica.fetchers=4
offsets.topic.replication.factor=3
log.retention.hours=168
log.segment.bytes=1073741824
zookeeper.connect=node1:2181,node2:2181,node3:2181/kafka
确认配置无误后,保存文件。
然后继续在node1
上编辑服务文件config/kafka.service
,默认如下:
[Unit]
Description=kafka
After=network.target local-fs.target
[Service]
User=root
Group=root
Type=forking
ExecStart={{ kafka_home }}/bin/kafka-server-start.sh -daemon {{ kafka_home }}/config/server.properties
ExecStop={{ kafka_home }}/bin/kafka-server-stop.sh
TimeoutSec=90s
RestartSec=10s
Restart=always
SuccessExitStatus=143
LimitNOFILE=infinity
LimitNPROC=infinity
LimitCORE=infinity
[Install]
WantedBy=multi-user.target
编写服务文件是为了使用systemd管理Kafka进程,方便服务的启停和运行,手动部署的方式需要将其中的{{ kafka_home}}
模板替换为实际的安装目录/opt/kafka_2.12-2.5.1
,如下:
[Unit]
Description=kafka
After=network.target local-fs.target
[Service]
User=root
Group=root
Type=forking
ExecStart=/opt/kafka_2.12-2.5.1/bin/kafka-server-start.sh -daemon /opt/kafka_2.12-2.5.1/config/server.properties
ExecStop=/opt/kafka_2.12-2.5.1/bin/kafka-server-stop.sh
TimeoutSec=90s
RestartSec=10s
Restart=always
SuccessExitStatus=143
LimitNOFILE=infinity
LimitNPROC=infinity
LimitCORE=infinity
[Install]
WantedBy=multi-user.target
然后保存服务文件。
注意服务文件中使用[Service]
下的LimitNOFILE
限制Kafka所使用的最大文件数,默认按照基础环境配置中的方法配置的最大文件数只能是用户登录后启动程序使用,不会影响systemd,所以在systemd中如果要调整此参数,还需要单独配置才可以。
另外Kafka服务的堆内存默认是1G,当前已经调整至4G,可以在启动脚本bin/kafka-server-start.sh
中调整,建议任何情况下都不要超过6G,Kafka主要使用的是系统的页面缓存,而不是堆内存。
现在相关的配置已经在node1
节点上完成配置,需要将Kafka安装目录同步至其他所有节点:
# 在node1上执行 注意前面的路径/opt/kafka_2.12-2.5.1后面一定不要加'/'
rsync -av /opt/kafka_2.12-2.5.1 node2:/opt
rsync -av /opt/kafka_2.12-2.5.1 node3:/opt
然后分别在node2
和node3
上修改配置文件/opt/kafka_2.12-2.5.1/config/server.properties
中的advertised.listeners
配置项,改为对应节点实际的ip:
# node2上面修改为
advertised.listeners=PLAINTEXT://10.0.1.4:9092
# node3上面修改为
advertised.listeners=PLAINTEXT://10.0.1.5:9092
最后保存配置。
Kafka在启动时如果数据目录不存在会自动创建,所以手动创建数据目录是可选的:
mkdir -p /data/kafka-logs
然后在所有的节点安装服务文件到系统中:
# node1,node2,node3 都需要执行
cp /opt/kafka_2.12-2.5.1/config/kafka.service /usr/lib/systemd/system
为了方便执行命令还可以将Kafka的bin目录添加至环境变量,这一步也是可选的:
export PATH=$PATH:/opt/kafka_2.12-2.5.1/bin
在所有节点上启动Kafka服务:
# 所有节点都需要执行
systemctl start kafka.service
# 查看运行状态
systemctl status kafka.service
启动之后可以确认进程是否存在:
# 查看是否存在Kafka进程
jps
查看Kafka服务日志确认是否正常启动:/opt/kafka_2.12-2.5.1/logs/server.log
然后可以测试下systemd自动拉起进程:找到任意1个节点,手动杀掉Kafka
进程,稍等几秒看是否能正常被systemd再次重启,可以通过查看kafka.service
的系统日志确定拉起的情况:
journalctl -u kafka.service -n 10
最后,如果需要开机启动Kafka服务可以执行:
# 设置开机启动服务
systemctl enable kafka.service