【Kafka】Docker安装kafka&java kafka api
kafka依赖于zookeeper,首先安装zookeeper
一、安装zookeeper
1 拉取镜像
docker pull bitnami/zookeeper:latest
2 创建network
在启动之前,先指定一个网络
docker network create app-tier --driver bridge
3 启动容器
启动zookeeper容器
docker run -d --name zookeeper-server --network app-tier -p 2181:2181 -e ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:latest
测试是否成功 进入zookeeper
docker exec -it zookeeper-server /bin/sh
执行代码
zkCli.sh -server 10.249.53.1
二、安装kafka
1 拉取kafka镜像
docker pull bitnami/kafka:latest
2 启动kafka容器
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_ZOOKEEPER_CONNECT=10.249.53.1:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.249.53.1:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 bitnami/kafka:latest
进入kafka
docker exec -it kafka /bin/sh
3 创建topic
-- 创建topic
./kafka-topics.sh --bootstrap-server 10.249.53.1:9092 --create --replication-factor 1 --partitions 1 --topic kfk
查看topic -- 分区topic
./kafka-topics.sh --list --bootstrap-server 10.249.53.1:9092
4 创建生产者
-- 生产者
./kafka-console-producer.sh --broker-list 10.249.53.1:9092 --topic kfk
5 创建消费者
-- 消费者
./kafka-console-consumer.sh --bootstrap-server 10.249.53.1:9092 --topic kfk --from-beginning
三、kafka的java api
1 producer
public class ProducerTest { public static void main(String[] args) throws ExecutionException, InterruptedException { Properties props = new Properties(); //指定Kafka集群的IP地址和端口号 props.put("bootstrap.servers", "10.249.53.1:9092"); //指定等待所有副本节点的应答 props.put("acks","all"); //指定消息发送最大尝试次数 props.put("retries",1); //指定一批消息处理大小 props.put("batch.size",16384); //指定请求延时 props.put("linger.ms",1); //指定缓存区内存大小 props.put("buffer.memory",33554432); //设置key序列化 props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); //设置value序列化 props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); //生产数据 KafkaProducer<String,String> producer = new KafkaProducer<String, String>(props); for (int i =0; i < 50; i++){ producer.send(new ProducerRecord<String, String> ("kfk",Integer.toString(i),"hello world-" + i)).get(); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } producer.close(); } }
2 消费者
public class ConsumerTest { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "10.249.53.1:9092"); props.put("group.id", "kfk1"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset","earliest"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); // 订阅topic consumer.subscribe(Arrays.asList("kfk")); // 消费数据 while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value= %s%n", record.offset(), record.key(), record.value()); } } }