快捷搜索: 王者荣耀 脱发

SparkStreaming实时单词统计WordCont,netcat-win32工具使用

工具导入

这里我们需要用到一个工具,netcat-win32用于模拟端口发送数据 百度网盘下载 链接:https://pan.baidu.com/s/1iiet53Rki78GaEIkpp9bnA 提取码:hzug 下载解压后有如下几个文件: 这里注意,有好多人解压出来没有nc.exe和nc64.exe两个文件,如果遇到这种情况的删除整个文件夹,关闭杀毒软件,重新解压一次就解决整个问题了

配置环境变量

netcat-win32工具需要配置系统环境变量

  1. 打开系统变量path编辑
  2. 新建一个,把我们刚才解压的路径复制过来
  3. 打开win+R检查一下,输入nc -lp 9999 出现·光标在等待就说明成功了

代码编写

package com.niit.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{
          
   Seconds, StreamingContext}


/**
 * @Author YanTianCheng
 * @Date 2023/3/27 10:52
 * @Title: Spark_Streaming_WordCont
 * @Package com.niit.streaming
 */
object Spark_Streaming_WordCont {
          
   
  def main(args: Array[String]): Unit = {
          
   
    //创建环境对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
    //初始化SparkStreamingContext            采集周期3秒
    val ssc = new StreamingContext(sparkConf,Seconds(3))

    //获取端口数据
    val lines = ssc.socketTextStream("localhost",9999)
    //将每一行数据分析,形成一个个单词
    val words = lines.flatMap(_.split(" "))
    //将单词转换(映射)元组
    val wordOne = words.map((_,1))
    //统计单词
    val wordCount = wordOne.reduceByKey(_+_)
    //打印
    wordCount.print()


    //启动采集器SparkStreamingContext
    ssc.start()
    //等待采集器关闭
    ssc.awaitTermination()

  }

}

在这个应用程序中,我们首先创建了一个Spark Streaming Context,并设置每5秒一个batch。然后,我们创建一个socket stream,连接localhost:9999。接下来,我们将每行文本划分为单词,并将每个单词计数为1。最后,我们计算每5秒内的单词总数,然后打印出来。

启动该应用程序后,您可以使用nc命令(例如nc -lk 9999)在另一个终端上启动一个TCP服务器,并向其发送单词,然后看到单词的实时计数。

启动程序

  1. 先打开工具开启9999端口
  2. 启动Spark_Streaming_WordCont程序
  3. 端口输入数据
  4. 输出统计结果

问题解决

  1. 如果输出框特别多东西,我们可以更改采集周期,不过在实际开发中采集周期不允许很长的 改为10秒一周期采集一次
  2. 注意先开启工具,再开始运行程序
经验分享 程序员 微信小程序 职场和发展