Kafka:将消息发送指定分区
import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class CustomProducerCallbackPartitions { public static void main(String[] args) throws InterruptedException { // 0 配置 Properties properties = new Properties(); // 连接集群 bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.25.129:9092,192.168.25.128:9092"); // 指定对应的key和value的序列化类型 key.serializer properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); // 1 创建kafka生产者对象 // KafkaProducer<K, V> // 泛型K为key一般为String类型 泛型V为传递消息的类型,此处发送字符串用String类型 // 下列发送数据即为 “” “hello ” KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties); // 2 发送数据 for (int i = 0; i < 5; i++) { // 发送到1号分区 // 或者是依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余,分别发往 1、2、0 kafkaProducer.send(new ProducerRecord<>("first", 1, "hello" + i), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null){ System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition()); } } }); Thread.sleep(2); } // 3 关闭资源 kafkaProducer.close(); } }import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; public class CustomProducerCallbackPartitions { public static void main(String[] args) throws InterruptedException { // 0 配置 Properties properties = new Properties(); // 连接集群 bootstrap.servers properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.25.129:9092,192.168.25.128:9092"); // 指定对应的key和value的序列化类型 key.serializer properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); // 1 创建kafka生产者对象 // KafkaProducer
下一篇:
Consul 基本命令以及集群创建