kafka入门,生产者自定义分区(六)
1、实现Partitioner接口
package com.longer.producer; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map; /** * 实现接口Partitioner * 实现3个方法:partition,close,configure * 编写partition方法,返回分区号 */ public class MyPartitioner implements Partitioner { /** 返回信息对应的分区 * @param topic 主题 * @param key 消息的 key * @param keyBytes 消息的 key 序列化后的字节数组 * @param value 消息的 value * @param valueBytes 消息的 value 序列化后的字节数组 * @param cluster 集群元数据可以查看分区信息 * @return */ @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { //获取消息 String msgValue = value.toString(); //创建partition int partition; //判断消息求模 return Integer.valueOf(msgValue) % 3; } //关闭资源 @Override public void close() { } //p配置方法 @Override public void configure(Map<String, ?> map) { } }
2、使用自定义分区器
主要代码properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.longer.producer.MyPartitioner");
package com.longer.producer; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Map; import java.util.Properties; public class CustomProducerCallbackPartitions3 { public static void main(String[] args) { //1、创建kafka生产者得配置对象 Properties properties=new Properties(); //2、给kafka配置对象添加配置信息 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092"); //3、key value 序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); //添加自定义分区器 properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.longer.producer.MyPartitioner"); //4、创建kafka生产者对象 KafkaProducer<String,String> producer=new KafkaProducer<String, String>(properties); for (int i = 0; i < 5; i++) { //指定数据发送到1号分区,key为空(IDEA中,ctrl+p查看参数) producer.send(new ProducerRecord<>("first", String.valueOf(i)), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception e) { if(e==null){ System.out.println(String.format("主题:%s,分区:%s",metadata.topic(),metadata.partition())); return; } e.printStackTrace(); } }); } //关闭资源 producer.close(); } }
效果
下一篇:
Spring Bean的配置和使用