A Spark Streaming example: saving results to an HDFS directory

Spark Streaming code:

package streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HDFSWordCount {

  def main(args: Array[String]): Unit = {
    // The HDFS output path is required as the first argument
    if (args.length < 1) {
      System.err.println("Usage: HDFSWordCount <directory>")
      System.exit(1)
    }

    val sparkConf = new SparkConf().setAppName("HdfsWordCount")//.setMaster("local[2]")
    // Create the streaming context with a 2-second batch interval
    val scc = new StreamingContext(sparkConf, Seconds(2))

    // Read lines from a socket source, then run the classic word count
    val lines = scc.socketTextStream("master", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
    wordCounts.print()
    // Persist each batch under the HDFS path prefix passed as args(0)
    wordCounts.saveAsObjectFiles(args(0))

    scc.start()
    scc.awaitTermination()
  }
}
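The transformation chain above is the standard word count. Its per-batch logic can be reproduced with plain Scala collections (no Spark needed), which is a handy way to sanity-check the pipeline; `WordCountLocal` and `countWords` here are illustrative names, not part of the original code:

```scala
object WordCountLocal {
  // Plain-collections equivalent of flatMap -> map -> reduceByKey
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))  // split each line into words
      .map((_, 1))            // pair each word with a count of 1
      .groupBy(_._1)          // group pairs by word (reduceByKey analogue)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    // counts: a -> 2, b -> 2, c -> 1
    println(countWords(Seq("a a b", "b c")))
  }
}
```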

Package the project with Maven:

mvn clean assembly:assembly
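The `assembly:assembly` goal presumes the project's pom.xml configures the maven-assembly-plugin to build a fat jar; a minimal sketch of that configuration (an assumption, since the original pom is not shown) looks like:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <!-- Bundle all dependencies into one jar, producing the
         *-jar-with-dependencies.jar used by spark-submit below -->
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
  </configuration>
</plugin>
```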

After uploading the jar to the cluster, create a script run_hdfs20.sh:

cd $SPARK_HOME
./bin/spark-submit \
        --class streaming.HDFSWordCount \
        --master yarn-cluster \
        --files $HIVE_HOME/conf/hive-site.xml \
        /usr/local/src/badou_code/streaming/badou_spark_20_test-1.0-SNAPSHOT-jar-with-dependencies.jar \
        hdfs://master:9000/output/log

Run the script with sh -x run_hdfs20.sh (the -x flag echoes each command as it executes, which helps with debugging).

Start a listener on port 9999 with nc -lp 9999, type some arbitrary digits and letters, and the job prints results like:

-------------------------------------------
Time: 1612670866000 ms
-------------------------------------------
(,1)
(a,4)

-------------------------------------------
Time: 1612670868000 ms
-------------------------------------------
(aa,1)
(a,4)
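Note the `(,1)` entry in the first batch: `split(" ")` keeps leading empty tokens, so a line that starts with a space produces an empty-string "word". A small sketch (the `SplitDemo` object is hypothetical, for illustration only) shows the cause and one way to filter it out:

```scala
object SplitDemo {
  def main(args: Array[String]): Unit = {
    // A leading space yields an empty token, hence "(,1)" in the output
    println(" a a".split(" ").toList)                    // List(, a, a)
    // Dropping empty tokens avoids counting them
    println(" a a".split(" ").filter(_.nonEmpty).toList) // List(a, a)
  }
}
```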

Check the output in HDFS: hadoop fs -ls /output/

drwxr-xr-x   - root supergroup          0 2021-02-06 19:58 /output/log-1612670296000
drwxr-xr-x   - root supergroup          0 2021-02-06 19:58 /output/log-1612670298000
drwxr-xr-x   - root supergroup          0 2021-02-06 19:58 /output/log-1612670300000
drwxr-xr-x   - root supergroup          0 2021-02-06 19:58 /output/log-1612670302000
drwxr-xr-x   - root supergroup          0 2021-02-06 19:58 /output/log-1612670304000
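Each directory name ends with the batch time in epoch milliseconds, and neighbouring suffixes differ by exactly 2000 ms, matching the `Seconds(2)` batch interval. A small sketch (the `BatchTime` object is hypothetical) decodes one suffix; the printed instant is in UTC, so it may differ from the local time shown by `hadoop fs -ls`:

```scala
import java.time.Instant

object BatchTime {
  def main(args: Array[String]): Unit = {
    // Directory "log-1612670296000" -> batch time in epoch milliseconds
    println(Instant.ofEpochMilli(1612670296000L))  // 2021-02-07T03:58:16Z
    // Neighbouring batches are exactly one batch interval apart
    println(1612670298000L - 1612670296000L)       // 2000
  }
}
```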