spark读取文件并统计特定单词（如 GET）的出现次数
package Spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Reads the "Datas" file with Spark and reports:
 *   1. per-word counts, sorted by word in descending order;
 *   2. the count of the word "GET";
 *   3. the largest numeric suffix among words matching the regex "Port.*".
 *
 * Runs in local mode, so driver-side `println` inside RDD actions is visible.
 */
object Spark03_WordCount {
  def main(args: Array[String]): Unit = {
    // TODO: establish the connection to the Spark framework (analogous to JDBC).
    val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(sparkConf)

    // Read the input line by line.
    val lines: RDD[String] = sc.textFile("Datas")
    // Split each line into individual words.
    val words: RDD[String] = lines.flatMap(_.split(" "))
    // Pair every word with a count of 1.
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
    // reduceByKey pre-aggregates within each partition before the shuffle,
    // then aggregates globally.
    val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
    // Sort by word, descending, to match the required output order.
    val sorted: RDD[(String, Int)] = reduced.sortBy(_._1, ascending = false)

    sorted.foreach(println)
    println("-------------------")

    // Materialize the (small) result set on the driver exactly once.
    // The previous take(count().toInt) pattern launched an extra job for
    // count() and recomputed it in every loop condition.
    val results: Array[(String, Int)] = sorted.collect()

    // Report the count of the word "GET", if present.
    for ((word, count) <- results if word == "GET") {
      print("GET COUNT IS " + count)
    }
    println("-------------------")

    // Find the largest numeric suffix among words of the form "Port...".
    // NOTE(review): substring(5) skips "Port" plus one extra character
    // (e.g. a ':' or '='), as the original code did — confirm against the
    // actual log format; a non-numeric remainder will throw.
    var maxPort = 0
    for ((word, _) <- results if word.matches("Port.*")) {
      println("--- " + word)
      val portNumber = word.substring(5).toInt
      if (maxPort < portNumber) {
        maxPort = portNumber
        println(maxPort)
      }
    }
    print("最大值" + maxPort)

    // TODO: close the connection.
    sc.stop()
  }
}
上一篇:
IDEA上Java项目控制台中文乱码