// Spark Structured Streaming: read messages from Kafka and deserialize them

import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper

import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

/**
 * Deserializes the records received from Kafka using Jackson.
 */

object KafkaToPostgresql {

def main(args: Array[String]): Unit = {

val spark: SparkSession = SparkSession.builder

.appName("kafka to postgresql")

.master("local[2]")

.getOrCreate()

import spark.implicits._

val df = spark

.readStream

.format("kafka")

.option("kafka.bootstrap.servers", "xx.xx.xx.xx:9092")

.option("subscribe", "topic-name")

.option("startingOffsets", "latest")

.load()

// Print the schema of the Kafka source (key, value, topic, partition, offset, ...)

df.printSchema()

val rowDataset = df.selectExpr("CAST(value AS STRING)")

rowDataset.writeStream.foreach(new ForeachWriter{

override def process(record: Row): Unit = {

val mapper = new ObjectMapper()

// Important: register DefaultScalaModule so Jackson can deserialize Scala types

mapper.registerModule(DefaultScalaModule)

// NOTE(review): source is truncated here — the rest of the ForeachWriter
// (open/close, the deserialization target type) and the streaming-query
// start/awaitTermination calls are missing from this excerpt.