spark之Driver、Executor端代码划分
package sparkStream import org.apache.spark.sql.SparkSession import org.apache.spark.streaming.{ Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{ DStream, ReceiverInputDStream} object DriverAndExecutorCode { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName(this.getClass.getName) .master("local[2]") .getOrCreate() val sc = spark.sparkContext val ssc = new StreamingContext(sc, Seconds(5)) val inputDstream: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.226.88", 6666) //TODO 代码1 --> Driver端执行 1次 inputDstream.transform { rdd => { //TODO 代码2 --> Driver端执行 (每采集一次数据,执行一次。如:从redis读取黑名单可以在这里处理) rdd.map { //RDD的算子中传入函数在Driver端执行 case line => { //TODO 代码3 --> Executor端执行 (不同的Executor都会执行这段代码) } } } } inputDstream.map { case line => { //TODO 代码4 --> Executor端执行 (不同的Executor都会执行这段代码) } } //数据库连接位置选择 //1.不能在Driver端,因为连接不能序列化,传入到Executor端 //2.foreach中创建,造成资源浪费 //3.增加foreachPartition,在分区创建 inputDstream.foreachRDD { rdd => { rdd.foreachPartition { iter => { //TODO 这里创建数据库连接,如redis、mysql等 iter.foreach { x => { //TODO 写入数据库 } } //TODO 关闭连接 } } } } ssc.start() ssc.awaitTermination() } }
使用kryo序列化
上一篇:
IDEA上Java项目控制台中文乱码