全局变量和局部变量公共变量有什么区别

全局变量是整个程序都可访问的变量,谁都可以访问,而局部变量存在于模块(子程序,函数)中,只有所在模块可以访问,其他模块不可直接访问
全局变量生存期在整个程序从运行到结束(在程序结束时所占内存释放), 局部变量在模块结束(函数调用完毕),局部变量消失,所占据的内存释放
全局变量分配在全局数据段并且在程序开始运行的时候被加载. 局部变量则分配在堆栈里面。
编译器通过变量的分配地址就可以判断出是局部变量和全局变量,操作系统通过语法词法的分析,判断出是全局变量还是局部变量。
操作系统通过变量的分配地址就可以判断出是局部变量和全局变量,编译器通过语法词法的分析,判断出是全局变量还是局部变量。
“编译器通过变量的分配地址就可以判断出是局部变量和全局变量”,本来全局变量和局部变量分配的地址就是编译器进行的,而不是他要去判断
全局变量是整个程序都可访问的变量,谁都可以访问,生存期在整个程序从运行到结束(在程序结束时所占内存释放),而局部变量存在于模块(子程序,函数)中,只有所在模块可以访问,其他模块不可直接访问,模块结束(函数调用完毕),局部变量消失,所占据的内存释放。
全局变量分配在全局数据段并且在程序开始运行的时候被加载. 局部变量则分配在堆栈里面。
这道题你会答吗?花几分钟告诉大家答案吧!
扫描二维码,关注牛客网
下载牛客APP,随时随地刷题
京ICP备号-4
扫一扫,把题目装进口袋他的最新文章
他的热门文章
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)背景:上午看书阅及这两对概念,有很多相似之处,故记之。
一. 局部变量&全局变量
  1 局部变量:“在函数内定义的变量”,
      即在一个函数内部定义的变量,只在本函数范围内有效。
  2 全局变量:“在函数外定义的变量”,
      即从定义变量的位置到本源文件结束都有效。  
      目的:增加函数间数据联系的渠道。由于同一文件中的所有函数都能引用全局变量的值,因此如果在一个函数中改变了全局变量的值,
         就能影响到其他函数中全局变量的值,相当于各个函数之间有直接的传递渠道。
 为了便于区别两者,C程序设计人员有一个习惯,将全局变量名的第一个字母用大写表示。如:float Max=0,Min=0; &//定义全局变量Max,Min
二. 内部函数&外部函数
  1.内部函数:只被本文件中其他函数所调用。
        定义内部函数时,在函数名、函数类型前加 static 。 static 类型名 函数名(形参表); 例如,static int fun(int a,int b);
  2.外部函数:可供其他文件调用。
        定义外部函数时,在函数首部左端加 extern。extern int fun(int a,int b);
        若在定义函数时省略extern,则默认为外部函数。
拓展一个问题,为什么C语言要求在定义所有的变量时都要指定变量的类型?
        所谓类型,就是对数据分配存储单元的安排,包括存储单元的长度(占多少字节)以及数据的存储形式。不同的类型分配不同的长度和存储形式。
      在计算机中,数据是存放在存储单元中的,它是具体存在的(在数学中,数和数的运算都是抽象的)。而且,存储单元是由有限的字节构成的,
      每一个存储单元中存放数据的范围是有限的,不可能存放“无穷大”的数,也不能存放循环小数。
最后附上c语言包含的数据类型
阅读(...) 评论()查看: 33010|回复: 4
spark分布式编程之全局变量专题【共享变量】
主题帖子积分
本帖最后由 pig2 于
16:58 编辑
1.spark共享变量的作用是什么?
2.什么情况下使用共享变量?
3.如何在程序中使用共享变量?
4.广播变量源码包含哪些内容?
spark编程中,我们经常会遇到使用全局变量,来累加或则使用全局变量。然而对于分布式编程这个却与传统编程有着很大的区别。不可能在程序中声明一个全局变量,在分布式编程中就可以直接使用。因为代码会分发到多台机器,导致我们认为的全局变量失效。那么spark,spark Streaming该如何实现全局变量。
一般情况下,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的一个独立副本。这些变量被复制到每台机器上,并且这些变量在远程机器上 的所有更新都不会传递回驱动程序。通常跨任务的读写变量是低效的,但是,Spark还是为两种常见的使用模式提供了两种有限的共享变量:广播变量(broadcast variable)和累加器(accumulator)+
1.1 广播变量:
广播可以将变量发送到闭包中,被闭包使用。但是,广播还有一个作用是同步较大数据。比如你有一个IP库,可能有几G,在map操作中,依赖这个ip库。那么,可以通过广播将这个ip库传到闭包中,被并行的任务应用。广播通过两个方面提高数据共享效率:
1,集群中每个节点(物理机器)只有一个副本,默认的闭包是每个任务一个副本;
2,广播传输是通过BT下载模式实现的,也就是P2P下载,在集群多的情况下,可以极大的提高数据传输速率。广播变量修改后,不会反馈到其他节点。
1.2 累加器:
累加器是仅仅被相关操作累加的变量,因此可以在并行中被有效地支持。它可以被用来实现计数器和总和。Spark原生地只支持数字类型的累加器,编程者可以添加新类型的支持。如果创建累加器时指定了名字,可以在Spark的UI界面看到。这有利于理解每个执行阶段的进程。(对于Python还不支持)
累加器通过对一个初始化了的变量v调用SparkContext.accumulator(v)来创建。在集群上运行的任务可以通过add或者”+=”方法在累加器上进行累加操作。但是,它们不能读取它的值。只有驱动程序能够读取它的值,通过累加器的value方法。
2.如何使用全局变量
2.1 Java版本:
[Java] 纯文本查看 复制代码package com.S
import org.apache.spark.A
import org.apache.spark.SparkC
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.F
import org.apache.spark.broadcast.B
import org.apache.spark.streaming.D
import org.apache.spark.streaming.T
import org.apache.spark.streaming.api.java.JavaStreamingC
import org.apache.spark.api.java.function.FlatMapF
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairF
import org.apache.spark.streaming.api.java.JavaDS
import org.apache.spark.streaming.api.java.JavaPairDS
import org.apache.spark.streaming.api.java.JavaReceiverInputDS
import scala.Tuple2;
import java.util.*;
* 利用广播进行黑名单过滤!
* 无论是计数器还是广播!都不是想象的那么简单!
* 联合使用非常强大!!!绝对是高端应用!
* 如果 联合使用扩展的话,该怎么做!!!
public class BroadcastAccumulator {
* 肯定要创建一个广播List
* 在上下文中实例化!
private static volatile Broadcast&List&String&& broadcastList =
* 计数器!
* 在上下文中实例化!
private static volatile Accumulator&Integer& accumulator =
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster(&local[2]&).
setAppName(&WordCountOnlieBroadcast&);
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
* 没有action的话,广播并不会发出去!
* 使用broadcast广播黑名单到每个Executor中!
broadcastList = jsc.sc().broadcast(Arrays.asList(&Hadoop&,&Mahout&,&Hive&));
* 全局计数器!用于统计在线过滤了多少个黑名单!
accumulator = jsc.sparkContext().accumulator(0,&OnlineBlackListCounter&);
JavaReceiverInputDStream&String& lines = jsc.socketTextStream(&Master&, 9999);
* 这里省去flatmap因为名单是一个个的!
JavaPairDStream&String, Integer& pairs = lines.mapToPair(new PairFunction&String, String, Integer&() {
public Tuple2&String, Integer& call(String word) {
return new Tuple2&String, Integer&(word, 1);
JavaPairDStream&String, Integer& wordsCount = pairs.reduceByKey(new Function2&Integer, Integer, Integer&() {
public Integer call(Integer v1, Integer v2) {
return v1 + v2;
* Funtion里面 前几个参数是 入参。
* 后面的出参。
* 体现在call方法里面!
* 这里直接基于RDD进行操作了!
wordsCount.foreach(new Function2&JavaPairRDD&String, Integer&, Time, Void&() {
public Void call(JavaPairRDD&String, Integer& rdd, Time time) throws Exception {
rdd.filter(new Function&Tuple2&String, Integer&, Boolean&() {
public Boolean call(Tuple2&String, Integer& wordPair) throws Exception {
if (broadcastList.value().contains(wordPair._1)) {
* accumulator不应该仅仅用来计数。
* 可以同时写进数据库或者redis中!
accumulator.add(wordPair._2);
* 这里真的希望 广播和计数器执行的话。要进行一个action操作!
}).collect();
System.out.println(&广播器里面的值&+broadcastList.value());
System.out.println(&计时器里面的值&+accumulator.value());
jsc.start();
jsc.awaitTermination();
jsc.close();
2.2 Scala版本
[Scala] 纯文本查看 复制代码package com.Streaming
import java.util
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.{Accumulable, Accumulator, SparkContext, SparkConf}
import org.apache.spark.broadcast.Broadcast
* Created by lxh on .
object BroadcastAccumulatorStreaming {
* 声明一个广播和累加器!
private var broadcastList:Broadcast[List[String]]
private var accumulator:Accumulator[Int] = _
def main(args: Array[String]) {
val sparkConf = new SparkConf().setMaster(&local[4]&).setAppName(&broadcasttest&)
val sc = new SparkContext(sparkConf)
* duration是ms
val ssc = new StreamingContext(sc,Duration(2000))
// broadcastList = ssc.sparkContext.broadcast(util.Arrays.asList(&Hadoop&,&Spark&))
broadcastList = ssc.sparkContext.broadcast(List(&Hadoop&,&Spark&))
accumulator= ssc.sparkContext.accumulator(0,&broadcasttest&)
* 获取数据!
val lines = ssc.socketTextStream(&localhost&,9999)
* 拿到数据后 怎么处理!
* 1.flatmap把行分割成词。
* 2.map把词变成tuple(word,1)
* 3.reducebykey累加value
* (4.sortBykey排名)
* 4.进行过滤。 value是否在累加器中。
* 5.打印显示。
val words = lines.flatMap(line =& line.split(& &))
val wordpair = words.map(word =& (word,1))
wordpair.filter(record =& {broadcastList.value.contains(record._1)})
val pair = wordpair.reduceByKey(_+_)
*这步为什么要先foreachRDD?
* 因为这个pair 是PairDStream&String, Integer&
进行foreachRDD是为了?
pair.foreachRDD(rdd =& {
rdd.filter(record =& {
if (broadcastList.value.contains(record._1)) {
accumulator.add(1)
return true
return false
val filtedpair = pair.filter(record =& {
if (broadcastList.value.contains(record._1)) {
accumulator.add(record._2)
println(&累加器的值&+accumulator.value)
// pair.filter(record =& {broadcastList.value.contains(record._1)})
/* val keypair = pair.map(pair =& (pair._2,pair._1))*/
* 如果DStream自己没有某个算子操作。就通过转化transform!
/* keypair.transform(rdd =& {
rdd.sortByKey(false)//TODO
pair.print()
ssc.start()
ssc.awaitTermination()
补充:除了上面提到的两种外,还有一个闭包的概念,这里补充下
闭包 与广播变量对比
有两种方式将数据从driver节点发送到worker节点:通过 闭包 和通过 广播变量 。闭包是随着task的组装和分发自动进行的,而广播变量则是需要程序猿手动操作的,具体地可以通过如下方式操作广播变量(假设 sc 为 SparkContext 类型的对象, bc 为 Broadcast 类型的对象):
可通过 sc.broadcast(xxx) 创建广播变量。
可在各计算节点中(闭包代码中)通过 bc.value 来引用广播的数据。
bc.unpersist() 可将各executor中缓存的广播变量删除,后续再使用时数据将被重新发送。
bc.destroy() 可将广播变量的数据和元数据一同销毁,销毁之后就不能再使用了。
任务闭包包含了任务所需要的代码和数据,如果一个executor数量小于RDD partition的数量,那么每个executor就会得到多个同样的任务闭包,这通常是低效的。而广播变量则只会将数据发送到每个executor一次,并且可以在多个计算操作中共享该广播变量,而且广播变量使用了类似于p2p形式的非常高效的广播算法,大大提高了效率。另外,广播变量由spark存储管理模块进行管理,并以MEMORY_AND_DISK级别进行持久化存储。
什么时候用闭包自动分发数据?情况有几种:
数据比较小的时候。
数据已在driver程序中可用。典型用例是常量或者配置参数。
什么时候用广播变量分发数据?情况有几种:
数据比较大的时候(实际上,spark支持非常大的广播变量,甚至广播变量中的元素数超过java/scala中Array的最大长度限制(2G,约21.5亿)都是可以的)。
数据是某种分布式计算结果。典型用例是训练模型等中间计算结果。
当数据或者变量很小的时候,我们可以在Spark程序中直接使用它们,而无需使用广播变量。
对于大的广播变量,序列化优化可以大大提高网络传输效率,参见本文序列化优化部分。
3.广播变量(Broadcast)源码分析
本文基于Spark 1.0源码分析,主要探讨广播变量的初始化、创建、读取以及清除。
BroadcastManager类中包含一个BroadcastFactory对象的引用。大部分操作通过调用BroadcastFactory中的方法来实现。
BroadcastFactory是一个Trait,有两个直接子类TorrentBroadcastFactory、HttpBroadcastFactory。这两个子类实现了对HttpBroadcast、TorrentBroadcast的封装,而后面两个又同时集成了Broadcast抽象类。
BroadcastManager的初始化
SparkContext初始化时会创建SparkEnv对象env,这个过程中会调用BroadcastManager的构造方法返回一个对象作为env的成员变量存在:
[Bash shell] 纯文本查看 复制代码val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
构造BroadcastManager对象时会调用initialize方法,主要根据配置初始化broadcastFactory成员变量,并调用其initialize方法。
[Bash shell] 纯文本查看 复制代码
val broadcastFactoryClass =
conf.get(&spark.broadcast.factory&, &org.apache.spark.broadcast.HttpBroadcastFactory&)
broadcastFactory =
Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
// Initialize appropriate BroadcastFactory and BroadcastObject
broadcastFactory.initialize(isDriver, conf, securityManager)
两个工厂类的initialize方法都是对其相应实体类的initialize方法的调用,下面分开两个类来看。
HttpBroadcast的initialize方法
[Bash shell] 纯文本查看 复制代码
def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
synchronized {
if (!initialized) {
bufferSize = conf.getInt(&spark.buffer.size&, 65536)
compress = conf.getBoolean(&spark.broadcast.compress&, true)
securityManager = securityMgr
if (isDriver) {
createServer(conf)
conf.set(&spark.httpBroadcast.uri&,
serverUri)
serverUri = conf.get(&spark.httpBroadcast.uri&)
cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup, conf)
compressionCodec = CompressionCodec.createCodec(conf)
initialized = true
除了一些变量的初始化外,主要做两件事情,一是createServer(只有在Driver端会做),其次是创建一个MetadataCleaner对象。
createServer
[Bash shell] 纯文本查看 复制代码
private def createServer(conf: SparkConf) {
broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
server = new HttpServer(broadcastDir, securityManager)
server.start()
serverUri = server.uri
logInfo(&Broadcast server started at & + serverUri)
首先创建一个存放广播变量的目录,默认是
[Bash shell] 纯文本查看 复制代码conf.get(&spark.local.dir&,
System.getProperty(&java.io.tmpdir&)).split(',')(0)
然后初始化一个HttpServer对象并启动(封装了jetty),启动过程中包括加载资源文件,起端口和线程用来监控请求等。这部分的细节在org.apache.spark.HttpServer类中,此处不做展开。
创建MetadataCleaner对象
一个MetadataCleaner对象包装了一个定时计划Timer,每隔一段时间执行一个回调函数,此处传入的回调函数为cleanup:
[Bash shell] 纯文本查看 复制代码
private def cleanup(cleanupTime: Long) {
val iterator = files.internalMap.entrySet().iterator()
while(iterator.hasNext) {
val entry = iterator.next()
val (file, time) = (entry.getKey, entry.getValue)
if (time & cleanupTime) {
iterator.remove()
deleteBroadcastFile(file)
即清楚存在吵过一定时长的broadcast文件。在时长未设定(默认情况)时,不清除:
[Bash shell] 纯文本查看 复制代码
if (delaySeconds & 0) {
&Starting metadata cleaner for & + name + & with delay of & + delaySeconds + & seconds & +
&and period of & + periodSeconds + & secs&)
timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000)
TorrentBroadcast的initialize方法
[Bash shell] 纯文本查看 复制代码
def initialize(_isDriver: Boolean, conf: SparkConf) {
TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests
synchronized {
if (!initialized) {
initialized = true
Torrent在此处没做什么,这也可以看出和Http的区别,Torrent的处理方式就是p2p,去中心化。而Http是中心化服务,需要启动服务来接受请求。
创建broadcast变量
调用SparkContext中的 def broadcast[T: ClassTag](value: T): Broadcast[T]方法来初始化一个广播变量,实现如下:
[Bash shell] 纯文本查看 复制代码
def broadcast[T: ClassTag](value: T): Broadcast[T] = {
val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
cleaner.foreach(_.registerBroadcastForCleanup(bc))
即调用broadcastManager的newBroadcast方法:
[Bash shell] 纯文本查看 复制代码
def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean) = {
broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
再调用工厂类的newBroadcast方法,此处返回的是一个Broadcast对象。
HttpBroadcastFactory的newBroadcast
[Bash shell] 纯文本查看 复制代码
def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
new HttpBroadcast[T](value_, isLocal, id)
即创建一个新的HttpBroadcast对象并返回。
构造对象时主要做两件事情:
[Bash shell] 纯文本查看 复制代码
HttpBroadcast.synchronized {
SparkEnv.get.blockManager.putSingle(
blockId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
if (!isLocal) {
HttpBroadcast.write(id, value_)
1.将变量id和值放入blockManager,但并不通知master
2.调用伴生对象的write方法
[Bash shell] 纯文本查看 复制代码
def write(id: Long, value: Any) {
val file = getFile(id)
val out: OutputStream = {
if (compress) {
compressionCodec.compressedOutputStream(new FileOutputStream(file))
new BufferedOutputStream(new FileOutputStream(file), bufferSize)
val ser = SparkEnv.get.serializer.newInstance()
val serOut = ser.serializeStream(out)
serOut.writeObject(value)
serOut.close()
files += file
write方法将对象值按照指定的压缩、序列化写入指定的文件。这个文件所在的目录即是HttpServer的资源目录,文件名和id的对应关系为:
[Bash shell] 纯文本查看 复制代码
case class BroadcastBlockId(broadcastId: Long, field: String = &&) extends BlockId {
def name = &broadcast_& + broadcastId + (if (field == &&) && else &_& + field)
TorrentBroadcastFactory的newBroadcast方法
[Bash shell] 纯文本查看 复制代码
def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
new TorrentBroadcast[T](value_, isLocal, id)
同样是创建一个TorrentBroadcast对象,并返回。
[Bash shell] 纯文本查看 复制代码
TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.putSingle(
broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
if (!isLocal) {
sendBroadcast()
做两件事情,第一步和Http一样,第二步:
[Bash shell] 纯文本查看 复制代码
def sendBroadcast() {
val tInfo = TorrentBroadcast.blockifyObject(value_)
totalBlocks = tInfo.totalBlocks
totalBytes = tInfo.totalBytes
hasBlocks = tInfo.totalBlocks
// Store meta-info
val metaId = BroadcastBlockId(id, &meta&)
val metaInfo = TorrentInfo(null, totalBlocks, totalBytes)
TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.putSingle(
metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
// Store individual pieces
for (i &- 0 until totalBlocks) {
val pieceId = BroadcastBlockId(id, &piece& + i)
TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.putSingle(
pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
可以看出,先将元数据信息缓存到blockManager,再将块信息缓存过去。开头可以看到有一个分块动作,是调用伴生对象的blockifyObject方法:
[Bash shell] 纯文本查看 复制代码
def blockifyObject[T](obj: T): TorrentInfo
此方法将对象obj分块(默认块大小为4M),返回一个TorrentInfo对象,第一个参数为一个TorrentBlock对象(包含blockID和block字节数组)、块数量以及obj的字节流总长度。
元数据信息中的blockId为广播变量id+后缀,value为总块数和总字节数。
数据信息是分块缓存,每块的id为广播变量id加后缀及块变好,数据位一个TorrentBlock对象
读取广播变量的值
通过调用bc.value来取得广播变量的值,其主要实现在反序列化方法readObject中
HttpBroadcast的反序列化
[Bash shell] 纯文本查看 复制代码
HttpBroadcast.synchronized {
SparkEnv.get.blockManager.getSingle(blockId) match {
case Some(x) =& value_ = x.asInstanceOf[T]
case None =& {
logInfo(&Started reading broadcast variable & + id)
val start = System.nanoTime
value_ = HttpBroadcast.read[T](id)
* We cache broadcast data in the BlockManager so that subsequent tasks using it
* do not need to re-fetch. This data is only used locally and no other node
* needs to fetch this block, so we don't notify the master.
SparkEnv.get.blockManager.putSingle(
blockId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
val time = (System.nanoTime - start) / 1e9
logInfo(&Reading broadcast variable & + id + & took & + time + & s&)
首先查看blockManager中是否已有,如有则直接取值,否则调用伴生对象的read方法进行读取:
[Bash shell] 纯文本查看 复制代码
def read[T: ClassTag](id: Long): T = {
logDebug(&broadcast read server: & +
serverUri + & id: broadcast-& + id)
val url = serverUri + &/& + BroadcastBlockId(id).name
var uc: URLConnection = null
if (securityManager.isAuthenticationEnabled()) {
logDebug(&broadcast security enabled&)
val newuri = Utils.constructURIForAuthentication(new URI(url), securityManager)
uc = newuri.toURL.openConnection()
uc.setAllowUserInteraction(false)
logDebug(&broadcast not using security&)
uc = new URL(url).openConnection()
val in = {
uc.setReadTimeout(httpReadTimeout)
val inputStream = uc.getInputStream
if (compress) {
compressionCodec.compressedInputStream(inputStream)
new BufferedInputStream(inputStream, bufferSize)
val ser = SparkEnv.get.serializer.newInstance()
val serIn = ser.deserializeStream(in)
val obj = serIn.readObject[T]()
serIn.close()
使用serverUri和block id对应的文件名直接开启一个HttpConnection将中心服务器上相应的数据取过来,使用配置的压缩和序列化机制进行解压和反序列化。
这里可以看到,所有需要用到广播变量值的executor都需要去driver上pull广播变量的内容。
取到值后,缓存到blockManager中,以便下次使用。
TorrentBroadcast的反序列化
[Bash shell] 纯文本查看 复制代码
private def readObject(in: ObjectInputStream) {
in.defaultReadObject()
TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.getSingle(broadcastId) match {
case Some(x) =&
value_ = x.asInstanceOf[T]
case None =&
val start = System.nanoTime
logInfo(&Started reading broadcast variable & + id)
// Initialize @transient variables that will receive garbage values from the master.
resetWorkerVariables()
if (receiveBroadcast()) {
value_ = TorrentBroadcast.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
/* Store the merged copy in cache so that the next worker doesn't need to rebuild it.
* This creates a trade-off between memory usage and latency. Storing copy doubles
not storing doubles deserialization cost. Also,
* this does not need to be reported to BlockManagerMaster since other executors
* does not need to access this block (they only need to fetch the chunks,
* which are reported).
SparkEnv.get.blockManager.putSingle(
broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
// Remove arrayOfBlocks from memory once value_ is on local cache
resetWorkerVariables()
logError(&Reading broadcast variable & + id + & failed&)
val time = (System.nanoTime - start) / 1e9
logInfo(&Reading broadcast variable & + id + & took & + time + & s&)
和Http一样,都是先查看blockManager中是否已经缓存,若没有,则调用receiveBroadcast方法:
[Bash shell] 纯文本查看 复制代码
def receiveBroadcast(): Boolean = {
// Receive meta-info about the size of broadcast data,
// the number of chunks it is divided into, etc.
val metaId = BroadcastBlockId(id, &meta&)
var attemptId = 10
while (attemptId & 0 && totalBlocks == -1) {
TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.getSingle(metaId) match {
case Some(x) =&
val tInfo = x.asInstanceOf[TorrentInfo]
totalBlocks = tInfo.totalBlocks
totalBytes = tInfo.totalBytes
arrayOfBlocks = new Array[TorrentBlock](totalBlocks)
hasBlocks = 0
case None =&
Thread.sleep(500)
attemptId -= 1
if (totalBlocks == -1) {
return false
* Fetch actual chunks of data. Note that all these chunks are stored in
* the BlockManager and reported to the master, so that other executors
* can find out and pull the chunks from this executor.
val recvOrder = new Random().shuffle(Array.iterate(0, totalBlocks)(_ + 1).toList)
for (pid &- recvOrder) {
val pieceId = BroadcastBlockId(id, &piece& + pid)
TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.getSingle(pieceId) match {
case Some(x) =&
arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock]
hasBlocks += 1
SparkEnv.get.blockManager.putSingle(
pieceId, arrayOfBlocks(pid), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
case None =&
throw new SparkException(&Failed to get & + pieceId + & of & + broadcastId)
hasBlocks == totalBlocks
和写数据一样,同样是分成两个部分,首先取元数据信息,再根据元数据信息读取实际的block信息。注意这里都是从blockManager中读取的,这里贴出blockManager.getSingle的分析。
调用栈中最后到BlockManager.doGetRemote方法,中间有一条语句:
[Bash shell] 纯文本查看 复制代码
val locations = Random.shuffle(master.getLocations(blockId))
即将存有这个block的节点信息随机打乱,然后使用:
[Bash shell] 纯文本查看 复制代码
val data = BlockManagerWorker.syncGetBlock(
GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
从这里可以看出,Torrent方法首先将广播变量数据分块,并存到BlockManager中;每个节点需要读取广播变量时,是分块读取,对每一块都读取其位置信息,然后随机选一个存有此块数据的节点进行get;每个节点读取后会将包含的快信息报告给BlockManagerMaster,这样本地节点也成为了这个广播网络中的一个peer。
与Http方式形成鲜明对比,这是一个去中心化的网络,只需要保持一个tracker即可,这就是p2p的思想。
广播变量的清除
广播变量被创建时,紧接着有这样一句代码:
[Bash shell] 纯文本查看 复制代码cleaner.foreach(_.registerBroadcastForCleanup(bc))
cleaner是一个ContextCleaner对象,会将刚刚创建的广播变量注册到其中,调用栈为:
[Bash shell] 纯文本查看 复制代码
def registerBroadcastForCleanup[T](broadcast: Broadcast[T]) {
registerForCleanup(broadcast, CleanBroadcast(broadcast.id))
[Bash shell] 纯文本查看 复制代码
private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask) {
referenceBuffer += new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue)
等出现广播变量被弱引用时(关于弱引用,可以参考:http://blog.csdn.net/lyfi01/article/details/6415726),则会执行
[Bash shell] 纯文本查看 复制代码
cleaner.foreach(_.start())
start方法中会调用keepCleaning方法,会遍历注册的清理任务(包括RDD、shuffle和broadcast),依次进行清理:
[Bash shell] 纯文本查看 复制代码
private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
while (!stopped) {
val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
.map(_.asInstanceOf[CleanupTaskWeakReference])
reference.map(_.task).foreach { task =&
logDebug(&Got cleaning task & + task)
referenceBuffer -= reference.get
task match {
case CleanRDD(rddId) =&
doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
case CleanShuffle(shuffleId) =&
doCleanupShuffle(shuffleId, blocking = blockOnCleanupTasks)
case CleanBroadcast(broadcastId) =&
doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
case e: Exception =& logError(&Error in cleaning thread&, e)
doCleanupBroadcast调用以下语句:
[Bash shell] 纯文本查看 复制代码
broadcastManager.unbroadcast(broadcastId, true, blocking)
[Bash shell] 纯文本查看 复制代码
def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
broadcastFactory.unbroadcast(id, removeFromDriver, blocking)
每个工厂类调用其对应实体类的伴生对象的unbroadcast方法。
HttpBroadcast中的变量清除
[Bash shell] 纯文本查看 复制代码
def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = synchronized {
SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
if (removeFromDriver) {
val file = getFile(id)
files.remove(file)
deleteBroadcastFile(file)
1是删除blockManager中的缓存,2是删除本地持久化的文件
TorrentBroadcast中的变量清除
[Bash shell] 纯文本查看 复制代码
def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = synchronized {
SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
Broadcast可以使用在executor端多次使用某个数据的场景(比如说字典),Http和Torrent两种方式对应传统的CS访问方式和P2P访问方式,当广播变量较大或者使用较频繁时,采用后者可以减少driver端的压力。
http://blog.csdn.net/asongoficeandfire/article/details/
https://endymecy.gitbooks.io/spa ... ared-variables.html
欢迎加入about云群 、 ,云计算爱好者群,关注
主题帖子积分
中级会员, 积分 708, 距离下一级还需 292 积分
中级会员, 积分 708, 距离下一级还需 292 积分
谢谢了,资料不错,很好很好
主题帖子积分
中级会员, 积分 708, 距离下一级还需 292 积分
中级会员, 积分 708, 距离下一级还需 292 积分
谢谢了,资料不错,很好很好!!!!
主题帖子积分
注册会员, 积分 134, 距离下一级还需 66 积分
注册会员, 积分 134, 距离下一级还需 66 积分
spark集群 怎么就能启用一个应用?
主题帖子积分
新手上路, 积分 49, 距离下一级还需 1 积分
新手上路, 积分 49, 距离下一级还需 1 积分
好东西,学习学习
经常参与各类话题的讨论,发帖内容较有主见
经常帮助其他会员答疑
活跃且尽责职守的版主
站长推荐 /2
会员注册不成功的原因
新手获取积分方法
Powered by}

我要回帖

更多关于 静态变量和全局变量 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信