通常在向Spark传递函数的时候,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一个新的副本,更新这些副本的值也不会影响到驱动器中对应变量。Spark的两个共享变量,累加器与广播变量分别为结果聚合与广播这两种常见的通信模式突破了这一限制。
累计器的用法:
1、通过在驱动器中调用SparkContext.accumulator(initialValue)方法,创建出具有初始值的累加器。返回值为org.apache.spark.Accumulator[T] 对象,其中T 是初始值initialValue 的类型。
2、Spark 闭包里的执行器代码可以使用累加器的+= 方法(在Java 中是add)增加累加器的值。驱动器程序可以调用累加器的value 属性(在Java 中使用value() 或setValue())来访问累加器的值。注意,工作节点上的任务不能访问累加器的值,从这些任务的角度来看,累加器是一个只写的变量。在这种模式下,累加器的实现可以更高效,不需要对每次更新操作进行复杂的通信。
对于要在action操作中使用的累加器,Spark只会把每个任务对各累加器的修改应用一次。因此为了避免一个无论是在失败还是重复计算时都可靠的累加器,我们必须要把它放在foreach()的操作中。对于在RDD转化操作中使用的累加器,就不能保证有这种情况了。转化操作中累加器可能发生不止一次更新。
广播变量的用法:
1、通过对一个类型T的对象调用SparkContext.broadcast创建一个BroadCast[T]对象,任何可序列化的类型都可以这么实现。
2、通过value属性访问该对象的值(在JAVA中为value()方法)
3、变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)
满足只读要求最容易的使用方式是广播基本类型的值或者引用不可变对象。在这样的情况下,你没有办法修改广播变量的值,除了在驱动器程序的代码中可以修改。但是有时候传一个可变对象可能更为方便与高效,如果你这样做的话,就需要自己维护只读的条件。就像对Array[String] 类型的呼号前缀表所做的那样,必须确保从节点上运行的代码不会尝试去做诸如val theArray = broadcastArray.value; theArray(0) = newValue 这样的事情。当在工作节点上执行时,这一行将newValue 赋给数组的第一个元素,但是只对该工作节点本地的这个数组的副本有效,而不会改变任何其他工作节点上通过broadcastArray.value 所读取到的内容。
基于分区进行操作:
基于分区对数据进行操作可以让我们避免为每个数据元素进行重复的配置工作,诸如打开数据库连接或创建随机数生成器等操作。都是我们应当尽量避免为每个元素都配置一次工作。这样可以大量节省相关的资源。