【Kafka】Docker安装kafka&java kafka api

docker依赖于zookeeper,首先安装zookeeper

一、安装zookeeper

1 拉取镜像

2 创建network

在启动之前,先指定一个网络

docker network create app-tier --driver bridge

3 启动容器

启动zookeeper容器

docker run -d --name zookeeper-server 
--network app-tier 
-p 2181:2181 
-e ALLOW_ANONYMOUS_LOGIN=yes 
bitnami/zookeeper:latest

测试是否成功 进入zookeeper

docker exec -it zookeeper-server /bin/sh

执行代码

zkCli.sh -server 10.249.53.1

二、安装kafka

1 拉取kafka镜像

2 启动kafka容器

docker run -d --name kafka 
-p 9092:9092 
-e KAFKA_BROKER_ID=0 
-e ALLOW_PLAINTEXT_LISTENER=yes 
-e KAFKA_ZOOKEEPER_CONNECT=10.249.53.1:2181 
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.249.53.1:9092 
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 
    bitnami/kafka:latest

进入kafka

docker exec -it kafka /bin/sh

3 创建topic

-- 创建topic

./kafka-topics.sh --bootstrap-server 10.249.53.1:9092 --create  --replication-factor 1 --partitions 1 --topic kfk

查看topic -- 分区topic

./kafka-topics.sh --list --bootstrap-server 10.249.53.1:9092

4 创建生产者

-- 生产者

./kafka-console-producer.sh --broker-list 10.249.53.1:9092 --topic kfk

5 创建消费者

-- 消费者

./kafka-console-consumer.sh --bootstrap-server 10.249.53.1:9092 --topic kfk --from-beginning

三、kafka的java api

1 producer

public class ProducerTest {
          
   
    public static void main(String[] args) throws ExecutionException, InterruptedException {
          
   

        Properties props = new Properties();

        //指定Kafka集群的IP地址和端口号
        props.put("bootstrap.servers",
                "10.249.53.1:9092");

        //指定等待所有副本节点的应答
        props.put("acks","all");

        //指定消息发送最大尝试次数
        props.put("retries",1);

        //指定一批消息处理大小
        props.put("batch.size",16384);

        //指定请求延时
        props.put("linger.ms",1);

        //指定缓存区内存大小
        props.put("buffer.memory",33554432);

        //设置key序列化
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");

        //设置value序列化
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        //生产数据
        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(props);

        for (int i =0; i < 50; i++){
          
   
            producer.send(new ProducerRecord<String, String>
                    ("kfk",Integer.toString(i),"hello world-" + i)).get();
            try {
          
   
                Thread.sleep(500);
            } catch (InterruptedException e) {
          
   
                e.printStackTrace();
            }
        }

        producer.close();

    }
}

2 消费者

public class ConsumerTest {
          
   
    public static void main(String[] args) {
          
   

        Properties props = new Properties();

        props.put("bootstrap.servers", "10.249.53.1:9092");

        props.put("group.id", "kfk1");

        props.put("enable.auto.commit", "true");

        props.put("auto.commit.interval.ms", "1000");

        props.put("auto.offset.reset","earliest");

        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String,  String> consumer  =  new
                KafkaConsumer<String,  String>(props);

        // 订阅topic
        consumer.subscribe(Arrays.asList("kfk"));



        // 消费数据
        while (true) {
          
   

            ConsumerRecords<String,  String> records  =
                    consumer.poll(100);

            for (ConsumerRecord<String, String> record : records)

                System.out.printf("offset = %d, key = %s, value= %s%n", record.offset(), record.key(), record.value());
        }
    }
}
经验分享 程序员 微信小程序 职场和发展