SparkStreaming实时单词统计WordCont,netcat-win32工具使用
工具导入
这里我们需要用到一个工具,netcat-win32用于模拟端口发送数据 百度网盘下载 链接:https://pan.baidu.com/s/1iiet53Rki78GaEIkpp9bnA 提取码:hzug 下载解压后有如下几个文件: 这里注意,有好多人解压出来没有nc.exe和nc64.exe两个文件,如果遇到这种情况的删除整个文件夹,关闭杀毒软件,重新解压一次就解决整个问题了
配置环境变量
netcat-win32工具需要配置系统环境变量
- 打开系统变量path编辑
- 新建一个,把我们刚才解压的路径复制过来
- 打开win+R检查一下,输入nc -lp 9999 出现·光标在等待就说明成功了
代码编写
package com.niit.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.{ Seconds, StreamingContext} /** * @Author YanTianCheng * @Date 2023/3/27 10:52 * @Title: Spark_Streaming_WordCont * @Package com.niit.streaming */ object Spark_Streaming_WordCont { def main(args: Array[String]): Unit = { //创建环境对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark") //初始化SparkStreamingContext 采集周期3秒 val ssc = new StreamingContext(sparkConf,Seconds(3)) //获取端口数据 val lines = ssc.socketTextStream("localhost",9999) //将每一行数据分析,形成一个个单词 val words = lines.flatMap(_.split(" ")) //将单词转换(映射)元组 val wordOne = words.map((_,1)) //统计单词 val wordCount = wordOne.reduceByKey(_+_) //打印 wordCount.print() //启动采集器SparkStreamingContext ssc.start() //等待采集器关闭 ssc.awaitTermination() } }
在这个应用程序中,我们首先创建了一个Spark Streaming Context,并设置每5秒一个batch。然后,我们创建一个socket stream,连接localhost:9999。接下来,我们将每行文本划分为单词,并将每个单词计数为1。最后,我们计算每5秒内的单词总数,然后打印出来。
启动该应用程序后,您可以使用nc命令(例如nc -lk 9999)在另一个终端上启动一个TCP服务器,并向其发送单词,然后看到单词的实时计数。
启动程序
- 先打开工具开启9999端口
- 启动Spark_Streaming_WordCont程序
- 端口输入数据
- 输出统计结果
问题解决
- 如果输出框特别多东西,我们可以更改采集周期,不过在实际开发中采集周期不允许很长的 改为10秒一周期采集一次
- 注意先开启工具,再开始运行程序
上一篇:
JS实现多线程数据分片下载
下一篇:
如何在Linux中安装jdk?