消息队列——kafka——搭建单机(linux)

1、配置jdk环境:

下载1.8及以上版本的jdk,配置好jdk环境变量。

2、安装zookeeper:

(1)软件下载:

下载地址:

(2)解压软件包:

将下载的软件包放到安装路径下进行解压。

(3)配置文件重命名:

将解压后的文件夹中config内zoo_sample.cfg重命名成zoo.cfg

(4)修改配置参数:

配置文件为解压路径下config/zoo.cfg

A、数据存储路径:

参数:dataDir,配置为zookeeper数据存储路径。

B、服务端口号:

参数:clientPort,值为连接zookeeper服务的端口号。

(5)配置环境变量:

配置环境变量ZOOKEEPER_HOME,值为zookeeper软件解压路径。然后在path中添加%ZOOKEEPER_HOME%/bin;

(6)启动zookeeper:

执行命令:./zkServer.sh start;

(7)停止zookeeper:

执行命令:./zkServer.sh stop;

(8)查看zookeeper:

执行命令:./zkServer.sh status;

3、安装kafka:

(1)软件下载:

下载地址:

(2)解压软件包:

将下载的软件包放置到安装路径下并进行解压。

(3)修改配置参数:

配置文件为解压路径下config/server.properties

A、唯一标识符:

参数:broker.id,值为一个integer类型数字,单机默认不修改即可,如果是集群此参数保证唯一即可。

B、监听地址:

参数:listeners,值为kafka访问地址。

例如:listeners=PLAINTEXT://192.168.1.140:9092(修改IP和端口号)

C、日志存储路径:

参数:log.dirs,值为日志存储路径。

D、Zookeeper访问地址:

参数:zookeeper.connect,值为zookeeper访问地址。如果zookeeper为集群那么集群各节点访问地址中间用英文逗号隔开。

(4)启动kafka:

A、界面启动:

进入kafka安装路径下bin中,执行命令:

./kafka-server-start.sh ../config/server.properties

B、后台启动:

进入kafka安装路径下bin中,执行命令:

./kafka-server-start.sh ../config/server.properties &

(5)停止kafka:

进入kafka安装路径下bin中,执行命令:

./kafka-server-stop.sh

4、测试kafka:

(1)创建一个topic:

进入kafka安装路径下bin中,执行命令:

./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

非常注意:Kafka 从 2.2 版本开始将 kafka-topic.sh 脚本中的 −−zookeeper 参数标注为 “过时”,推荐使用 −−bootstrap-server 参数。若读者依旧使用的是 2.1 及以下版本,请将下述的 --bootstrap-server 参数及其值手动替换为 --zookeeper zk1:2181,zk2:2181,zk:2181。一定要注意两者参数值所指向的集群地址是不同的。−−bootstrap-server参数指向kafka单机或集群,而--zookeeper参数指向zookeeper单机或集群。

(2)打开一个producer:

进入kafka安装路径下bin中,执行命令:

./kafka-console-producer.sh --broker-list localhost:9092 --topic test

(3)打开一个consumer:

进入kafka安装路径下bin中,执行命令:

./kafka-console-consumer.sh--bootstrap-server localhost:9092 --topic test --from-beginning

(4)测试kafka消息:

然后在producer窗口写内容,就会在该topic中产生一条消息,在consumer窗口中看到该topic中新增加的消息,就算是消费了该topic中的一条消息。

经验分享 程序员 微信小程序 职场和发展