关于使用spark项目案例做olap靠谱么?有没有成熟案例

Spark的成功案例_百度文库
两大类热门资源免费畅读
续费一年阅读会员,立省24元!
Spark的成功案例
上传于||暂无简介
阅读已结束,如果下载本文需要使用0下载券
想免费下载更多文档?
定制HR最喜欢的简历
你可能喜欢大数据计算新贵Spark在腾讯雅虎优酷成功应用解析
发表于 14:50|
来源腾讯大数据|
摘要:MapReduce在实时查询和迭代计算上仍有较大的不足,目前,Spark由于其可伸缩、基于内存计算等特点,且可以直接读写Hadoop上任何格式的数据,逐渐成为大数据处理的新宠,腾讯分享了Spark的原理和应用案例。
【编者按】MapReduce由于其设计上的约束只适合处理离线计算,在实时查询和迭代计算上仍有较大的不足,而随着业务的发展,业界对实时查询和迭代分析有更多的需求,单纯依靠MapReduce框架已经不能满足业务的需求了。Spark由于其可伸缩、基于内存计算等特点,且可以直接读写Hadoop上任何格式的数据,成为满足业务需求的最佳候选者。本文来自腾讯大数据。
免费订阅“CSDN云计算”微信公众号,实时掌握第一手云中消息!
CSDN作为国内最专业的云计算服务平台,提供云计算、大数据、虚拟化、数据中心、OpenStack、CloudStack、Hadoop、Spark、机器学习、智能算法等相关云计算观点,云计算技术,云计算平台,云计算实践,云计算产业资讯等服务。
以下为原文:
Spark作为Apache顶级的开源项目,项目主页见http://spark.apache.org。在迭代计算,交互式查询计算以及批量流计算方面都有相关的子项目,如Shark、Spark
Streaming、MLbase、GraphX、SparkR等。从13年起Spark开始举行了自已的Spark Summit会议,会议网址见http://spark-summit.org。Amplab实验室单独成立了独立公司Databricks来支持Spark的研发。
为了满足挖掘分析与交互式实时查询的计算需求,腾讯大数据使用了Spark平台来支持挖掘分析类计算、交互式实时查询计算以及允许误差范围的快速查询计算,目前腾讯大数据拥有超过200台的Spark集群,并独立维护Spark和Shark分支。Spark集群已稳定运行2年,我们积累了大量的案例和运营经验能力,另外多个业务的大数据查询与分析应用,已在陆续上线并稳定运行。在SQL查询性能方面普遍比MapReduce高出2倍以上,利用内存计算和内存表的特性,性能至少在10倍以上。在迭代计算与挖掘分析方面,精准推荐将小时和天级别的模型训练转变为Spark的分钟级别的训练,同时简洁的编程接口使得算法实现比MR在时间成本和代码量上高出许多。
Spark VS MapReduce
尽管MapReduce适用大多数批处理工作,并且在大数据时代成为企业大数据处理的首选技术,但由于以下几个限制,它对一些场景并不是最优选择:
缺少对迭代计算以及DAG运算的支持
Shuffle过程多次排序和落地,MR之间的数据需要落Hdfs文件系统
Spark在很多方面都弥补了MapReduce的不足,比MapReduce的通用性更好,迭代运算效率更高,作业延迟更低,它的主要优势包括:
提供了一套支持DAG图的分布式并行计算的编程框架,减少多次计算之间中间结果写到Hdfs的开销
提供Cache机制来支持需要反复迭代计算或者多次数据共享,减少数据读取的IO开销
使用多线程池模型来减少task启动开稍,shuffle过程中避免不必要的sort操作以及减少磁盘IO操作
广泛的数据集操作类型
MapReduce由于其设计上的约束只适合处理离线计算,在实时查询和迭代计算上仍有较大的不足,而随着业务的发展,业界对实时查询和迭代分析有更多的需求,单纯依靠MapReduce框架已经不能满足业务的需求了。Spark由于其可伸缩、基于内存计算等特点,且可以直接读写Hadoop上任何格式的数据,成为满足业务需求的最佳候选者。
应用Spark的成功案例
目前大数据在互联网公司主要应用在广告、报表、推荐系统等业务上。在广告业务方面需要大数据做应用分析、效果分析、定向优化等,在推荐系统方面则需要大数据优化相关排名、个性化推荐以及热点点击分析等。
这些应用场景的普遍特点是计算量大、效率要求高。Spark恰恰满足了这些要求,该项目一经推出便受到开源社区的广泛关注和好评。并在近两年内发展成为大数据处理领域最炙手可热的开源项目。
本章将列举国内外应用Spark的成功案例。
广点通是最早使用Spark的应用之一。腾讯大数据精准推荐借助Spark快速迭代的优势,围绕“数据+算法+系统”这套技术方案,实现了在“数据实时采集、算法实时训练、系统实时预测”的全流程实时并行高维算法,最终成功应用于广点通pCTR投放系统上,支持每天上百亿的请求量。
基于日志数据的快速查询系统业务构建于Spark之上的Shark,利用其快速查询以及内存表等优势,承担了日志数据的即席查询工作。在性能方面,普遍比Hive高2-10倍,如果使用内存表的功能,性能将会比Hive快百倍。
Yahoo将Spark用在Audience Expansion中的应用。Audience Expansion是广告中寻找目标用户的一种方法:首先广告者提供一些观看了广告并且购买产品的样本客户,据此进行学习,寻找更多可能转化的用户,对他们定向广告。Yahoo采用的算法是logistic
regression。同时由于有些SQL负载需要更高的服务质量,又加入了专门跑Shark的大内存集群,用于取代商业BI/OLAP工具,承担报表/仪表盘和交互式/即席查询,同时与桌面BI工具对接。目前在Yahoo部署的Spark集群有112台节点,9.2TB内存。
阿里搜索和广告业务,最初使用Mahout或者自己写的MR来解决复杂的机器学习,导致效率低而且代码不易维护。淘宝技术团队使用了Spark来解决多次迭代的机器学习算法、高计算复杂度的算法等。将Spark运用于淘宝的推荐相关算法上,同时还利用Graphx解决了许多生产问题,包括以下计算场景:基于度分布的中枢节点发现、基于最大连通图的社区发现、基于三角形计数的关系衡量、基于随机游走的用户属性传播等。
4. 优酷土豆
优酷土豆在使用Hadoop集群的突出问题主要包括:第一是商业智能BI方面,分析师提交任务之后需要等待很久才得到结果;第二就是大数据量计算,比如进行一些模拟广告投放之时,计算量非常大的同时对效率要求也比较高,最后就是机器学习和图计算的迭代运算也是需要耗费大量资源且速度很慢。
最终发现这些应用场景并不适合在MapReduce里面去处理。通过对比,发现Spark性能比MapReduce提升很多。首先,交互查询响应快,性能比Hadoop提高若干倍;模拟广告投放计算效率高、延迟小(同hadoop比延迟至少降低一个数量级);机器学习、图计算等迭代计算,大大减少了网络传输、数据落地等,极大的提高的计算性能。目前Spark已经广泛使用在优酷土豆的视频推荐(图计算)、广告业务等。
Spark与Shark的原理
1.Spark生态圈
如下图所示为Spark的整个生态圈,最底层为资源管理器,采用Mesos、Yarn等资源管理集群或者Spark自带的Standalone模式,底层存储为文件系统或者其他格式的存储系统如HBase。Spark作为计算框架,为上层多种应用提供服务。Graphx和MLBase提供数据挖掘服务,如图计算和挖掘迭代计算等。Shark提供SQL查询服务,兼容Hive语法,性能比Hive快3-50倍,BlinkDB是一个通过权衡数据精确度来提升查询晌应时间的交互SQL查询引擎,二者都可作为交互式查询使用。Spark
Streaming将流式计算分解成一系列短小的批处理计算,并且提供高可靠和吞吐量服务。
2.Spark基本原理
Spark运行框架如下图所示,首先有集群资源管理服务(Cluster Manager)和运行作业任务的结点(Worker Node),然后就是每个应用的任务控制结点Driver和每个机器节点上有具体任务的执行进程(Executor)。
与MR计算框架相比,Executor有二个优点:一个是多线程来执行具体的任务,而不是像MR那样采用进程模型,减少了任务的启动开稍。二个是Executor上会有一个BlockManager存储模块,类似于KV系统(内存和磁盘共同作为存储设备),当需要迭代多轮时,可以将中间过程的数据先放到这个存储系统上,下次需要时直接读该存储上数据,而不需要读写到hdfs等相关的文件系统里,或者在交互式查询场景下,事先将表Cache到该存储系统上,提高读写IO性能。另外Spark在做Shuffle时,在Groupby,Join等场景下去掉了不必要的Sort操作,相比于MapReduce只有Map和Reduce二种模式,Spark还提供了更加丰富全面的运算操作如filter,groupby,join等。
Spark采用了Scala来编写,在函数表达上Scala有天然的优势,因此在表达复杂的机器学习算法能力比其他语言更强且简单易懂。提供各种操作函数来建立起RDD的DAG计算模型。把每一个操作都看成构建一个RDD来对待,而RDD则表示的是分布在多台机器上的数据集合,并且可以带上各种操作函数。如下图所示:
首先从hdfs文件里读取文本内容构建成一个RDD,然后使用filter()操作来对上次的RDD进行过滤,再使用map()操作取得记录的第一个字段,最后将其cache在内存上,后面就可以对之前cache过的数据做其他的操作。整个过程都将形成一个DAG计算图,每个操作步骤都有容错机制,同时还可以将需要多次使用的数据cache起来,供后续迭代使用。
3.Shark的工作原理
Shark是基于Spark计算框架之上且兼容Hive语法的SQL执行引擎,由于底层的计算采用了Spark,性能比MapReduce的Hive普遍快2倍以上,如果是纯内存计算的SQL,要快5倍以上,当数据全部load在内存的话,将快10倍以上,因此Shark可以作为交互式查询应用服务来使用。
上图就是整个Shark的框架图,与其他的SQL引擎相比,除了基于Spark的特性外,Shark是完全兼容Hive的语法,表结构以及UDF函数等,已有的HiveSql可以直接进行迁移至Shark上。
与Hive相比,Shark的特性如下:
1.以在线服务的方式执行任务,避免任务进程的启动和销毁开稍,通常MapReduce里的每个任务都是启动和关闭进程的方式来运行的,而在Shark中,Server运行后,所有的工作节点也随之启动,随后以常驻服务的形式不断的接受Server发来的任务。
2.Groupby和Join操作不需要Sort工作,当数据量内存能装下时,一边接收数据一边执行计算操作。在Hive中,不管任何操作在Map到Reduce的过程都需要对Key进行Sort操作。
3.对于性能要求更高的表,提供分布式Cache系统将表数据事先Cache至内存中,后续的查询将直接访问内存数据,不再需要磁盘开稍。
4.还有很多Spark的特性,如可以采用Torrent来广播变量和小数据,将执行计划直接传送给Task,DAG过程中的中间数据不需要落地到Hdfs文件系统。
腾讯大数据Spark的概况
腾讯大数据综合了多个业务线的各种需求和特性,目前正在进行以下工作:
1.经过改造和优化的Shark和Spark吸收了TDW平台的功能,如Hive的特有功能:元数据重构,分区优化等,同时可以通过IDE或者洛子调度来直接执行HiveSql查询和定时调度Spark的任务;
2.与Gaia和TDW的底层存储直接兼容,可以直接安全且高效地使用TDW集群上的数据;
3.对Spark底层的使用门槛,资源管理与调度,任务监控以及容灾等多个功能进行完善,并支持快速的迁移和扩容。编辑注:TDW是腾讯内部规模最大的分布式系统,为腾讯的各个产品提供海量数据存储和分析服务,目前该项目已经开源,更多资料可查看。
原文链接:&(责编/魏伟)
推荐阅读相关主题:
CSDN官方微信
扫描二维码,向CSDN吐槽
微信号:CSDNnews
相关热门文章新手园地& & & 硬件问题Linux系统管理Linux网络问题Linux环境编程Linux桌面系统国产LinuxBSD& & & BSD文档中心AIX& & & 新手入门& & & AIX文档中心& & & 资源下载& & & Power高级应用& & & IBM存储AS400Solaris& & & Solaris文档中心HP-UX& & & HP文档中心SCO UNIX& & & SCO文档中心互操作专区IRIXTru64 UNIXMac OS X门户网站运维集群和高可用服务器应用监控和防护虚拟化技术架构设计行业应用和管理服务器及硬件技术& & & 服务器资源下载云计算& & & 云计算文档中心& & & 云计算业界& & & 云计算资源下载存储备份& & & 存储文档中心& & & 存储业界& & & 存储资源下载& & & Symantec技术交流区安全技术网络技术& & & 网络技术文档中心C/C++& & & GUI编程& & & Functional编程内核源码& & & 内核问题移动开发& & & 移动开发技术资料ShellPerlJava& & & Java文档中心PHP& & & php文档中心Python& & & Python文档中心RubyCPU与编译器嵌入式开发驱动开发Web开发VoIP开发技术MySQL& & & MySQL文档中心SybaseOraclePostgreSQLDB2Informix数据仓库与数据挖掘NoSQL技术IT业界新闻与评论IT职业生涯& & & 猎头招聘IT图书与评论& & & CU技术图书大系& & & Linux书友会二手交易下载共享Linux文档专区IT培训与认证& & & 培训交流& & & 认证培训清茶斋投资理财运动地带快乐数码摄影& & & 摄影器材& & & 摄影比赛专区IT爱车族旅游天下站务交流版主会议室博客SNS站务交流区CU活动专区& & & Power活动专区& & & 拍卖交流区频道交流区
白手起家, 积分 11, 距离下一级还需 189 积分
论坛徽章:0
获奖详情:
随着大数据概念的普及以及大数据技术的逐渐成熟,越来越多来自不同领域的大小企业开始拥抱大数据。大数据时代真正即将到来。而选择大数据将遭遇的第一个问题就是技术选型。面对由Hadoop、Spark、Hive、HBase、Storm、Kafka、Flume等等众多开源工具构成的生态系统,技术人员往往感到迷茫。
Spark作为数据处理技术中的新贵,因其性能高、开发效率高、高容错等优点越来越受到技术人员的关注,而且Spark为批处理(Spark Core)、交互式(Spark SQL)、流式处理(Spark Streaming)、机器学习(MLlib)、图计算(GraphX)提供了一个统一的数据处理平台。但是Spark也存在一些缺点,例如还不够稳定、任务调度还不够完善、scala语言学习成本较高等等。关于Spark,你有什么看法呢?
讨论话题:(欢迎大家贴图讨论,能用代码讲话的就别用汉字了)
1. 有人说Spark就是内存版的MapReduce,对此你怎么看?
2. 有人说Spark将来会替代Hadoop,你又怎么看?
讨论时间:日——11月29日
奖品设置:
活动结束后,我们将会选取4位讨论精彩的兄弟,送《Spark快速大数据分析》图书一本。
QQ图片15.jpg (13.67 KB, 下载次数: 39)
15:53 上传
作者:[美] Holden Karau Andy Konwinski
      Patrick Wendell
   [加] Matei Zaharia   
译者:王道远
出版时间:2015年 9 月第 1 版
内 容 提 要:
  本书由Spark开发者及核心成员共同打造,讲解了网络大数据时代应运而生的、能高效迅捷地分析处理数据的工具——Spark,它带领读者快速掌握用Spark收集、计算、简化和保存海量数据的方法,学会交互、迭代和增量式分析,解决分区、数据本地化和自定义序列化等问题。
试读样章:
(2.61 MB, 下载次数: 249)
16:50 上传
点击文件名下载附件
&&nbsp|&&nbsp&&nbsp|&&nbsp&&nbsp|&&nbsp&&nbsp|&&nbsp
小富即安, 积分 4056, 距离下一级还需 944 积分
论坛徽章:78
本帖最后由 yybmsrs 于
18:59 编辑
1. 有人说Spark就是内存版的MapReduce,对此你怎么看?
差不多,shuffle的时候中间数据放到内存,放不下才写文件。同时spark的抽象程度更高,不仅仅只有map和reduce。
2. 有人说Spark将来会替代Hadoop,你又怎么看?
spark是个生态系统,hadoop也是一个生态系统,不存在谁取代谁,只能说spark会取代mr。其他如hdfs,yarn都是
另外,这两个问题实在是没什么可以多说的,虽然我很想要这本书
大富大贵, 积分 12329, 距离下一级还需 7671 积分
论坛徽章:16
1. 有人说Spark就是内存版的MapReduce,对此你怎么看?
这个我赞同,因为spark属于那种内存计算型的架构,是对mapreduce不足的改进。
2. 有人说Spark将来会替代Hadoop,你又怎么看?
我觉得不会,Spark框架的底层存储可以选用HDFS,也可以用其他的。但是Spark 运行的模式里有Standalone,Yarn,Mesos。 其中Yarn也是Hadoop的组件。
而且Hadoop组件很多。不是那么随便可以替代的。。
丰衣足食, 积分 946, 距离下一级还需 54 积分
论坛徽章:7
1. 有人说Spark就是内存版的MapReduce,对此你怎么看?
- 基本同意
- Hadoop,主要用于离线大数据分析;那么Spark主要用于准实时性的大数据分析;而Storm用于实时大数据分析
2. 有人说Spark将来会替代Hadoop,你又怎么看?
- 目前看起来不会,Hadoop是一个体系,一个生态系统,里面包含了很多子系统。而且两者的基础也不完全一样,想完全替代,目前看不出这个趋势 ..
小富即安, 积分 2127, 距离下一级还需 2873 积分
论坛徽章:14
本帖最后由 heguangwu 于
15:44 编辑
1. 有人说Spark就是内存版的MapReduce,对此你怎么看?
& & Spark技术来自超算,主要的核心思想是RDD,通过记录每一个步骤之间的计算关系而不是像MapReduce那样保存实际的数据来实现故障恢复,所以Spark的故障恢复其实是要重算的,其计算过程确实有点像MapReduce,MapReduce在经过一个阶段的MapReduce后会将数据保存到HDFS,相当于做了一次checkpoint,这其实是一个很大的消耗,也是性能的一大瓶颈,但好处是恢复起来非常快,Spark也借鉴了这种思想,中间其实也可以做持久化的RDD,等同于checkpoint,但多数情况下不需要这个步骤,后面可以直接继续跟计算操作,这样看Spark无论从容灾还是计算模型都不是MapReduce,只是借鉴了MapReduce很多思想,所以不能说Spark仅仅是内存版的MapReduce。在改进MapReduce模型上,而Hadoop的其它计算框架如Tez/impala是改进mapreduce的阶段划分的单一性(只有map和reduce),将计算拓扑图做成DAG的方式(这个本质是是借鉴了并行数据库的思想),虽然故障恢复很麻烦,但速度快了不少。我们看一下spark的协议栈,从下图可以看出,spark其实不仅仅是一种简单的计算模型,而是一个通用计算平台,上面开发各种库可以实现不同的技术模型,如流计算、机器学习
spark.png (55.86 KB, 下载次数: 37)
09:42 上传
& &&&而很久之前MapReduce也想做一个通用的计算平台(事实上也有很多人在上面做了一些如机器学习等),但其计算模型的简单低效不适合做成通用平台。从另外一个方面来说,spark也是mapreduce的一种重要补充,而不是简单的替代,毕竟现在内存资源还是比较宝贵的,MapReduce消耗的磁盘资源相对而言廉价多了,而且有些计算并不那么着急,5个小时计算出来和10个小时计算出来结果影响不大这种就适合MR
2. 有人说Spark将来会替代Hadoop,你又怎么看?
& &&&其实我上面也讲述了部分我的观点,现在不是谁替换谁,而是spark是mapreduce的一种重要补充,最后的情况不是one size fit all,而是不同场景会采用不同的计算模型,如要求低时延的会采用storm,对吞吐量要求高的会采用spark stream,重要的实时计算采用spark,而中小数据交互式计算可能会采用Tez,超大规模的非实时的计算还是会采用MapReduce,正如下面这个图一样
hadoop.png (101.88 KB, 下载次数: 37)
10:02 上传
& && &最后贴一个hadoop ecosystem的介绍的ppt,虽然是有点老,但思路非常清晰,个人也比较认同这个观点:
(9.73 MB, 下载次数: 110)
10:03 上传
点击文件名下载附件
稍有积蓄, 积分 218, 距离下一级还需 282 积分
论坛徽章:3
1. 有人说Spark就是内存版的MapReduce,对此你怎么看?
& & 这样说有点简单,spark提供了集群的分布式内存抽象,也就是所说的RDD,spark提供了RDD的两类操作,转换和动作,转换包括map,flatMap,filter,union,sample,join,groupByKey,reduceByKey,sortByKey等等,动作是返回结果,包括collect,reduce,count等,抽象层次更高,功能更多,调用更灵活。所处理的数据都是放在内存中,速度更快。
& & mapreduce则抽象层次比较低,只有map,reduce两个基本功能。
2. 有人说Spark将来会替代Hadoop,你又怎么看?
& & hadoop是一个生态系统,主要包括HDFS,mapredeuce,适合处理海量离线数据,他的分布是基于磁盘和IO的。
& & spark的分布处理是基于内存的,速度更快。
& & spark的出现,解决方案又多了一种选择,spark是可以架在hadoop和yarn上的,hadoop的生态中有很多部分,spark可以替代hadoop的一些功能,二者是可并存的。
家境小康, 积分 1255, 距离下一级还需 745 积分
论坛徽章:27
不错,支持
论坛徽章:49
新东西太多了。
家境小康, 积分 1547, 距离下一级还需 453 积分
论坛徽章:10
1. 有人说Spark就是内存版的MapReduce,对此你怎么看?
& & (1)spark与以MapReduce为出发点的hadoop有几乎同样的效果,绝不是同样的处理方式。以下简单介绍不同之处。& &
& & Spark 是一种与 Hadoop 相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark 在某些工作负载方面表现得更加优越,换句话说,Spark 启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。
Spark 是在 Scala 语言中实现的,它将 Scala 用作其应用程序框架。与 Hadoop 不同,Spark 和 Scala 能够紧密集成,其中的 Scala 可以像操作本地集合对象一样轻松地操作分布式数据集。
尽管创建 Spark 是为了支持分布式数据集上的迭代作业,但是实际上它是对 Hadoop 的补充,可以在 Hadoop 文件系统中并行运行。通过名为 Mesos 的第三方集群框架可以支持此行为。Spark 由加州大学伯克利分校 AMP 实验室 (Algorithms, Machines, and People Lab) 开发,可用来构建大型的、低延迟的数据分析应用程序。
Screenshot-4.png (364.78 KB, 下载次数: 35)
20:43 上传
& &(2) 对于spark的部署,请参见我的博文:
&&以下是部分介绍:
先从比较简单的说起,所谓的没有ha是指master节点没有ha。
组成cluster的两大元素即Master和Worker。slave worker可以有1到多个,这些worker都处于active状态。
Driver Application可以运行在Cluster之内,也可以在cluster之外运行,先从简单的讲起即Driver Application独立于Cluster。那么这样的整体框架如下图所示,由driver,master和多个slave worker来共同组成整个的运行环境。
步骤1 运行master
$SPARK_HOME/sbin/start_master.sh
在 start_master.sh 中最关键的一句就是
&$sbin&/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT
检测Master的jvm进程
22:57 pts/0 00:00:05 /opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip localhost --port 7077 --webui-port 8080
Master的日志在$SPARK_HOME/logs目录下
步骤2 运行worker,可以启动多个
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7077
worker运行时,需要注册到指定的master url,这里就是spark://localhost:7077.
Master侧收到RegisterWorker通知,其处理代码如下
case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =&
& && &logInfo(&Registering worker %s:%d with %d cores, %s RAM&.format(
& && &&&workerHost, workerPort, cores, Utils.megabytesToString(memory))) if (state == RecoveryState.STANDBY) { // ignore, don't send response } else if (idToWorker.contains(id)) {
& && &&&sender ! RegisterWorkerFailed(&Duplicate worker ID&quot
& && &} else {
& && &&&val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
& && && & sender, workerUiPort, publicAddress) if (registerWorker(worker)) {
& && && & persistenceEngine.addWorker(worker)
& && && & sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
& && && & schedule()
& && &&&} else {
& && && & val workerAddress = worker.actor.path.address
& && && & logWarning(&Worker registration failed. Attempted to re-register worker at same & + &address: & + workerAddress)
& && && & sender ! RegisterWorkerFailed(&Attempted to re-register worker at same address: & + workerAddress)
步骤3 运行Spark-shell
MASTER=spark://localhost:7077 $SPARK_HOME/bin/spark-shell
spark-shell属于application,有关appliation的运行日志存储在 $SPARK_HOME/works 目录下
spark-shell作为application,在Master侧其处理的分支是RegisterApplication,具体处理代码如下。
case RegisterApplication(description) =& { if (state == RecoveryState.STANDBY) { // ignore, don't send response } else {
& && &&&logInfo(&Registering app & + description.name)
& && &&&val app = createApplication(description, sender)
& && &&&registerApplication(app)
& && &&&logInfo(&Registered app & + description.name + & with ID & + app.id)
& && &&&persistenceEngine.addApplication(app)
& && &&&sender ! RegisteredApplication(app.id, masterUrl)
& && &&&schedule()
每当有新的application注册到master,master都要调度schedule函数将application发送到相应的 worker,在对应的worker启动相应的ExecutorBackend. 具体代码请参考Master.scala中的schedule函数,代码就不再列出。
步骤4 结果检测
/opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip localhost --port 7077 --webui-port 8080 root
21 23:00 pts/0 00:00:25 /opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.repl.Main
25 23:02 pts/2 00:00:03 /opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://localhost:7077 root
34 23:02 pts/2 00:00:04 /opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@localhost:40053/user/CoarseGrainedScheduler 0 localhost 4 akka.tcp://sparkWorker@localhost:53568/user/Worker app-59-0000
从运行的进程之间的关系可以看出,worker和master之间的连接建立完毕之后,如果有新的driver application连接上master,master会要求worker启动相应的ExecutorBackend进程。此后若有什么Task需要运 行,则会运行在这些Executor之上。可以从以下的日志信息得出此结论,当然看源码亦可。
14/05/11 23:02:36 INFO Worker: Asked to launch executor app-59-0000/0 for Spark shell 14/05/11 23:02:36 INFO ExecutorRunner: Launch command: &/opt/java/bin/java& &-cp& &:/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar& &-Xms512M& &-Xmx512M& &org.apache.spark.executor.CoarseGrainedExecutorBackend& &akka.tcp://spark@localhost:40053/user/CoarseGrainedScheduler& &0& &localhost& &4& &akka.tcp://sparkWorker@localhost:53568/user/Worker& &app-59-0000&
worker中启动exectuor的相关源码见worker中的receive函数,相关代码如下
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =& if (masterUrl != activeMasterUrl) {
& && &&&logWarning(&Invalid Master (& + masterUrl + &quot attempted to launch executor.&quot
& && &} else { try {
& && && & logInfo(&Asked to launch executor %s/%d for %s&.format(appId, execId, appDesc.name))
& && && & val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, self, workerId, host,
& && && && &appDesc.sparkHome.map(userSparkHome =& new File(userSparkHome)).getOrElse(sparkHome),
& && && && &workDir, akkaUrl, ExecutorState.RUNNING)
& && && & executors(appId + &/& + execId) = manager
& && && & manager.start()
& && && & coresUsed += cores_
& && && & memoryUsed += memory_
& && && & masterLock.synchronized {
& && && && &master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
& && && & }
& && &&&} catch { case e: Exception =& {
& && && && &logError(&Failed to launch exector %s/%d for %s&.format(appId, execId, appDesc.name)) if (executors.contains(appId + &/& + execId)) {
& && && && &&&executors(appId + &/& + execId).kill()
& && && && &&&executors -= appId + &/& + execId
& && && && &}
& && && && &masterLock.synchronized {
& && && && &&&master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
& && && && &}
& && && & }
关于standalone的部署,需要详细研究的源码文件如下所列。
& & deploy/master/Master.scala
& & deploy/worker/worker.scala
& & executor/CoarseGrainedExecutorBackend.scala
查看进程之间的父子关系,请用 &pstree&
使用下图来小结单Master的部署情况。
类的动态加载和反射
在谈部署Driver到Cluster上之前,我们先回顾一下java的一大特性“类的动态加载和反射机制”。本人不是一直写java代码出身,所以好多东西都是边用边学,难免挂一漏万。
所谓的反射,其实就是要解决在运行期实现类的动态加载。
来个简单的例子
public class Demo {&&public Demo() {& &System.out.println(&Hi!"&&}&&@SuppressWarnings(&unchecked&quot&&public static void main(String[] args) throws Exception {& &Class clazz = Class.forName(&test.Demo"& &Demo demo = (Demo) clazz.newInstance();&&}
谈到这里,就自然想到了一个面试题,“谈一谈Class.forName和ClassLoader.loadClass的区别&。说到面试,我总是很没有信心,面试官都很屌的, 。
在cluster中运行Driver Application
上一节之所以写到类的动态加载与反射都是为了谈这一节的内容奠定基础。
将Driver application部署到Cluster中,启动的时序大体如下图所示。
& &&&首先启动Master,然后启动Worker
& & 使用”deploy.Client&将Driver Application提交到Cluster中
./bin/spark-class org.apache.spark.deploy.Client launch [client-options] \
& &[application-options]
& & Master在收到RegisterDriver的请求之后,会发送LaunchDriver给worker,要求worker启动一个Driver的jvm process
& & Driver Application在新生成的JVM进程中运行开始时会注册到master中,发送RegisterApplication给Master
& & Master发送LaunchExecutor给Worker,要求Worker启动执行ExecutorBackend的JVM Process
& & 一当ExecutorBackend启动完毕,Driver Application就可以将任务提交到ExecutorBackend上面执行,即LaunchTask指令
提交侧的代码,详见deploy/Client.scala
driverArgs.cmd match { case &launch& =& // TODO: We could add an env variable here and intercept it in `sc.addJar` that would //& && & truncate filesystem paths similar to what YARN does. For now, we just require //& && & people call `addJar` assuming the jar is in the same directory. val env = Map[String, String]()
& && &&&System.getenv().foreach{case (k, v) =& env(k) = v}
& && &&&val mainClass = &org.apache.spark.deploy.worker.DriverWrapper& val classPathConf = &spark.driver.extraClassPath& val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =&
& && && & cp.split(java.io.File.pathSeparator)
& && &&&val libraryPathConf = &spark.driver.extraLibraryPath& val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =&
& && && & cp.split(java.io.File.pathSeparator)
& && &&&val javaOptionsConf = &spark.driver.extraJavaOptions& val javaOpts = sys.props.get(javaOptionsConf)
& && &&&val command = new Command(mainClass, Seq(&{{WORKER_URL}}&, driverArgs.mainClass) ++
& && && & driverArgs.driverOptions, env, classPathEntries, libraryPathEntries, javaOpts)
& && &&&val driverDescription = new DriverDescription(
& && && & driverArgs.jarUrl,
& && && & driverArgs.memory,
& && && & driverArgs.cores,
& && && & driverArgs.supervise,
& && && & command)
& && &&&masterActor ! RequestSubmitDriver(driverDescription)
从Deploy.client发送出来的消息被谁接收呢?答案比较明显,那就是Master。 Master.scala中的receive函数有专门针对RequestSubmitDriver的处理,具体代码如下
case RequestSubmitDriver(description) =& { if (state != RecoveryState.ALIVE) {
& && &&&val msg = s&Can only accept driver submissions in ALIVE state. Current state: $state.& sender ! SubmitDriverResponse(false, None, msg)
& && &} else {
& && &&&logInfo(&Driver submitted & + mand.mainClass)
& && &&&val driver = createDriver(description)
& && &&&persistenceEngine.addDriver(driver)
& && &&&waitingDrivers += driver
& && &&&drivers.add(driver)
& && &&&schedule() // TODO: It might be good to instead have the submission client poll the master to determine //& && & the current status of the driver. For now it's simply &fire and forget&. sender ! SubmitDriverResponse(true, Some(driver.id),
& && && & s&Driver successfully submitted as ${driver.id}&quot
SparkEnv对于整个Spark的任务来说非常关键,不同的role在创建SparkEnv时传入的参数是不相同的,如Driver和Executor则存在重要区别。
在Executor.scala中,创建SparkEnv的代码如下所示
private val env = { if (!isLocal) {
& && &val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,
& && &&&isDriver = false, isLocal = false)
& && &SparkEnv.set(_env)
& && &_env.metricsSystem.registerSource(executorSource)
& && &_env
& & } else {
& && &SparkEnv.get }
Driver Application则会创建SparkContext,在SparkContext创建过程中,比较重要的一步就是生成SparkEnv,其代码如下
private[spark] val env = SparkEnv.create(&&conf,&&&&,&&conf.get(&spark.driver.host&quot,&&conf.get(&spark.driver.port&quot.toInt,&&isDriver = true,&&isLocal = isLocal,&&listenerBus = listenerBus)
&&SparkEnv.set(env)
Standalone模式下HA的实现
Spark在standalone模式下利用zookeeper来实现了HA机制,这里所说的HA是专门针对Master节点的,因为上面所有的分析可以看出Master是整个cluster中唯一可能出现单点失效的节点。
采用zookeeper之后,整个cluster的组成如下图所示。
为了使用zookeeper,Master在启动的时候需要指定如下的参数,修改conf/spark-env.sh, SPARK_DAEMON_JAVA_OPTS中添加如下选项。
System property & & & & Meaning
spark.deploy.recoveryMode & & & & Set to ZOOKEEPER to enable standby Master recovery mode (default: NONE).
spark.deploy.zookeeper.url & & & & The ZooKeeper cluster url (e.g., 192.168.1.100:.1.101:2181).
spark.deploy.zookeeper.dir & & & & The directory in ZooKeeper to store recovery state (default: /spark).
实现HA的原理
zookeeper提供了一个Leader Election机制,利用这个机制,可以实现HA功能,具体请参考 zookeeper recipes
在Spark中没有直接使用zookeeper的api,而是使用了 curator ,curator对zookeeper做了相应的封装,在使用上更为友好。
步步演进讲到在standalone模式下,如何利用zookeeper来实现ha。从中可以看出standalone master一个最主要的任务就是resource management和job scheduling,看到这两个主要功能的时候,您也许会想到这不就是YARN要解决的问题。对了,从本质上来说standalone是yarn的一个 简化版本。
2. 有人说Spark将来会替代Hadoop,你又怎么看?
& & 未来会的,但现在还早。hadoop有些笨重,但历史悠久。许多企业投入精力不会轻易放弃的。
家境小康, 积分 1547, 距离下一级还需 453 积分
论坛徽章:10
Spark是基于内存的迭代计算框架,适用于需要多次操作特定数据集的应用场合,如pageRank、K-Means等算法就非常适合内存迭代计算。Spark整个生态体系正逐渐完善中,GraphX 、 SparkSQL、 SparkStreaming 、 MLlib,等到Spark有了自己的数据仓库后,那就完全能与Hadoop生态体系相媲美。
Spark框架采用函数式编程语言Scala,Scala语言的面向对象、函数式、高并发模型等特点,使得Spark拥有了更高的灵活性及性能。如果你学过java,可能会对scala中的一些新概念表示陌生,如隐式转换、模式匹配、伴生类等,但一旦入门,你会感觉scala语言的简洁与强大。
Spark暂时不会取代Hadoop替代者有以下原因:
两者的侧重点不同,使用场景不同。Spark更适合于迭代运算比较多的ML和DM运算。因为在Spark里面,有RDD的概念。RDD可以cache到内存中,那么每次对RDD数据集的操作之后的结果,都可以存放到内存中,下一个操作可以直接从内存中输入,省去了MapReduce大量的磁盘IO操作。但是,我们也要看到spark的限制:内存。我认为Hadoop虽然费时,但是在OLAP等大规模数据的应用场景,还是受欢迎的。目前Hadoop涵盖了从数据收集、到分布式存储,再到分布式计算的各个领域,在各领域都有自己独特优势。
作为一种内存的迭代计算框架,Spark适用以下场景:
适用于迭代次数比较多的场景。迭代次数多的机器学习算法等。如pageRank、K-Means等。
Spark On Mesos环境
目前在Spark On Mesos环境中,用户可选择两种调度模式之一运行自己的应用程序
粗粒度模式(Coarse-grained Mode):每个应用程序的运行环境由一个Dirver和若干个Executor组成,其中,每个Executor占用若干资源,内部可运行多个Task(对应多少个“slot”)。应用程序的各个任务正式运行之前,需要将运行环境中的资源全部申请好,且运行过程中要一直占用这些资源,即使不用,最后程序运行结束后,回收这些资源。举个例子,比如你提交应用程序时,指定使用5个executor运行你的应用程序,每个executor占用5GB内存和5个CPU,每个executor内部设置了5个slot,则Mesos需要先为executor分配资源并启动它们,之后开始调度任务。另外,在程序运行过程中,mesos的master和slave并不知道executor内部各个task的运行情况,executor直接将任务状态通过内部的通信机制汇报给Driver,从一定程度上可以认为,每个应用程序利用mesos搭建了一个虚拟集群自己使用。
细粒度模式(Fine-grained Mode):鉴于粗粒度模式会造成大量资源浪费,Spark On Mesos还提供了另外一种调度模式:细粒度模式,这种模式类似于现在的云计算,思想是按需分配。与粗粒度模式一样,应用程序启动时,先会启动executor,但每个executor占用资源仅仅是自己运行所需的资源,不需要考虑将来要运行的任务,之后,mesos会为每个executor动态分配资源,每分配一些,便可以运行一个新任务,单个Task运行完之后可以马上释放对应的资源。每个Task会汇报状态给Mesos slave和Mesos Master,便于更加细粒度管理和容错,这种调度模式类似于MapReduce调度模式,每个Task完全独立,优点是便于资源控制和隔离,但缺点也很明显,短作业运行延迟大。
北京皓辰网域网络信息技术有限公司. 版权所有 京ICP证:060528号 北京市公安局海淀分局网监中心备案编号:
广播电视节目制作经营许可证(京) 字第1234号
中国互联网协会会员&&联系我们:
感谢所有关心和支持过ChinaUnix的朋友们
转载本站内容请注明原作者名及出处}

我要回帖

更多关于 spark streaming 案例 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信