Several approaches to syncing data from one Kafka cluster to another
Background: our legacy-system rewrite involves a large number of business systems, and standing up a full end-to-end test environment takes a long time. This led to the idea of syncing data between the Kafka middle-layer clusters.
Approaches:
1. Kafka's built-in mirroring tool, kafka-mirror-maker.sh
MirrorMaker is a tool bundled with Kafka for mirroring data between Kafka clusters. It consumes from the source cluster and produces to the target cluster.
Step 1: write consumer-test.properties
bootstrap.servers=172.25.10.18:19092
# consumer group id
group.id=test-consumer-group
Step 2: write producer-test.properties
bootstrap.servers=192.168.12.30:19092,192.168.12.30:29092,192.168.12.30:39092
# specify the compression codec for all data generated: none, gzip, snappy, lz4, zstd
compression.type=none
Step 3: start kafka-mirror-maker
bin/kafka-mirror-maker.sh --consumer.config config/consumer-test.properties --producer.config config/producer-test.properties --num.streams 8 --whitelist 'customer.order*'
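Once the mirror is running, a quick sanity check is to list the topics on the target cluster and confirm the whitelisted ones have appeared. A minimal sketch (the target address is taken from producer-test.properties above; on Kafka releases older than 2.2, kafka-topics.sh takes --zookeeper instead of --bootstrap-server):

bin/kafka-topics.sh --list --bootstrap-server 192.168.12.30:19092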
Note: if the broker counts of the two Kafka clusters differ, errors may occur (for example, when the source topic's replication factor exceeds the number of brokers in the target cluster).
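One way to sidestep this is to pre-create the topic on the target cluster instead of relying on auto-creation. A sketch, where the topic name, partition count, and replication factor are placeholder assumptions to adapt:

# Replication factor must not exceed the target cluster's broker count
bin/kafka-topics.sh --create \
  --bootstrap-server 192.168.12.30:19092 \
  --topic customer.order \
  --partitions 8 \
  --replication-factor 3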
2. Configuring a Flume agent
Step 1: write the configuration file
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = 172.25.10.18:19092
a1.sources.r1.kafka.topics = customer.logindata
a1.sources.r1.kafka.consumer.group.id = flume
a1.sources.r1.kafka.consumer.timeout.ms = 100

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = 192.168.12.30:19092
a1.sinks.k1.kafka.topic = customer.logindata
#a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.custom.encoding = UTF-8

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Step 2: start the agent
bin/flume-ng agent -n a1 -c conf -f conf/flume-calls-kafka.properties -Dflume.root.logger=INFO,console
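To smoke-test the pipe end to end, publish a record to the source topic and watch for it on the target cluster. A sketch assuming the addresses and topic from the config above (the test payload is made up):

# Publish a test record to the source topic...
echo 'test-login-event' | bin/kafka-console-producer.sh \
  --broker-list 172.25.10.18:19092 \
  --topic customer.logindata

# ...then confirm it shows up on the target cluster
bin/kafka-console-consumer.sh \
  --bootstrap-server 192.168.12.30:19092 \
  --topic customer.logindata \
  --from-beginning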
3. A hand-coded Flink job
Flink works much like Flume here: a source reads the data and a sink writes it out to the target environment, so the job acts as a pipe between the two clusters. The code is as follows:
// Imports for the Flink Kafka connector (flink-connector-kafka)
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

// Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);

// Source: consume from the source cluster
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "172.25.10.18:19092");
properties.setProperty("group.id", "flink");
String topic = "customer.logindata";
DataStreamSource<String> student = env.addSource(new FlinkKafkaConsumer<>(
        topic, new SimpleStringSchema(), properties)).setParallelism(1);
student.print(); // echo records to stdout for debugging

// Sink: produce to the target cluster
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", "192.168.12.30:19092");
prop.setProperty("group.id", "flink");
student.addSink(new FlinkKafkaProducer<String>(
        "customer.logindata1", new SimpleStringSchema(), prop));

env.execute("flink kafka to kafka");
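After packaging the job into a jar, it can be submitted to a Flink cluster with the flink CLI. A minimal sketch, assuming a hypothetical main class and jar name:

bin/flink run -c com.example.KafkaToKafkaJob target/kafka-to-kafka-1.0.jar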