// Spark Structured Streaming: read messages from Kafka and deserialize them with Jackson
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
/**
 * Consumes records from Kafka and deserializes them using Jackson.
 */
object KafkaToPostgresql {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder
.appName(“kafka to postgresql”)
.master(“local[2]”)
.getOrCreate()
import spark.implicits._
val df = spark
.readStream
.format(“kafka”)
.option(“kafka.bootstrap.servers”, “xx.xx.xx.xx:9092”)
.option(“subscribe”, “topic-name”)
.option(“startingOffsets”,“latest”)
.load()
// Print the schema of the Kafka source DataFrame (key, value, topic, partition, offset, ...)
df.printSchema()
val rowDataset = df.selectExpr(“CAST(value AS STRING)”)
rowDataset.writeStream.foreach(new ForeachWriter{
override def process(record: Row): Unit = {
val mapper = new ObjectMapper()
// Important: without DefaultScalaModule, Jackson cannot deserialize into Scala types (case classes, Option, collections)
mapper.registerModule(DefaultScalaModule)