Kafka:将消息发送指定分区

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerCallbackPartitions {

    public static void main(String[] args) throws InterruptedException {

        // 0 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.25.129:9092,192.168.25.128:9092");

        // 指定对应的key和value的序列化类型 key.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 1 创建kafka生产者对象
        // KafkaProducer<K, V> 
        // 泛型K为key一般为String类型 泛型V为传递消息的类型,此处发送字符串用String类型
        // 下列发送数据即为 “” “hello ” 
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 5; i++) {

            // 发送到1号分区
            // 或者是依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余,分别发往 1、2、0
            kafkaProducer.send(new ProducerRecord<>("first", 1, "hello" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {

                    if (exception == null){
                        System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
                    }
                }
            });

            Thread.sleep(2);
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}
import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class CustomProducerCallbackPartitions { public static void main(String[] args) throws InterruptedException { // 0 配置 Properties properties = new Properties(); // 连接集群 bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.25.129:9092,192.168.25.128:9092"); // 指定对应的key和value的序列化类型 key.serializer properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); // 1 创建kafka生产者对象 // KafkaProducer // 泛型K为key一般为String类型 泛型V为传递消息的类型,此处发送字符串用String类型 // 下列发送数据即为 “” “hello ” KafkaProducer kafkaProducer = new KafkaProducer<>(properties); // 2 发送数据 for (int i = 0; i < 5; i++) {             // 发送到1号分区             // 或者是依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余,分别发往 1、2、0 kafkaProducer.send(new ProducerRecord<>("first", 1, "hello" + i), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null){ System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition()); } } }); Thread.sleep(2); } // 3 关闭资源 kafkaProducer.close(); } }
经验分享 程序员 微信小程序 职场和发展