Spark SQL到底shell 支持的循环语句什么SQL语句

整理对Spark SQL的理解
我的图书馆
整理对Spark SQL的理解
2382人阅读
CatalystCatalyst是与Spark解耦的一个独立库,是一个impl-free的执行计划的生成和优化框架。目前与Spark Core还是耦合的,对此user邮件组里有人对此提出疑问,见。&以下是Catalyst较早时候的架构图,展示的是代码结构和处理流程。Catalyst定位其他系统如果想基于Spark做一些类sql、标准sql甚至其他查询语言的查询,需要基于Catalyst提供的解析器、执行计划树结构、逻辑执行计划的处理规则体系等类体系来实现执行计划的解析、生成、优化、映射工作。对应上图中,主要是左侧的TreeNodelib及中间三次转化过程中涉及到的类结构都是Catalyst提供的。至于右侧物理执行计划映射生成过程,物理执行计划基于成本的优化模型,具体物理算子的执行都由系统自己实现。&Catalyst现状在解析器方面提供的是一个简单的scala写的sql parser,支持语义有限,而且应该是标准sql的。在规则方面,提供的优化规则是比较基础的(和Pig/Hive比没有那么丰富),不过一些优化规则其实是要涉及到具体物理算子的,所以部分规则需要在系统方那自己制定和实现(如spark-sql里的SparkStrategy)。Catalyst也有自己的一套数据类型。下面介绍Catalyst里几套重要的类结构。TreeNode体系TreeNode是Catalyst执行计划表示的数据结构,是一个树结构,具备一些scala collection的操作能力和树遍历能力。这棵树一直在内存里维护,不会dump到磁盘以某种格式的文件存在,且无论在映射逻辑执行计划阶段还是优化逻辑执行计划阶段,树的修改是以替换已有节点的方式进行的。&TreeNode,内部带一个children: Seq[BaseType]表示孩子节点,具备foreach、map、collect等针对节点操作的方法,以及transformDown(默认,前序遍历)、transformUp这样的遍历树上节点,对匹配节点实施变化的方法。提供UnaryNode,BinaryNode, LeafNode三种trait,即非叶子节点允许有一个或两个子节点。TreeNode提供的是范型。TreeNode有两个子类继承体系,QueryPlan和Expression。QueryPlan下面是逻辑和物理执行计划两个体系,前者在Catalyst里有详细实现,后者需要在系统自己实现。Expression是表达式体系,后面章节都会展开介绍。Tree的transformation实现:传入PartialFunction[TreeType,TreeType],如果与操作符匹配,则节点会被结果替换掉,否则节点不会变动。整个过程是对children递归执行的。执行计划表示模型逻辑执行计划QueryPlan继承自TreeNode,内部带一个output: Seq[Attribute],具备transformExpressionDown、transformExpressionUp方法。在Catalyst中,QueryPlan的主要子类体系是LogicalPlan,即逻辑执行计划表示。其物理执行计划表示由使用方实现(spark-sql项目中)。LogicalPlan继承自QueryPlan,内部带一个reference:Set[Attribute],主要方法为resolve(name:String): Option[NamedeExpression],用于分析生成对应的NamedExpression。LogicalPlan有许多具体子类,也分为UnaryNode, BinaryNode, LeafNode三类,具体在org.apache.spark.sql.catalyst.plans.logical路径下。逻辑执行计划实现LeafNode主要子类是Command体系:各command的语义可以从子类名字看出,代表的是系统可以执行的non-query命令,如DDL。&UnaryNode的子类:BinaryNode的子类:物理执行计划另一方面,物理执行计划节点在具体系统里实现,比如spark-sql工程里的SparkPlan继承体系。物理执行计划实现每个子类都要实现execute()方法,大致有以下实现子类(不全)。LeadNode的子类:UnaryNode的子类:BinaryNode的子类:提到物理执行计划,还要提一下Catalyst提供的分区表示模型。执行计划映射Catalyst还提供了一个QueryPlanner[Physical &: TreeNode[PhysicalPlan]]抽象类,需要子类制定一批strategies: Seq[Strategy],其apply方法也是类似根据制定的具体策略来把逻辑执行计划算子映射成物理执行计划算子。由于物理执行计划的节点是在具体系统里实现的,所以QueryPlanner及里面的strategies也需要在具体系统里实现。在spark-sql项目中,SparkStrategies继承了QueryPlanner[SparkPlan],内部制定了LeftSemiJoin, HashJoin,PartialAggregation, BroadcastNestedLoopJoin, CartesianProduct等几种策略,每种策略接受的都是一个LogicalPlan,生成的是Seq[SparkPlan],每个SparkPlan理解为具体RDD的算子操作。比如在BasicOperators这个Strategy里,以match-case匹配的方式处理了很多基本算子(可以一对一直接映射成RDD算子),如下:[java] case&logical.Project(projectList,&child)&=&&&&&execution.Project(projectList,&planLater(child))&::&Nil&&case&logical.Filter(condition,&child)&=&&&&&execution.Filter(condition,&planLater(child))&::&Nil&&case&logical.Aggregate(group,&agg,&child)&=&&&&&execution.Aggregate(partial&=&false,&group,&agg,&planLater(child))(sqlContext)&::&Nil&&case&logical.Sample(fraction,&withReplacement,&seed,&child)&=&&&&&execution.Sample(fraction,&withReplacement,&seed,&planLater(child))&::&Nil&&Expression体系Expression,即表达式,指不需要执行引擎计算,而可以直接计算或处理的节点,包括Cast操作,Projection操作,四则运算,逻辑操作符运算等。具体可以参考org.apache.spark.sql.expressionspackage下的类。Rules体系凡是需要处理执行计划树(Analyze过程,Optimize过程,SparkStrategy过程),实施规则匹配和节点处理的,都需要继承RuleExecutor[TreeType]抽象类。RuleExecutor内部提供了一个Seq[Batch],里面定义的是该RuleExecutor的处理步骤。每个Batch代表着一套规则,配备一个策略,该策略说明了迭代次数(一次还是多次)。[java] protected&case&class&Batch(name:&String,&strategy:&Strategy,&rules:&Rule[TreeType]*)&&Rule[TreeType &: TreeNode[_]]是一个抽象类,子类需要复写apply(plan: TreeType)方法来制定处理逻辑。RuleExecutor的apply(plan: TreeType): TreeType方法会按照batches顺序和batch内的Rules顺序,对传入的plan里的节点迭代处理,处理逻辑为由具体Rule子类实现。Hive相关Hive支持方式Spark SQL对hive的支持是单独的spark-hive项目,对Hive的支持包括HQL查询、hive metaStore信息、hive SerDes、hive UDFs/UDAFs/ UDTFs,类似Shark。只有在HiveContext下通过hive api获得的数据集,才可以使用hql进行查询,其hql的解析依赖的是org.apache.hadoop.hive.ql.parse.ParseDriver类的parse方法,生成Hive AST。&实际上sql和hql,并不是一起支持的。可以理解为hql是独立支持的,能被hql查询的数据集必须读取自hive api。下图中的parquet、json等其他文件支持只发生在sql环境下(SQLContext)。Hive on SparkHive官方提出了。Shark结束之后,拆分为两个方向:Spark SQL里现在对Hive的支持,体现在复用了Hive的meta store数据、hql解析、UDFs、SerDes,在执行DDL和某些简单命令的时候,调的是hive客户端。hql翻译前会处理一些与query主体无关的set, cache, addfile等命令,然后调用ParserDriver翻译hql,并把AST转换成Catalyst的LogicalPlan,后续优化、物理执行计划翻译及执行过程,与Sql一样使用的是Catalyst提供的内容,执行引擎是Spark。在整个结合过程中,ASTNode映射成LogicalPlan是重点。而Hive社区的Hive on Spark会怎样实现,具体参考jira里的。与Shark对比,Shark多依赖了Hive的执行计划相关模块以及CLI。CLI和JDBC部分是Spark SQL后续打算支持的。Shark额外提供的对Table数据行转列、序列化、压缩存内存的模块,也被拿到了Spark Sql的sql工程里。以上说明了Shark与Spark SQL Hive的区别,对Shark这个项目继承性的理解。而Spark SQL Hive与Hive社区 Hive on Spark的区别需要具体参考jira里的设计文档,我也还没有读过。spark-hive工程解析过程HiveQl.parseSql()把hql解析成logicalPlan。解析过程,提取出一些command,包括:2& set key=value2& cache table2& uncache table2& add jar2& add file2& dfs2& source然后由Hive的ParseDriver把hql解析成AST,得到ASTNode,[java] def&getAst(sql:&String):&ASTNode&=&ParseUtils.findRootNonNullToken((new&ParseDriver).parse(sql))&&把Node转化为Catalyst的LogicalPlan,转化逻辑较复杂,也是Sparksql对hql支持的最关键部分。详见HiveQl.nodeToPlan(node: Node):LogicalPlan方法。大致转换逻辑包括:处理TOK_EXPLAIN和TOK_DESCTABLE处理TOK_CREATETABLE,包括创建表时候一系列表的设置TOK_XXX处理TOK_QUERY,包括TOK_SELECT,TOK_WHERE,TOK_GROUPBY,TOK_HAVING,TOK_SORTEDBY,TOK_LIMIT等等,对FROM后面跟的语句进行nodeToRelation处理。处理TOK_UNION对Hive AST树结构和表示不熟悉,所以此处略过。Analyze过程metadata交互Catalog类为HiveMetastoreCatalog,通过hive的conf生成client(org.apache.hadoop.hive.ql.metadata.Hive,用于与MetaStore通信,获得metadata以及进行DDL操作),catalog的lookupRelation方法里面,client.getTable()得到表信息,client.getAddPartitionsForPruner()得到分区信息。udf相关FunctionRegistry类为HiveFunctionRegistry,根据方法名,通过hive的相关类去查询该方法,检查是否具有该方法,是UDF,还是UDAF(aggregation),或是UDTF(table)。这里只做已有udf的查询,不做新方法的include。与Catalyst的Expression对应继承关系如下:Inspector相关HiveInspectors提供了几个映射数据类型和ObjectInspetor子类的方法,包括PrimitiveObjectInspector,ListObjectInspector,MapObjectInspector,StructObjectInspector四种Optimizer过程在做优化前,会尝试对之前生成的逻辑执行计划进行createtabl操作,因为执行的hql可能是“CREATE TABLE XXX”,这部分处理在HiveMetastoreCatalog的CreateTables单例里,继承了Rule[LogicalPlan]。以及PreInsertionCasts处理,也是HiveMetastoreCatalog里的单例,继承了Rule[LogicalPlan]。&之后的optimizer过程同SQLContext里,用的是同一个Catalyst提供的Optimizer类。Planner及执行过程HiveContext继承自SQLContext,其QueryExecution也继承自SQLContext的QueryExecution。后续执行计划优化、物理执行计划翻译、处理及执行过程同SQL的处理逻辑是一致的。&翻译物理执行计划的时候,hive planner里制定了些特定的策略,与SparkPlanner稍有不同。多了Scripts,DataSinks,HiveTableScans和HiveCommandStrategy四种处理物理执行计划的策略(见HiveStrategies)。1.&&Scripts,用于处理那种hive命令行执行脚本的情况。实现方式是使用ProcessBuilder新起一个JVM进程的方式,用”/bin/bash –c scripts”的方式执行脚本并获取输出流数据,转化为Catalyst Row数据格式。2.&&DataSinks,用于把数据写入到Hive表的情况。里面涉及到一些hive读写的数据格式转化、序列化、读配置等工作,最后通过SparkContext的runJob接口,提交作业。3.&&HiveTableScans,用于对hive table进行扫描,支持使用谓词的分区裁剪(Partition pruning predicates are detected and applied)。4.&&HiveCommandStrategy,用于执行native command和describe command。我理解是这种命令是直接调hive客户端单机执行的,因为可能只与meta data打交道。&toRDD: RDD[Row]处理也有少许区别,返回RDD[Row]的时候,对每个元素做了一次拷贝。&SQL CoreSpark SQL的核心是把已有的RDD,带上Schema信息,然后注册成类似sql里的”Table”,对其进行sql查询。这里面主要分两部分,一是生成SchemaRD,二是执行查询。生成SchemaRDD如果是spark-hive项目,那么读取metadata信息作为Schema、读取hdfs上数据的过程交给Hive完成,然后根据这俩部分生成SchemaRDD,在HiveContext下进行hql()查询。&对于Spark SQL来说,数据方面,RDD可以来自任何已有的RDD,也可以来自支持的第三方格式,如json file、parquet file。SQLContext下会把带case class的RDD隐式转化为SchemaRDD[java] implicit&def&createSchemaRDD[A&&:&Product:&TypeTag](rdd:&RDD[A])&=&&new&SchemaRDD(this,&&SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd)))&&ExsitingRdd单例里会反射出case class的attributes,并把RDD的数据转化成Catalyst的GenericRow,最后返回RDD[Row],即一个SchemaRDD。这里的具体转化逻辑可以参考ExsitingRdd的productToRowRdd和convertToCatalyst方法。之后可以进行SchemaRDD提供的注册table操作、针对Schema复写的部分RDD转化操作、DSL操作、saveAs操作等等。&Row和GenericRow是Catalyst里的行表示模型Row用Seq[Any]来表示values,GenericRow是Row的子类,用数组表示values。Row支持数据类型包括Int, Long, Double, Float, Boolean, Short, Byte, String。支持按序数(ordinal)读取某一个列的值。读取前需要做isNullAt(i: Int)的判断。各自都有Mutable类,提供setXXX(i: int, value: Any)修改某序数上的值。层次结构下图大致对比了Pig,Spark SQL,Shark在实现层次上的区别,仅做参考。查询流程SQLContext里对sql的一个解析和执行流程:1.& 第一步parseSql(sql: String),simple sql parser做词法语法解析,生成LogicalPlan。&2.& 第二步analyzer(logicalPlan),把做完词法语法解析的执行计划进行初步分析和映射,目前SQLContext内的Analyzer由Catalyst提供,定义如下:new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive =true)catalog为SimpleCatalog,catalog是用来注册table和查询relation的。而这里的FunctionRegistry不支持lookupFunction方法,所以该analyzer不支持Function注册,即UDF。Analyzer内定义了几批规则:[java] val&batches:&Seq[Batch]&=&Seq(&&&&Batch("MultiInstanceRelations",&Once,&&&&&&NewRelationInstances),&&&&Batch("CaseInsensitiveAttributeReferences",&Once,&&&&&&(if&(caseSensitive)&Nil&else&LowercaseAttributeReferences&::&Nil)&:&_*),&&&&Batch("Resolution",&fixedPoint,&&&&&&ResolveReferences&::&&&&&&ResolveRelations&::&&&&&&NewRelationInstances&::&&&&&&ImplicitGenerate&::&&&&&&StarExpansion&::&&&&&&ResolveFunctions&::&&&&&&GlobalAggregates&::&&&&&&typeCoercionRules&:_*),&&&&Batch("Check&Analysis",&Once,&&&&&&CheckResolution),&&&&Batch("AnalysisOperators",&fixedPoint,&&&&&&EliminateAnalysisOperators)&&)&&3.& 从第二步得到的是初步的logicalPlan,接下来第三步是optimizer(plan)。Optimizer里面也是定义了几批规则,会按序对执行计划进行优化操作。[java] val&batches&=&&&&Batch("Combine&Limits",&FixedPoint(100),&&&&&&CombineLimits)&::&&&&Batch("ConstantFolding",&FixedPoint(100),&&&&&&NullPropagation,&&&&&&ConstantFolding,&&&&&&LikeSimplification,&&&&&&BooleanSimplification,&&&&&&SimplifyFilters,&&&&&&SimplifyCasts,&&&&&&SimplifyCaseConversionExpressions)&::&&&&Batch("Filter&Pushdown",&FixedPoint(100),&&&&&&CombineFilters,&&&&&&PushPredicateThroughProject,&&&&&&PushPredicateThroughJoin,&&&&&&ColumnPruning)&::&Nil&&4.& 优化后的执行计划,还要丢给SparkPlanner处理,里面定义了一些策略,目的是根据逻辑执行计划树生成最后可以执行的物理执行计划树,即得到SparkPlan。[java] val&strategies:&Seq[Strategy]&=&&&&CommandStrategy(self)&::&&&&TakeOrdered&::&&&&PartialAggregation&::&&&&LeftSemiJoin&::&&&&HashJoin&::&&&&InMemoryScans&::&&&&ParquetOperations&::&&&&BasicOperators&::&&&&CartesianProduct&::&&&&BroadcastNestedLoopJoin&::&Nil&&5.& 在最终真正执行物理执行计划前,最后还要进行两次规则,SQLContext里定义这个过程叫prepareForExecution,这个步骤是额外增加的,直接new RuleExecutor[SparkPlan]进行的。[java] val&batches&=&&&&Batch("Add&exchange",&Once,&AddExchange(self))&::&&&&Batch("Prepare&Expressions",&Once,&new&BindReferences[SparkPlan])&::&Nil&&6.& 最后调用SparkPlan的execute()执行计算。这个execute()在每种SparkPlan的实现里定义,一般都会递归调用children的execute()方法,所以会触发整棵Tree的计算。其他特性内存列存储SQLContext下cache/uncache table的时候会调用列存储模块。该模块借鉴自Shark,目的是当把表数据cache在内存的时候做行转列操作,以便压缩。&实现类InMemoryColumnarTableScan类是SparkPlan LeafNode的实现,即是一个物理执行计划。传入一个SparkPlan(确认了的物理执行计)和一个属性序列,内部包含一个行转列、触发计算并cache的过程(且是lazy的)。&ColumnBuilder针对不同的数据类型(boolean, byte, double, float, int, long, short, string)由不同的子类把数据写到ByteBuffer里,即包装Row的每个field,生成Columns。与其对应的ColumnAccessor是访问column,将其转回Row。&CompressibleColumnBuilder和CompressibleColumnAccessor是带压缩的行列转换builder,其ByteBuffer内部存储结构如下[java] *&&&&.---------------------------&Column&type&ID&(4&bytes)&&*&&&&|&&&.-----------------------&Null&count&N&(4&bytes)&&*&&&&|&&&|&&&.-------------------&Null&positions&(4&x&N&bytes,&empty&if&null&count&is&zero)&&*&&&&|&&&|&&&|&&&&&.-------------&Compression&scheme&ID&(4&bytes)&&*&&&&|&&&|&&&|&&&&&|&&&.---------&Compressed&non-null&elements&&*&&&&V&&&V&&&V&&&&&V&&&V&&*&&&+---+---+-----+---+---------+&&*&&&|&&&|&&&|&...&|&&&|&...&...&|&&*&&+---+---+-----+---+---------+&&*&&\-----------/&\-----------/&&*&&&&&&&header&&&&&&&&&body&&CompressionScheme子类是不同的压缩实现都是scala实现的,未借助第三方库。不同的实现,指定了支持的column data类型。在build()的时候,会比较每种压缩,选择压缩率最小的(若仍大于0.8就不压缩了)。这里的估算逻辑,来自子类实现的gatherCompressibilityStats方法。Cache逻辑cache之前,需要先把本次cache的table的物理执行计划生成出来。在cache这个过程里,InMemoryColumnarTableScan并没有触发执行,但是生成了以InMemoryColumnarTableScan为物理执行计划的SparkLogicalPlan,并存成table的plan。其实在cache的时候,首先去catalog里寻找这个table的信息和table的执行计划,然后会进行执行(执行到物理执行计划生成),然后把这个table再放回catalog里维护起来,这个时候的执行计划已经是最终要执行的物理执行计划了。但是此时Columner模块相关的转换等操作都是没有触发的。真正的触发还是在execute()的时候,同其他SparkPlan的execute()方法触发场景是一样的。Uncache逻辑UncacheTable的时候,除了删除catalog里的table信息之外,还调用了InMemoryColumnarTableScan的cacheColumnBuffers方法,得到RDD集合,并进行了unpersist()操作。cacheColumnBuffers主要做了把RDD每个partition里的ROW的每个Field存到了ColumnBuilder内。UDF(暂不支持)如前面对SQLContext里Analyzer的分析,其FunctionRegistry没有实现lookupFunction。在spark-hive项目里,HiveContext里是实现了FunctionRegistry这个trait的,其实现为HiveFunctionRegistry,实现逻辑见org.apache.spark.sql.hive.hiveUdfsParquet支持待整理&Specific Docs and Codes:JSON支持SQLContext下,增加了jsonFile的读取方法,而且目前看,代码里实现的是hadoop textfile的读取,也就是这份json文件应该是在HDFS上的。具体这份json文件的载入,InputFormat是TextInputFormat,key class是LongWritable,value class是Text,最后得到的是value部分的那段String内容,即RDD[String]。&除了jsonFile,还支持jsonRDD,例子:&读取json文件之后,转换成SchemaRDD。JsonRDD.inferSchema(RDD[String])里有详细的解析json和映射出schema的过程,最后得到该json的LogicalPlan。&Json的解析使用的是FasterXML/jackson-databind库,,把数据映射成Map[String, Any]&Json的支持丰富了Spark SQL数据接入场景。JDBC支持is under goingSQL92Spark SQL目前的SQL语法支持情况见SqlParser类。目标是支持SQL92??&1. 基本应用上,sql server 和oracle都遵循。2. 实际应用中大家都会超出以上标准,使用各家数据库厂商都提供的丰富的自定义标准函数库和语法。3. 微软sql server的sql 扩展叫T-SQL(Transcate SQL).4. Oracle 的sql 扩展叫PL-SQL.存在问题大家可以跟进社区邮件列表,后续待整理。&总结以上整理了对Spark SQL各个模块的实现情况,代码结构,执行流程以及自己对Spark SQL的理解。理解有偏差的地方欢迎交流讨论 :)全文完 :)
TA的最新馆藏Apache Spark数据分析教程(二):Spark SQL
发表于 23:52|
来源Big Data Zone|
作者Rick Hightower and Fadi Maalouli
摘要:本Spark序列教程的第一部分,已对Spark进行介绍,详细解释了用于在Spark集群中进行数据分片存储的弹性分布式数据集(RDDs)以及Apache Spark的生态系统。本教程将给大家演示Spark及Spark SQL结合Cassandra的使用。
是一款非常流行同时功能又十分强大的实时数据分析工具。在本
,我们已经对Spark进行了介绍,讲解了Spark的历史,详细解释了用于在Spark集群中进行数据分片存储的弹性分布式数据集(
RDDs)并对Apache&Spark的生态系统进行了介绍。
本教程(第二部分)将对Spark生态系统中占有重要地位的Spark&SQL和DataFrame进行介绍,给大家演示Spark、Spark&SQL结合Cassandara的使用。如果你还没有学习过本序列教程的第一部分,请点击
进行学习。
的有效继任者并对其进行了有效补充,它引领了大数据技术的发展趋势。Spark为数据分析运行在大规模分布式系统任务上提供了易于使用的API,它能够比其它形式的数据分析运行得更快,这缘于其大多数的任务都能够在内存中完成。Apache&Spark为一个普通的开发人员提供了实时大数据分析能力,&Spark&SQL便是明证,Spark&SQL&API不仅易于使用而且功能强大。
Spark&SQL使得运行SQL和HiveQL查询十分简单(注意:
)。Spark&SQL&能够轻易地定位相应的表和元数据。Spark&SQL&为Spark提供了查询结构化数据的能力,查询时既可以使用SQL也可以使用人们熟知的DataFrame&API(RDD)。Spark&SQL支持多语言编程包括Java、Scala、Python及R,开发人员可以根据自身喜好进行选择。
使用Java&查询数据
String query = "SELECT * FROM table";
ResultSet results = session.execute(query);
是Spark&SQL的核心,它将数据保存为行构成的集合,行对应列有相应的列名。使用DataFrames可以非常方便地查询数据、给数据绘图及进行数据过滤。
DataFrames也可以用于数据的输入与输出,例如利用Spark&SQL中的DataFrames,可以轻易地将下列数据格式加载为表并进行相应的查询操作:
数据一旦被读取,借助于DataFrames便可以很方便地进行数据过滤、列查询、计数、求平均值及将不同数据源的数据进行整合。
如果你正计划通过读取和写数据来进行分析,Spark&SQL可以轻易地帮你实现并将整个过程自动化。
在后面的例子中,我们将在Python&Spark&shell中给大家演示如何使用Spark&SQL和DataFrames。从GitHub上获取提交的
QBit,&the&Java&Microservices&Lib历史数据,然后将其加载到Spark当中,并对数据进行相应的操作,具体步骤如下:
在终端上启动Python&Spark&shell:
cd spark-1.5.0-bin-hadoop2.4
./bin/pyspark
15/08/22 22:30:40 INFO BlockManagerMaster: Registered BlockManager
Welcome to
___ _____/ /__
_\ \/ _ \/ _ `/ __/
/__ / .__/\_,_/_/ /_/\_\
version 1.5.0
Using Python version 2.7.5 (default, Mar
SparkContext available as sc, HiveContext available as sqlContext.
从github上获取QBit的提交历史,并保存到名称为test.log的文件中:
抽取提交历史并保存为log文件
git log & test.log
由于此次使用的是Python,我们先通过textFile方法将test.log加载为RDD,然后在该RDD上执行一些操作:
textFile = sc.textFile("../qbit/test.log")
执行完上面这条语句,可以得到一个textFile&RDD,该RDD由文本行组成的分区数据构成,先来统计一个RDD中的文本行数:
textFile.count()
代码执行完,得到的行数为5776。然后我们先行中带有commit关键字的行筛选出来:
linesWithCommit = textFile.filter(lambda line: "commit" in line)
通过前面的操作足以说明通过Python&使用RDD&的简便性。
为后面演示DataFrame的使用,先让github的历史记录文件抽取保存为JSON类型并将文件命名为sparktest.json:
将github上的提交历史保存为JSON
--pretty=format:'{"commit":"%H","author":"%an","author_email":"%ae","date":"%ad","message":"%f"}' & sparktest.json
在正式进行Spark&SQL操作之前,先得创建sqlContext,它可以通过SparkContext进行创建:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
在shell命令行中,sqlContext&同SparkContext&一样都是自动创建的,无需自己手动去创建,SparkContext以SC变量名的形式存在,sqlContext则以sqlContext&变量名的形式存在。
接下来,将JSON数据加载为Spark的DataFrame,变量命名为dataframe:
将JSON数据加载成DataFrame&,变量命名为dataframe
dataframe = sqlContext.load("../qbit/sparktest.json", "json")
加载数据时,只需调用sqlContext&的load()方法,方法中传入的参数为文件目录和文件类型。Spark会为dataframe解析所有的列及对应名称,为确保所有的工作都已按预期执行,可以打印出dataframe的模式(Schema):
打印dataframe的模式(Schema)
dataframe.printSchema()
|-- author: string (nullable = true)
|-- author_email: string (nullable = true)
|-- commit: string (nullable = true)
|-- date: string (nullable = true)
|-- message: string (nullable = true)
上面这个带根(root)的图展示了各行对应的列名及其对应类型。本例中的每行表示的是Gihub上
项目对应的一次提交。所有的准备工作完成后,便可以在数据上进行相应的操作。
例如,我们可以获取文件的第一条提交记录,该提交记录表示的是github的最近一次提交。
获取最近的提交记录用以分析
dataframe.first()
Row(author=u'Richard Hightower', author_email=u'',
commit=u'696a94f80d1eedae9a340fab1a27',
date=u'Wed Aug 19 17:51:11 ',
message=u'Merge-pull-request-359-from-advantageous-add_better_uri_param_handling')
我们可以查询所有列中的某一列并显示其内容,例如,只查询
项目的作者(author)列并显示最近的20个源码贡献者,默认情况下Spark会返回最近的20条记录。
采用Spark&SQL进行分析—查询author列并返回最近的20条记录
dataframe.select("author").show()
+-----------------+
+-----------------+
|Richard Hightower|
Rick Hightower|
Rick Hightower|
|Richard Hightower|
Rick Hightower|
|Richard Hightower|
Rick Hightower|
|Geoffrey Chandler|
|Geoffrey Chandler|
|Richard Hightower|
|Richard Hightower|
|Richard Hightower|
|Richard Hightower|
|Richard Hightower|
|Richard Hightower|
Rick Hightower|
Rick Hightower|
Rick Hightower|
Rick Hightower|
Rick Hightower|
+-----------------+
当然,也可以设置show()函数的参数以返回需要的记录行数,这里只返回最近5个为
项目贡献过源码作者:
查询作者列并返回最近的5个贡献过源码的作者
dataframe.select("author").show(5)
+-----------------+
+-----------------+
|Richard Hightower|
Rick Hightower|
Rick Hightower|
|Richard Hightower|
Rick Hightower|
+-----------------+
我们可以再好好想想,这里使用的是一些相对非结构化的数据,在这个案例中,我们抓取项目的git提交日志后,可以马上执行相应的查询。现在我们想象一下,如果要在成千上万的项目上执行同样的操作,所有这些项目构成的可能是一个大公司git库,另外经常需要对所有的数据进行分析,而不只是对其中一个项目数据进行分析的话,便可以使用Spark集群处理大量的非结构化数据。此时你便可以看到Spark作为一个实时数据分析平台的处理能力,它具有简单易用、可扩展且处理能力强的特点。
查询Date列并显示最近的20条提交日期记录:
查询Date列并显示最近的20条提交日期记录
dataframe.select("date").show()
+--------------------+
+--------------------+
|Wed Aug 19 17:51:...|
|Wed Aug 19 17:37:...|
|Wed Aug 19 16:59:...|
|Wed Aug 19 14:47:...|
|Wed Aug 19 14:42:...|
|Wed Aug 19 13:05:...|
|Wed Aug 19 11:59:...|
|Mon Aug 17 10:18:...|
|Mon Aug 17 10:17:...|
|Mon Aug 17 00:46:...|
|Sun Aug 16 23:52:...|
|Sun Aug 16 23:33:...|
|Sun Aug 16 23:05:...|
|Sun Aug 16 23:03:...|
|Sun Aug 16 22:33:...|
|Thu Aug 13 21:20:...|
|Thu Aug 13 21:15:...|
|Thu Aug 13 20:31:...|
|Thu Aug 13 20:05:...|
|Thu Aug 13 20:04:...|
+--------------------+
通过dataframe获取
已提交次数,计算dataframe的行数:
获取QBit&Microservice&Lib已经提交次数
dataframe.count()
914便是提交次数,该提交次数也可以从Github上看到。
我们也使用DataFrame的&filter函数进行提交次数统计,例如可以统计有多少提交是由Richard&Hightower或Geoffrey&Chandler完成的。
筛选出Richard&Hightower&的提交并统计提交次数
dataframe.filter(dataframe.author =="Richard Hightower").count()
Richard&Hightower的提交次数是708。
筛选出Geoffrey&Chandler&的提交并统计提交次数
dataframe.filter(dataframe.author =="Geoffrey Chandler").count()
Geoffrey&Chandler的提交次数是102。
前面的例子是通过JSON格式的数据文件创建DataFrame,我们也可以通过另外两种方式创建DataFrame:
如果列及其类型在运行时之前都是未知的,可以通过创建模式并将其应用到RDD上来创建。
如列及其类型是已知的,可以通过反射机制来创建。
为简单起见,这里使用Spark自带的people.txt文件创建RDD,该文件中有三个人名及对应年龄,姓名与年龄使用逗号分隔,该文件可以使用通过下列文件路径找到:~/spark/examples/src/main/resources/people.txt。下面的编码步骤将使用详细的注释以便于理想。
People.txt&文件内容
Michael, 29
Justin, 19
创建模式(Schema)并将其应用到&textFile&RDD
# Import data types
from pyspark.sql.types import *
# Create a RDD from `people.txt`
# then convert each line to a tuple.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: (p[0], p[1].strip()))
# encode the schema in a string.
schemaString = "name age"
# Create a type fields
fields = [StructField(field_name, StringType(), True) \
for field_name in schemaString.split()]
# Create the schema
schema = StructType(fields)
# Apply the schema to the RDD.
schemaPeople = sqlContext.createDataFrame(people, schema)
# In order to query data you need
# to register the DataFrame as a table.
schemaPeople.registerTempTable("people")
# Using sql query all the name from the table
results = sqlContext.sql("SELECT name FROM people")
# The results of SQL queries are RDDs
# and support all the normal RDD operations.
names = results.map(lambda p: "Name: " + p.name)
for name in names.collect():
print name
上面的代码输出下列内容:
Name: Michael
Name: Andy
Name: Justin
输出内容确实为所有人的名字。
可以看到,Spark能够非常方便地赋与非结构化数据相应的结构化信息以利于查询,Spark甚至能够将集群节点中的数据进行分割并进行并行分析。目前你可以视Apache&Spark为一个能够进行实时数据分析和即席查询分析的快速、通用的大规模数据处理引擎。
现在让我们来演示如何利用反射机制进行数据分析。
在Spark&SQL中通过反射机制进行数据分析
# First we need to import the following Row class
from pyspark.sql import SQLContext, Row
# Create a RDD peopleAge,
# when this is done the RDD will
# be partitioned into three partitions
peopleAge = sc.textFile("examples/src/main/resources/people.txt")
# Since name and age are separated by a comma let's split them
parts = peopleAge.map(lambda l: l.split(","))
# Every line in the file will represent a row
# with 2 columns name and age.
# After this line will have a table called people
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
# Using the RDD create a DataFrame
schemaPeople = sqlContext.createDataFrame(people)
# In order to do sql query on a dataframe,
# you need to register it as a table
schemaPeople.registerTempTable("people")
# Finally we are ready to use the DataFrame.
# Let's query the adults that are aged between 21 and 50
adults = sqlContext.sql("SELECT name FROM people \
WHERE age &= 21 AND age &= 50")
# loop through names and ages
adults = adults.map(lambda p: "Name: " + p.name)
for Adult in adults.collect():
print Adult
上面的代码将输出:
Name: Michael
Name: Andy
上面两人的年龄确实在21~50之间。
Spark、SparkSQL与&Cassandra协同使用
Spark&与Cassandra协同使用
假设想利用
并通过Java编写一个程序。这里给出使Apache&Spark与
能够协同使用的步骤:
首先需要导入下列依赖:
spark-cassandra-connector_2.10:1.1.1-rc4'
spark-cassandra-connector-java_2.10:1.1.1'
spark-streaming_2.10:1.5.0'
使用Gradle管理依赖:
Spark&SQL和Cassandra&协同使用进行数据分析时的Gradle构建文件
dependencies {
//Spark and Cassandra connector to work with java
compile 'com.datastax.spark:spark-cassandra-connector_2.10:1.1.1-rc4'
compile 'com.datastax.spark:spark-cassandra-connector-java_2.10:1.1.1'
compile 'org.apache.spark:spark-streaming_2.10:1.5.0'
然后,设置Spark配置文件,SparkConf用于对Spark的配置属性(如Spark&Master及应用程序名称)进行配置,也可以通过set()方法进行任意的键值对如spark.cassandra.connection.host进行配置。
Spark&master为需要连接的集群管理器,支持以几种URL:
local,将Spark运行在本地的一个woker线程上,本例使用的便是这种方式
local[K],将Spark运行在本地的K个线程上,通常K被设置为机器的CPU核数
spark://HOST:PORT&,连接给定的集群master,端口必须与master匹配,默认值为7077
为使Spark能够使用Cassandra,需要设置spark.cassandra.connection.host为Spark&master的主机地址,在本例中为本地主机地址,具体配置如下:
SparkConf conf = new SparkConf();
conf.setAppName("TODO spark and cassandra");
conf.setMaster("local");
conf.set("spark.cassandra.connection.host", "localhost");
完成前面的配置后,便可以着手创建模式(Schema),该模式为Cassandra的表和keyspace&,它可以保存后期需要加载的数据。
创建一个CassandraConnector&的连接器实例,同时创建Cassandra的Keyspacce&todolist和Table的todolist
private void createSchema(JavaSparkContext sc) {
CassandraConnector connector =
CassandraConnector.apply(sc.getConf());
try (Session session = connector.openSession()) {
session.execute(deletekeyspace);
session.execute(keyspace);
session.execute("USE todolist");
session.execute(table);
session.execute(tableRDD);
正如上面的代码描述的,我们创建了一个CassandraConnector&的实例,然后执行Cassandra查询语言(Cassandra&Query&Language,CQL)。我们将在后面的其它文章中对这一主题进行详细讲解。
使用Cassandra查询语言
/* Delete keyspace todolist if exists. */
String deletekeyspace = "DROP KEYSPACE IF EXISTS todolist";
/* Create keyspace todolist. */
String keyspace = "CREATE KEYSPACE IF NOT EXISTS todolist" +
" WITH replication = {'class': 'SimpleStrategy'," +
" 'replication_factor':1}";
/* Create table todolisttable. */
String table = "CREATE TABLE todolist.todolisttable(" +
+ " id text PRIMARY KEY, "
+ " description text, "
+ " category text, "
+ " date timestamp )";
/* Create table temp. */
String tableRDD = "CREATE TABLE todolist.temp(id text PRIMARY KEY, "
+ "description text, "
+ "category text )";
我们现在有两张表,它们分别是todolisttable&和temp,然后使用Cassandra&CQL将todo项的数据加载到todolisttable当中:
private void loadData(JavaSparkContext sc) {
CassandraConnector connector = CassandraConnector.apply(sc.getConf());
try (Session session = connector.openSession()) {
session.execute(task1);
session.execute(task2);
session.execute(task3);
session.execute(task4);
session.execute(task5);
session.execute(task6);
session.execute(task7);
下面给出的是需要加载到Cassandra&中的todo项,最后面跟的是CQL&命令。
需要加载到Spark中的&Todo项目,加载时使用Cassandra&CQL&命令
TodoItem item = new TodoItem("George", "Buy a new computer", "Shopping");
TodoItem item2 = new TodoItem("John", "Go to the gym", "Sport");
TodoItem item3 = new TodoItem("Ron", "Finish the homework", "Education");
TodoItem item4 = new TodoItem("Sam", "buy a car", "Shopping");
TodoItem item5 = new TodoItem("Janet", "buy groceries", "Shopping");
TodoItem item6 = new TodoItem("Andy", "go to the beach", "Fun");
TodoItem item7 = new TodoItem("Paul", "Prepare lunch", "Coking");
//index data
String task1 = "INSERT INTO todolisttable (ID, Description, Category, Date)"
+ item.toString();
String task2 = "INSERT INTO todolisttable (ID, Description, Category, Date)"
+ item2.toString();
String task3 = "INSERT INTO todolisttable (ID, Description, Category, Date)"
+ item3.toString();
String task4 = "INSERT INTO todolisttable (ID, Description, Category, Date)"
+ item4.toString();
String task5 = "INSERT INTO todolisttable (ID, Description, Category, Date)"
+ item5.toString();
String task6 = "INSERT INTO todolisttable (ID, Description, Category, Date)"
+ item6.toString();
String task7 = "INSERT INTO todolisttable (ID, Description, Category, Date)"
+ item7.toString();
接下来便可以从Cassandra的todolisttable中查询数据:
从Cassandra的todolisttable中查询数据
private void queryData(JavaSparkContext sc) {
CassandraConnector connector =
CassandraConnector.apply(sc.getConf());
try (Session session = connector.openSession()) {
ResultSet results = session.execute(query);
System.out.println("Query all results from cassandra:\n" + results.all());
将Cassandra的表作为Spark&RDD并从中获取数据:
将Cassandra的表作为Spark&RDD并从中获取数据
void accessTableWitRDD(JavaSparkContext sc){
JavaRDD&String& cassandraRDD = javaFunctions(sc).cassandraTable("todolist", "todolisttable")
.map(new Function&CassandraRow, String&() {
public String call(CassandraRow cassandraRow) throws Exception {
return cassandraRow.toString();
为将Cassandra的表作为RDD读取数据,我们使用cassandraTable("keyspace",&"table")方法。cassandraTable&方法要能够起作用,需要利用javaFunctions()方法将sparkcontext作为参数传入。
对该RDD来说,其数据类型是CassandraRow
打印该RDD:
打印Spark&RDD的数据
System.out.println("\nData as CassandraRows from a RDD: \n" + StringUtils.join(cassandraRDD.toArray(), "\n"));
我们也可以像读取Cassandra表中的数据一样简单地将RDD保存到Cassandra当中,首先创建一个类型为TodoItem的RDD并填充部分数据,然后将其保存为Cassandra的临时表:
创建一个包含todo&items&集合的RDD,然后将其保存到Cassandra
public void saveRDDToCass(JavaSparkContext sc) {
List&TodoItem& todos = Arrays.asList(
new TodoItem("George", "Buy a new computer", "Shopping"),
new TodoItem("John", "Go to the gym", "Sport"),
new TodoItem("Ron", "Finish the homework", "Education"),
new TodoItem("Sam", "buy a car", "Shopping"),
new TodoItem("Janet", "buy groceries", "Shopping"),
new TodoItem("Andy", "go to the beach", "Fun"),
new TodoItem("Paul", "Prepare lunch", "Coking")
JavaRDD&TodoItem& rdd = sc.parallelize(todos);
javaFunctions(rdd).writerBuilder("todolist", "temp", mapToRow(TodoItem.class)).saveToCassandra();
上面我们创建了TodoItem的List集合,然后使用parallelize&方法创建对应的Spark&RDD对象rdd,然后通过调用传入参数为rdd对象的writerBuilder&方法将RDD保存为一个keyspace&todolist和一个temp表。
为确保rdd已经保存到Cassandra的temp表中,我们从该表中查询数据:
从Cassandra&中查询temp表数据
String query1 = "SELECT * FROM todolist.temp";
ResultSet results1 = session.execute(query1);
System.out.println("\nQuery all results from temp" +
" table after saving a RDD into Cassandra:\n" +
results1.all());
最后,我们给出完整的代码列表,并在代码运行部分给出所有的命令,包括如何从Github中获取源码、如何在机器上运行。
Spark&SQL与Cassandra协同使用
Spark&SQL能够让你查询结构化的数据,包括RDD和任何存储在Cassandra中的数据,为使用Spark&SQL&我们需要做以几件事:
创建SQLContext&(SQLContext构造函数参数为SparkContext)。
加载parquet&格式数据&(parquet数据格式是一种列式数据存储格式,意味着数据表按列组织而非行组织)。
数据加载完成后便得到DataFrame。
额外的信息使得在数据注册成表之后可以使用SQL进行查询
SQL查询得到的是行对象
SQL查询是一款强大的工具
值得注意的是Spark&DataFrame具有普通Spark&RDD所拥有的函数,而且在其数据集还具备更多关于列名称和类型的元数据。
关于Spark&SQL有用的信息包括:
Spark&SQL可以将表缓存到内存当中
当使用SQL进行数据查询时,返回的结果是RDD
使用parquets格式读取数据:列存储格式能够过滤掉不需要的数据
RDD能够以parquet格式文件存储
JSON对象可以使用jsonRDD方法转换成DataFrame&
RDD可以并行执行,它是一种弹性分布式数据集,是构成Spark的主要组件,它是数据的一种表示方式。RDD的数据可以分片存储在集群上,正是这些分片数据使得task可以并行执行。RDD的分片越多,其并行执行度越高。
是一种列数据存储格式,它也可以被其它数据处理系统如Hive所支持。是Hadoop生态圈的一部分,它是一种跨语言、跨数据处理框架的列式数据格式。Spark&SQL能够读写Parquet文件,这些文件保存了数据的Schema信息。
让我们来演示如何通过java语言使用Spark&SQL进行前面&todo&item例子的开发。
首先,需要在gradle&文件中引入spark-sql&的依赖:
在Gradle&中使用Spark&SQL&依赖
dependencies {
compile 'org.apache.spark:spark-sql_2.10:1.5.0'
然后,创建Spark&configuration&对象并连接Cassandra:
为Cassandra&创建SparkSpark&configuration对象SparkConf conf = new SparkConf();
conf.setAppName("TODO sparkSQL and cassandra");
conf.setMaster("local");
conf.set("spark.cassandra.connection.host", "localhost");创建Spark&Context对象&(JavaSparkContext)。创建Spark&ContextJavaSparkContextsc=newJavaSparkContext(conf);创建SQLContext对象以便使用SQL连接Cassandra:创建Spark&SQL&ContextJavaSparkContext sc = new JavaSparkContext(conf);通过SQLContext便能注册RDD并利用Spark&SQL进行查询操作。然后,创建RDD对象(rdd)并加载数据(TodoItems):RDD&加载&TodoItems List&TodoItem& todos = Arrays.asList(
new TodoItem("George", "Buy a new computer", "Shopping"),
new TodoItem("John", "Go to the gym", "Sport"),
new TodoItem("Ron", "Finish the homework", "Education"),
new TodoItem("Sam", "buy a car", "Shopping"),
new TodoItem("Janet", "buy groceries", "Shopping"),
new TodoItem("Andy", "go to the beach", "Fun"),
new TodoItem("Paul", "Prepare lunch", "Cooking")
JavaRDD&TodoItem& rdd = sc.parallelize(todos);需要注意的是我们使用parallelize方法将所有的Todo数据加载到整个Spark集群。通过context.parallelize方法产生。然后,通过sqlContext创建DataFrame:通过sqlContext创建DataFrameDataFrame dataframe = sqlContext.createDataFrame(rdd, TodoItem.class);Dataframe从&TodoItem.class获取对应的schema。然后,将dataframe注册成名为todo的表:将DataFrame注册成名为todo的表sqlContext.registerDataFrameAsTable(dataframe, "todo");这样后面便可以使用todo进行DataFrame数据的查询。到这一步便可以使用Spark&SQL提供的所有操作,首先对todo&items进行计数,它将加载数据到内存以便进行更快速的查询操作:获取DataFrame中TODO&items的数量System.out.println("Total number of TodoItems = [" + rdd.count() + "]\n");最后,使用SQL进行数据的查询:使用Spark&SQL查询Todo&Items并显示查询结果 DataFrame result = sqlContext.sql("SELECT * from todo");
System.out.println("Show the DataFrame result:\n");
result.show();
System.out.println("Select the id column and show its contents:\n");
result.select("id").show();可以在本文最后的运行部分获取相关代码及如何运行这些代码的教程。这里给出本文例子的完整代码。完整代码清单SparkApp.java清单package com.
import com.datastax.driver.core.ResultS
import com.datastax.driver.core.S
import com.datastax.spark.connector.cql.CassandraC
import com.datastax.spark.connector.japi.CassandraR
import mons.lang.StringU
import org.apache.spark.SparkC
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkC
import org.apache.spark.api.java.function.F
import org.slf4j.L
import org.slf4j.LoggerF
import java.io.S
import java.util.A
import java.util.L
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaF
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToR
//import org.apache.cassandra.cql.BatchS
* Created by fadi on 5/18/15.
public class SparkApp implements Serializable {
static final Logger logger = LoggerFactory.getLogger(SparkApp.class);
TodoItem item = new TodoItem("George", "Buy a new computer", "Shopping");
TodoItem item2 = new TodoItem("John", "Go to the gym", "Sport");
TodoItem item3 = new TodoItem("Ron", "Finish the homework", "Education");
TodoItem item4 = new TodoItem("Sam", "buy a car", "Shopping");
TodoItem item5 = new TodoItem("Janet", "buy groceries", "Shopping");
TodoItem item6 = new TodoItem("Andy", "go to the beach", "Fun");
TodoItem item7 = new TodoItem("Paul", "Prepare lunch", "Coking");
String keyspace = "CREATE KEYSPACE IF NOT EXISTS todolist
WITH replication = {'class': 'SimpleStrategy', 'replication_factor':1}";
//index data
String task1 = "INSERT INTO todolisttable (ID, Description, Category, Date)"
+ item.toString();
String task2 = "INSERT INTO todolisttable (ID, Description, Category, Date)"
+ item2.toString();
String task3 = "INSERT INTO todolisttable (ID, Description, Category, Date)"
+ item3.toString();
String task4 = "INSERT INTO todolisttable (ID, Description, Category, Date)"
+ item4.toString();
String task5 = "INSERT INTO todolisttable (ID, Description, Category, Date)"
+ item5.toString();
String task6 = "INSERT INTO todolisttable (ID, Description, Category, Date)"
+ item6.toString();
String task7 = "INSERT INTO todolisttable (ID, Description, Category, Date)"
+ item7.toString();
//delete keyspace
String deletekeyspace = "DROP KEYSPACE IF EXISTS todolist";
//delete table
String deletetable = "DROP TABLE todolisttable";
//create table
String table = "CREATE TABLE todolist.todolisttable(id text PRIMARY KEY, "
+ "description text, "
+ "category text, "
+ "date timestamp )";
String tableRDD = "CREATE TABLE todolist.temp(id text PRIMARY KEY, "
+ "description text, "
+ "category text )";
//Query all data
String query = "SELECT * FROM todolist.todolisttable";
String query1 = "SELECT * FROM todolist.temp";
//Update table
String update = "UPDATE todolisttable SET Category='Fun',Description='Go to the beach' WHERE ID='Ron'";
//Deleting data where the index id = George
String delete = "DELETE FROM todolisttable WHERE ID='George'";
//Deleting all data
String deleteall = "TRUNCATE todolisttable";
//---------------------------------------------------------------------------------
private transient SparkC
private SparkApp(SparkConf conf) {
this.conf =
private void run() {
JavaSparkContext sc = new JavaSparkContext(conf);
createSchema(sc);
loadData(sc);
saveRDDToCassandra(sc);
queryData(sc);
accessTableWitRDD(sc);
sc.stop();
private void createSchema(JavaSparkContext sc) {
CassandraConnector connector = CassandraConnector.apply(sc.getConf());
try (Session session = connector.openSession()) {
session.execute(deletekeyspace);
session.execute(keyspace);
session.execute("USE todolist");
session.execute(table);
session.execute(tableRDD);
private void loadData(JavaSparkContext sc) {
CassandraConnector connector = CassandraConnector.apply(sc.getConf());
try (Session session = connector.openSession()) {
session.execute(task1);
session.execute(task2);
session.execute(task3);
session.execute(task4);
session.execute(task5);
session.execute(task6);
session.execute(task7);
private void queryData(JavaSparkContext sc) {
CassandraConnector connector = CassandraConnector.apply(sc.getConf());
try (Session session = connector.openSession()) {
ResultSet results = session.execute(query);
System.out.println("\nQuery all results from cassandra's todolisttable:\n" + results.all());
ResultSet results1 = session.execute(query1);
System.out.println("\nSaving RDD into a temp table in casssandra then query all results from cassandra:\n" + results1.all());
void accessTableWitRDD(JavaSparkContext sc){
JavaRDD&String& cassandraRDD = javaFunctions(sc).cassandraTable("todolist", "todolisttable")
.map(new Function&CassandraRow, String&() {
public String call(CassandraRow cassandraRow) throws Exception {
return cassandraRow.toString();
System.out.println("\nReading Data from todolisttable in Cassandra with a RDD: \n" + StringUtils.join(cassandraRDD.toArray(), "\n"));
// javaFunctions(cassandraRDD).writerBuilder("todolist", "todolisttable", mapToRow(String.class)).saveToCassandra();
public void saveRDDToCassandra(JavaSparkContext sc) {
List&TodoItem& todos = Arrays.asList(
new TodoItem("George", "Buy a new computer", "Shopping"),
new TodoItem("John", "Go to the gym", "Sport"),
new TodoItem("Ron", "Finish the homework", "Education"),
new TodoItem("Sam", "buy a car", "Shopping"),
new TodoItem("Janet", "buy groceries", "Shopping"),
new TodoItem("Andy", "go to the beach", "Fun"),
new TodoItem("Paul", "Prepare lunch", "Coking")
JavaRDD&TodoItem& rdd = sc.parallelize(todos);
javaFunctions(rdd).writerBuilder("todolist", "temp", mapToRow(TodoItem.class)).saveToCassandra();
//----------------------------------------------------------------------------------------------------------------------------
public static void main( String args[] )
SparkConf conf = new SparkConf();
conf.setAppName("TODO spark and cassandra");
conf.setMaster("local");
conf.set("spark.cassandra.connection.host", "localhost");
SparkApp app = new SparkApp(conf);
app.run();
}SparkSQLApp.java清单package com.
import org.apache.spark.SparkC
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkC
import org.apache.spark.sql.DataF
import org.apache.spark.sql.SQLC
import java.util.A
import java.util.L
* Created by fadi on 6/14/15.
public class SparkSQLApp {
private transient SparkC
private SparkSQLApp(SparkConf conf) {
this.conf =
private void run() {
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
createDataframe(sc, sqlContext);
querySQLData(sqlContext);
sc.stop();
public void createDataframe(JavaSparkContext sc, SQLContext sqlContext ) {
List&TodoItem& todos = Arrays.asList(
new TodoItem("George", "Buy a new computer", "Shopping"),
new TodoItem("John", "Go to the gym", "Sport"),
new TodoItem("Ron", "Finish the homework", "Education"),
new TodoItem("Sam", "buy a car", "Shopping"),
new TodoItem("Janet", "buy groceries", "Shopping"),
new TodoItem("Andy", "go to the beach", "Fun"),
new TodoItem("Paul", "Prepare lunch", "Cooking")
JavaRDD&TodoItem& rdd = sc.parallelize(todos);
DataFrame dataframe =
sqlContext.createDataFrame(rdd, TodoItem.class);
sqlContext.registerDataFrameAsTable(dataframe, "todo");
System.out.println("Total number of TodoItems = [" + rdd.count() + "]\n");
public void querySQLData(SQLContext sqlContext) {
DataFrame result = sqlContext.sql("SELECT * from todo");
System.out.println("Show the DataFrame result:\n");
result.show();
System.out.println("Select the id column and show its contents:\n");
result.select("id").show();
public static void main( String args[] )
SparkConf conf = new SparkConf();
conf.setAppName("TODO sparkSQL and cassandra");
conf.setMaster("local");
conf.set("spark.cassandra.connection.host", "localhost");
SparkSQLApp app = new SparkSQLApp(conf);
app.run();
}Todoitem.java清单package com.
import java.io.S
import java.time.LocalDateT
public class TodoItem implements Serializable {
private final LocalDateTime date = LocalDateTime.now();
public TodoItem(String id, String description, String category) {
this.description =
this.category =
public String getId(){
return this.
String getDescription(){
return this.
public String getCategory(){
return this.
public void setId(String id) {
public void setDescription(String description) {
this.description =
public void setCategory(String category) {
this.category =
public String toString() {
"VALUES ( " + "'" + this.id +"'" + ", " + "'" + this.description +"'" + ", " + "'" + this.category +"'" +", "
+ "'" + date +"'" + ")";
}build.gradle清单apply plugin: 'idea'
apply plugin: 'java'
apply plugin: 'jetty'
apply plugin: 'application'
applicationName = 'todocass'
applicationDefaultJvmArgs = ["-Dlogback.configurationFile=etc/todosolr/logging.xml"]
sourceCompatibility = 1.8
version = '1.0'
repositories {
mavenLocal()
mavenCentral()
task runSpark(type: JavaExec, dependsOn: 'classes') {
main = "com.example.SparkApp"
classpath = sourceSets.main.runtimeClasspath
task runSparkSQL(type: JavaExec, dependsOn: 'classes') {
main = "com.example.SparkSQLApp"
classpath = sourceSets.main.runtimeClasspath
dependencies {
//spark and cassandra connector to work with java
compile 'com.datastax.spark:spark-cassandra-connector_2.10:1.1.1-rc4'
compile 'com.datastax.spark:spark-cassandra-connector-java_2.10:1.1.1'
compile 'org.apache.spark:spark-streaming_2.10:1.5.0'
compile 'org.apache.spark:spark-sql_2.10:1.5.0'
//logback dependencies
compile 'ch.qos.logback:logback-core:1.1.3'
compile 'ch.qos.logback:logback-classic:1.1.3'
compile 'org.slf4j:slf4j-api:1.7.12'
//Install/copy tasks
task copyDist(type: Copy) {
dependsOn "installApp"
from "$buildDir/install/todocass"
into 'opt/todocass'
task copyLog(type: Copy) {
from "src/main/resources/logback.xml"
into "etc/todocass/"
task copyLogToImage(type: Copy) {
from "src/main/resources/logback.xml"
into "image-todo-cass/etc"
task copyDistToImage(type: Copy) {
dependsOn "installApp"
from "$buildDir/install/todocass"
into "$projectDir/image-todo-cass/opt/todocass"
}运行首先运行Cassandra:cd ~/cassandra
bin/cassandra -f获取代码:git clone 然后构建Spark-Course:cd Spark-Course
gradle clean build首先,运行SparkApp,这是Spark与Cassandra协同工作的例子:gradle runSpark运行时将看到下列内容:Query all results from cassandra's todolisttable:
[Row[George, Shopping, Mon Jun 15 13:36:07 PDT 2015, Buy a new computer], Row[Janet, Shopping, Mon Jun 15 13:36:07 PDT 2015, buy groceries], Row[John, Sport, Mon Jun 15 13:36:07 PDT 2015, Go to the gym], Row[Paul, Coking, Mon Jun 15 13:36:07 PDT 2015, Prepare lunch], Row[Ron, Education, Mon Jun 15 13:36:07 PDT 2015, Finish the homework], Row[Andy, Fun, Mon Jun 15 13:36:07 PDT 2015, go to the beach], Row[Sam, Shopping, Mon Jun 15 13:36:07 PDT 2015, buy a car]]
Saving RDD into a temp table in casssandra then query all results from cassandra:
[Row[George, Shopping, Buy a new computer], Row[Janet, Shopping, buy groceries], Row[John, Sport, Go to the gym], Row[Paul, Coking, Prepare lunch], Row[Ron, Education, Finish the homework], Row[Andy, Fun, go to the beach], Row[Sam, Shopping, buy a car]]
Reading Data from todolisttable in Cassandra with a RDD:
CassandraRow{id: Paul, category: Coking, date:
13:36:07-0700, description: Prepare lunch}
CassandraRow{id: Sam, category: Shopping, date:
13:36:07-0700, description: buy a car}
CassandraRow{id: Ron, category: Education, date:
13:36:07-0700, description: Finish the homework}
CassandraRow{id: Janet, category: Shopping, date:
13:36:07-0700, description: buy groceries}
CassandraRow{id: John, category: Sport, date:
13:36:07-0700, description: Go to the gym}
CassandraRow{id: George, category: Shopping, date:
13:36:07-0700, description: Buy a new computer}
CassandraRow{id: Andy, category: Fun, date:
13:36:07-0700, description: go to the beach}然后运行SparkSQL&APP,这是Spark&SQL与Cassandra协同工作的例子:gradle runSparkSQL:运行时将看到下列内容:Total number of TodoItems = [7]
Show the DataFrame result:
+---------+-------------------+------+
| category|
description|
+---------+-------------------+------+
| Shopping| Buy a new computer|George|
Go to the gym|
|Education|Finish the homework|
| Shopping|
buy a car|
| Shopping|
buy groceries| Janet|
go to the beach|
Prepare lunch|
+---------+-------------------+------+
Select the id column and show its contents:
+------+结束语本文展示了Spark在实时数据分析方面的强大功能,介绍了Spark生态系统中的一个重要部分——Spark&SQL和DataFrames。DataFrames构建在RDD之上,RDD的数据分片使得DataFrame能够被并行处理。在本文中,我们也演示了如何将Spark和Spark&SQL与Cassandra结合起来使用。Apache&Spark已经被证明是Hadoop的天然继承,并引领着大数据技术趋势。它在Hadoop生态圈中运行良好,是实现大数据分析的一条快速通道。Spark提供一套易于使用的API,具备大规模分布式任务下的数据分析能力。Apache&Spark&使得普通开发者也具备大数据的实时数据分析能力。&Spark&SQL是Apache&Spark提供的API的一个实例,易于使用而功能强大。参考文献&&&&&原文链接:(译者/牛亚真 审校/朱正贵 责编/仲浩)
译者简介:牛亚真,本科,2010年毕业于西南大学计算机与信息科学学院信息管理与信息系统专业;研究生,2013年毕业于中国科学院大学文献情报中心情报学专业,计算机信息处理与检索方向。
90+位讲师,16大分论坛,Databricks公司联合创始人、Apache Spark首席架构师辛湜,Hadoop、HBase和Thrift项目的PMC成员和Committer、Kudu的发明人Todd Lipcon等海外专家将亲临,票价折扣即将结束,。
推荐阅读相关主题:
CSDN官方微信
扫描二维码,向CSDN吐槽
微信号:CSDNnews
相关热门文章}

我要回帖

更多关于 switch case语句 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信