Spark Streaming: Saving Output to an HDFS Directory (Example)
Spark Streaming code:
package streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HDFSWordCount {
  def main(args: Array[String]): Unit = {
    // if (args.length < 1) {
    //   System.err.println("Usage: HdfsWordCount <directory>")
    //   System.exit(1)
    // }
    val sparkConf = new SparkConf().setAppName("HdfsWordCount") //.setMaster("local[2]")

    // Create the streaming context with a 2-second batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Read lines from the socket source and count words per batch
    val lines = ssc.socketTextStream("master", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map((_, 1)).reduceByKey(_ + _)

    wordCounts.print()
    // Save each batch under the HDFS path prefix passed as args(0)
    wordCounts.saveAsObjectFiles(args(0))

    ssc.start()
    ssc.awaitTermination()
  }
}
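Note that saveAsObjectFiles serializes each batch into SequenceFiles of Java-serialized objects, which are not human-readable. If readable output is preferred, the DStream API also provides saveAsTextFiles; a minimal sketch, assuming the same output prefix:

// Sketch: write each batch as plain text instead of serialized objects.
// Output directories are named <prefix>-<batchTimeMs>.txt
wordCounts.saveAsTextFiles("hdfs://master:9000/output/log", "txt")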
Package with Maven:
mvn clean assembly:assembly
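For assembly:assembly to produce the *-jar-with-dependencies.jar used below, the project's pom.xml needs the maven-assembly-plugin. A typical configuration (assumed here, since the pom is not shown) is:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
  </configuration>
</plugin>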
After uploading the jar to the cluster, create a script run_hdfs20.sh:
cd $SPARK_HOME
./bin/spark-submit \
  --class streaming.HDFSWordCount \
  --master yarn-cluster \
  --files $HIVE_HOME/conf/hive-site.xml \
  /usr/local/src/badou_code/streaming/badou_spark_20_test-1.0-SNAPSHOT-jar-with-dependencies.jar \
  hdfs://master:9000/output/log
Run the script: sh -x run_hdfs20.sh
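For a quick local test without YARN, one could instead uncomment .setMaster("local[2]") in the code or pass the master on the command line. This variant is an assumption, not part of the original setup:

./bin/spark-submit \
  --class streaming.HDFSWordCount \
  --master "local[2]" \
  /usr/local/src/badou_code/streaming/badou_spark_20_test-1.0-SNAPSHOT-jar-with-dependencies.jar \
  hdfs://master:9000/output/log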
Start the socket source with: nc -lp 9999, then type some arbitrary letters and digits. Each printed batch covers the 2-second interval configured by Seconds(2); the (,1) entry below comes from empty tokens produced when splitting on spaces. Result:
-------------------------------------------
Time: 1612670866000 ms
-------------------------------------------
(,1)
(a,4)

-------------------------------------------
Time: 1612670868000 ms
-------------------------------------------
(aa,1)
(a,4)
Check the results in HDFS: hadoop fs -ls /output/. saveAsObjectFiles creates one output directory per batch, named with the path prefix followed by the batch timestamp in milliseconds:
drwxr-xr-x - root supergroup 0 2021-02-06 19:58 /output/log-1612670296000
drwxr-xr-x - root supergroup 0 2021-02-06 19:58 /output/log-1612670298000
drwxr-xr-x - root supergroup 0 2021-02-06 19:58 /output/log-1612670300000
drwxr-xr-x - root supergroup 0 2021-02-06 19:58 /output/log-1612670302000
drwxr-xr-x - root supergroup 0 2021-02-06 19:58 /output/log-1612670304000
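Since the data was written with saveAsObjectFiles, it can be read back in a regular batch job via SparkContext.objectFile. A minimal sketch, assuming the same HDFS path (ReadWordCounts is a hypothetical helper, not part of the original project):

package streaming

import org.apache.spark.{SparkConf, SparkContext}

object ReadWordCounts {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ReadWordCounts"))
    // Glob over the per-batch directories written by saveAsObjectFiles
    val counts = sc.objectFile[(String, Int)]("hdfs://master:9000/output/log-*")
    counts.collect().foreach(println)
    sc.stop()
  }
}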