Flink 1.9.1(Scala 2.12)连接 Kafka 2.4.0 实例
1.添加相关依赖
<!-- Flink Kafka connector; the _2.12 suffix must match the project's Scala version -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.9.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.12</artifactId>
    <version>1.9.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.9.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.9.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<!-- FIX: was kafka_2.13 — a Scala-2.13 build mixed into a Scala-2.12 project causes
     binary incompatibility; use the _2.12 artifact of the same Kafka 2.4.0 release -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.4.0</version>
</dependency>
2.创建scala类,并开发代码
package com.vincer

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

// Implicit TypeInformation conversions required by the Scala DataStream API
// (map/flatMap etc. will not compile without this wildcard import).
import org.apache.flink.api.scala._

/**
 * Minimal Flink streaming job that consumes string records from the Kafka
 * topic "test1" and prints each record to stdout.
 *
 * @Package com.vincer
 * @ClassName conkafka
 * @Author Vincer
 * @Date 2020/01/13 22:31
 * @ProjectName kafka-flink
 */
object conkafka {

  def main(args: Array[String]): Unit = {
    // Kafka consumer properties: broker address and consumer group.
    val kafkaProps = new Properties()
    kafkaProps.setProperty("bootstrap.servers", "hadoop100:9092")
    kafkaProps.setProperty("group.id", "group_test")

    // Obtain the streaming execution environment for this job.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka consumer reading topic "test1", deserializing records as strings.
    val kafkaSource =
      new FlinkKafkaConsumer[String]("test1", new SimpleStringSchema, kafkaProps)

    // Start from the latest offset instead of the committed one (disabled).
    //kafkaSource.setStartFromLatest()

    // Commit consumed offsets back to Kafka whenever a checkpoint completes.
    kafkaSource.setCommitOffsetsOnCheckpoints(true)

    // Checkpoint every 5000 ms — this also drives the offset commits above.
    env.enableCheckpointing(5000)

    // Attach the Kafka source with 3 parallel consumer subtasks and print it.
    val stream = env.addSource(kafkaSource)
    stream.setParallelism(3)
    stream.print()

    // Submit and run the job under the name "kafkawd".
    env.execute("kafkawd")
  }
}
3.启动zookeeper,kafka
(过程省略)
4.启动kafka的Client生产数据
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test1
5.运行代码
6.在kafka上生产数据,打印到IDEA的控制台
上一篇:
JS实现多线程数据分片下载
下一篇:
flink的yarn集群方式(1)