Spark广播变量和累加器(附案例)

广播变量:

    如果使用了广播变量技术,则 Driver 端将共享数据只会发送到每个 Executor 一份。Executor 中的所有 Task 都复用这个对象。 如果不用广播变量技术,则 Driver 端默认会将共享数据分发到每个 Task 中,造成网络分发压力大。甚至导致你在进行RDD持久化到内存时,因内存不足而被迫存到磁盘,增加了磁盘IO,严重降低性能。

广播变量使用方法(Python实现):

    要保证该共享对象是可序列化的。因为跨节点传输的数据都要是可序列化的。 在Driver端将共享对象广播到每个Executor: #2-定义一个列表,装特殊字符 list_v=[",", ".", "!", "#", "$", "%"] #3-将列表从Driver端广播到各个Executor中 bc=sc.broadcast(list_v) 在Executor中获取: list2=bc.value

累加器:

Spark提供的 Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator 只提供了累加的功能,即确提供了多个 task 对一个变量并行操作的功能。但是 task 只能对 Accumulator 进行累加操作,不能读取 Accumulator 的值,只有 Driver 程序可以读取 Accumulator 的值。创建的 Accumulator 变量的值能够在 Spark Web UI 上看到,所以在创建时应尽量为其命名。

Spark内置了三种类型的Accumulator,分别是 LongAccumulator 累加整数型,DoubleAccumulator 累加浮点型,CollectionAccumulator 累加集合元素。

整数累加器使用方法(Python实现) :

    在Driver端定义整数累加器,赋初始值。 acc=sc.accumulator(0) 在Task中每次累加1 acc.add(1) # 或者 acc+=1

PySpark累加器和广播变量的综合案例:

    案例:过滤非单词字符,并对非单词字符进行统计
    Python代码实现: from pyspark import SparkConf, SparkContext, StorageLevel import os,jieba,time,re #指定环境变量 os.environ[SPARK_HOME] = /export/server/spark os.environ[PYSPARK_PYTHON] = /root/anaconda3/bin/python os.environ[PYSPARK_DRIVER_PYTHON] = /root/anaconda3/bin/python if __name__ == __main__: conf = SparkConf().setAppName(text1).setMaster(local[*]) sc = SparkContext(conf=conf) # 2-定义一个列表,装特殊字符 list_v = [",", ".", "!", "#", "$", "%"] # 3-将列表从Driver端广播到各个Executor中 bc = sc.broadcast(list_v) # 4-定义累加器,后续在分布式task中,每遇到特殊字符,就累加1 acc = sc.accumulator(0) # 5-加载文本内容,形成RDD input_rdd=sc.textFile(file:///export/pyworkspace/sz27_spark/pyspark_core/data/accumulator_broadcast_data.txt) # 6-过滤空行 filtered_rdd = input_rdd.filter(lambda line:len(line.strip())>0) # 7-将每行长句子,按空白字符拆分成短字符串 str_rdd = filtered_rdd.flatMap(lambda line:re.split(\s+,line)) # 8-对上面的RDD,过滤,每遇到特殊字符,累加器acc就累加1,并且剔除掉特殊字符,形成的RDD只包含单词 def filter_str(str): global acc #获取特殊字符列表 list2 = bc.value if str in list2: acc.add(1) return False else: return True word_rdd = str_rdd.filter(filter_str) # 9-对单词进行计数, wordcount_rdd = word_rdd.map(lambda word:(word,1)).reduceByKey(lambda x,y:x+y) # 10-打印单词计数结果 print(单词记数结果:,wordcount_rdd.collect()) # 11-打印累加器的值,即特殊字符的个数 print(特殊字符累加器结果:,acc.value) sc.stop()
经验分享 程序员 微信小程序 职场和发展