Example: Spark Streaming Saving Output to an HDFS Directory
Spark Streaming code:
package streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HDFSWordCount {
  def main(args: Array[String]): Unit = {
    // if (args.length < 1) {
    //   System.err.println("Usage: HdfsWordCount <directory>")
    //   System.exit(1)
    // }
    val sparkConf = new SparkConf().setAppName("HdfsWordCount") //.setMaster("local[2]")
    // Create the streaming context with a 2-second batch interval
    val scc = new StreamingContext(sparkConf, Seconds(2))
    // Read lines from the socket source, then word-count each batch
    val lines = scc.socketTextStream("master", 9999)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map((_, 1)).reduceByKey(_ + _)
    wordCounts.print()
    // Save each batch under the HDFS path prefix passed as the first argument
    wordCounts.saveAsObjectFiles(args(0))
    scc.start()
    scc.awaitTermination()
  }
}
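The flatMap → map → reduceByKey chain above can be mimicked on plain Scala collections, which is a quick way to sanity-check the word-count logic without a cluster (the object and method names here are illustrative, not part of the job):

```scala
object LocalWordCount {
  // Same transformation chain as the DStream: split lines into words,
  // pair each word with 1, then sum the counts per word.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))
      .map((_, 1))
      .groupBy(_._1)
      .map { case (w, pairs) => (w, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    // e.g. Map(a -> 2, b -> 2, c -> 1)
    println(wordCount(Seq("a a b", "b c")))
  }
}
```

The only difference in the streaming job is that `reduceByKey` runs distributed, once per 2-second batch.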
Package with Maven:
mvn clean assembly:assembly
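`mvn clean assembly:assembly` relies on the maven-assembly-plugin being configured in the project's pom.xml. A typical configuration that produces the `*-jar-with-dependencies.jar` used below looks like this (a sketch; your pom may differ):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <descriptorRefs>
      <!-- bundles the job classes plus all dependencies into one jar -->
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
  </configuration>
</plugin>
```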
After uploading the jar to the cluster, create a script run_hdfs20.sh:
cd $SPARK_HOME
./bin/spark-submit \
    --class streaming.HDFSWordCount \
    --master yarn-cluster \
    --files $HIVE_HOME/conf/hive-site.xml \
    /usr/local/src/badou_code/streaming/badou_spark_20_test-1.0-SNAPSHOT-jar-with-dependencies.jar \
    hdfs://master:9000/output/log
Run the script: sh -x run_hdfs20.sh
Start the socket source: nc -lp 9999, then type some arbitrary letters and digits. Result:
-------------------------------------------
Time: 1612670866000 ms
-------------------------------------------
(,1)
(a,4)
-------------------------------------------
Time: 1612670868000 ms
-------------------------------------------
(aa,1)
(a,4)
Check in HDFS: hadoop fs -ls /output/
drwxr-xr-x - root supergroup 0 2021-02-06 19:58 /output/log-1612670296000
drwxr-xr-x - root supergroup 0 2021-02-06 19:58 /output/log-1612670298000
drwxr-xr-x - root supergroup 0 2021-02-06 19:58 /output/log-1612670300000
drwxr-xr-x - root supergroup 0 2021-02-06 19:58 /output/log-1612670302000
drwxr-xr-x - root supergroup 0 2021-02-06 19:58 /output/log-1612670304000
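The timestamps in these directory names come from `saveAsObjectFiles(prefix)` naming each batch's output `<prefix>-<batch time in ms>`, so new directories appear every 2 seconds (the batch interval). A small sketch of the naming pattern, using the prefix and first timestamp from the listing above:

```scala
object BatchDirs {
  // saveAsObjectFiles(prefix) writes one directory per batch,
  // named "<prefix>-<batchTimeMs>"; batches here are 2000 ms apart.
  def batchDirs(prefix: String, firstBatchMs: Long, intervalMs: Long, n: Int): Seq[String] =
    (0 until n).map(i => s"$prefix-${firstBatchMs + i * intervalMs}")

  def main(args: Array[String]): Unit =
    // prints /output/log-1612670296000, /output/log-1612670298000, ...
    batchDirs("/output/log", 1612670296000L, 2000L, 5).foreach(println)
}
```

Because each batch produces a new directory, long-running jobs accumulate many small outputs; a periodic compaction job downstream is a common follow-up.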
