kafka简单部署

原创 2019-01-02 11:03:23 其他 阅读(275)

简介 Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。这里简单的介绍下kafka,及单机版的部署;

kafka单机版的部署:

这里使用官网下载的kafka_2.12-1.0.0.tgz包,kafka包内一般自带zookeeper,所以不用再自行下载,如需部署集群,最好单独下载zookeper部署;

1、在服务器上下载解压kafka

#创建目录 cd /opt/kafka/ mkdir kafka_2.12 #创建项目目录cd kafka mkdir kafkalogs #创建kafka消息目录,主要存放kafka消息 #下载软件 wget https://archive.apache.org/dist/kafka/1.0.0/kafka_2.12-1.0.0.tgz #解压软件 tar -zxvf kafka_2.12-1.0.0.tgz

2、进入kafka解压的目录,修改配置文件

zookeeper配置文件:config/zookeeper.properties

增加日志目录(如果是单独的zookeeper程序需要copy目录conf下的zoo_sample.cfg文件命名为zookeeper的默认配置名称zoo.cfg)

dataDir=/opt/kafka/logs/log-zkdata

kafka配置文件:config/server.properties

增加日志目录配置,增加ip访问配置

log.dirs=/opt/kafka/logs/log-kafka #外网访问配置一定要配置此项,且配置外网能访问到的地址 advertised.listeners=PLAINTEXT://your.host:9092

3、启动zookeeper

./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

4、启动kafka

./bin/kafka-server-start.sh -daemon config/server.properties #如果启动kafka报jvm内存错误 #执行之前执行 export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"

5、验证是否启动,可以使用jps来检查,也可以使用命令netstat来检查你配置的kafka端口是否启动

netstat -tunlp|egrep "9092|2181" #zookeeper默认端口2181,kafka默认端口9092,可以再配置文件自行修改

6、创建topic验证是否搭建成功

#创建Topic ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test #参数说明 --replication-factor 1 #复制因子,搭建集群时需要设置此参数为你需要复制的数量 --partitions 1 #创建1个分区 --topic #主题名称为test

7、可以使用图像工具kafka tool来监听查询kafka的一些运行消息等

1546398161266048915.png

8、kafka配置文件的参数说明

broker.id=0 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样,每台服务器的broker.id不能相同 delete.topic.enable=true #允许删除topic advertised.listeners=PLAINTEXT://192.168.13.51:19092 #设置kafka对外访问的ip及端口,注意:如果你需要外网访问kafka的话,此处的ip要使用外网能访问到的ip num.network.threads=3 #这个是borker进行网络处理的线程数 num.io.threads=8 #这个是borker进行I/O处理的线程数 log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录, #如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个 socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能 socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘 socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小 num.partitions=1 #默认的分区数,一个topic默认1个分区数 log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天 message.max.byte=5242880 #消息保存的最大值5M replica.fetch.max.bytes=5242880 #取消息的最大值5M default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务 log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件 log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除 log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能 zookeeper.connect=192.168.13.51:12181,192.168.13.52:12182,192.168.13.51:12183 #设置zookeeper的集群连接端口

总结,此为单机版部署,如需要部署集群,只需要部署多台,注意几点即可:

1、broker.id是kafka在集群中的唯一标识,集群部署时需要区别设置;
2、zookeeper.connect参数配置为集群多个端口;
3、zookeeper集群单独搭建,配置zoo.cfg配置文件,配置server.myid,并且配置的日志目录下建文件不同的myid来标识zookeeper;
4、zookeeper集群配置需要设置server数据传输端口,每台服务器设置相同;

详细的集群搭建下期分享;