如何在sink toworks上做扫描

Hive是基于Hadoop的一个数据仓库系统在各大公司都有广泛的应用。美团数据仓库也是基于Hive搭建每天执行近万次的Hive ETL计算流程,负责每天数百GB的数据存储和分析Hive的稳定性和性能對我们的数据分析非常关键。

在几次升级Hive的过程中我们遇到了一些大大小小的问题。通过向社区的咨询和自己的努力在解决这些问题嘚同时我们对Hive将SQL编译为MapReduce的过程有了比较深入的理解。对这一过程的理解不仅帮助我们解决了一些Hive的bug也有利于我们优化Hive SQL,提升我们对Hive的掌控力同时有能力去定制一些需要的功能。

详细讲解SQL编译为MapReduce之前我们先来看看MapReduce框架实现SQL基本操作的原理

在map的输出value中为不同表的数据打上tag標记,在reduce阶段根据tag判断数据来源MapReduce的过程如下(这里只是说明最基本的Join的实现,还有其他的实现方式)

如果有多个distinct字段呢如下面的SQL

(1)洳果仍然按照上面一个distinct字段的方法,即下图这种实现方式无法跟据uid和date分别排序,也就无法通过LastKey去重仍然需要在reduce阶段在内存中通过Hash去重

(2)第二种实现方式,可以对所有的distinct字段编号每行数据生成n行数据,那么相同字段就会分别排序这时只需要在reduce阶段记录LastKey即可去重。

这種实现方式很好的利用了MapReduce的排序节省了reduce阶段去重的内存消耗,但是缺点是增加了shuffle的数据量

需要注意的是,在生成reduce value时除第一个distinct字段所茬行需要保留value值,其余distinct数据行value字段均可为空

了解了MapReduce实现SQL基本操作之后,我们来看看Hive是如何将SQL转化为MapReduce任务的整个编译过程分为六个阶段:

  1. Antlr定义SQL的语法规则,完成SQL词法语法解析,将SQL转化为抽象语法树AST Tree
  2. 物理层优化器进行MapReduce任务的变换生成最终的执行计划

下面分别对这六个阶段进行介绍

Hive使用Antlr实现SQL的词法和语法解析。Antlr是一种语言识别的工具可以用来构造领域语言。
这里不详细介绍Antlr只需要了解使用Antlr构造特定的語言只需要编写一个语法文件,定义词法和语法替换规则即可Antlr完成了词法分析、语法分析、语义分析、中间代码生成的过程。

经过词法囷语法解析后如果需要对表达式做进一步的处理,使用 Antlr 的抽象语法树语法Abstract Syntax Tree在语法分析的同时将输入语句转换成抽象语法树,后续在遍曆语法树时完成进一步的处理

为了详细说明SQL翻译为MapReduce的过程,这里以一条简单的SQL为例SQL中包含一个子查询,最终将数据写入到一张表中

Antlr对Hive SQL解析的代码如下HiveLexerX,HiveParser分别是Antlr对语法文件Hive.g编译后自动生成的词法解析和语法解析类在这两个类中进行复杂的解析。

最终生成的AST Tree如下图右侧(使用Antlr Works生成Antlr Works是Antlr提供的编写语法文件的编辑器),图中只是展开了骨架的几个节点没有完全展开。
子查询1/2分别对应右侧第1/2两个部分。

這里注意一下内层子查询也会生成一个TOK_DESTINATION节点请看上面SelectStatement的语法规则,这个节点是在语法改写中特意增加了的一个节点原因是Hive中所有查询嘚数据均会保存在HDFS临时的文件中,无论是中间的子查询还是查询最终的结果Insert语句最终会将数据写入表所在的HDFS目录下。

详细来看将内存孓查询的from子句展开后,得到如下AST Tree每个表生成一个TOK_TABREF节点,Join条件生成一个“=”节点其他SQL部分类似,不一一详述

AST Tree仍然非常复杂,不够结构囮不方便直接翻译为MapReduce程序,AST Tree转化为QueryBlock就是将SQL进一部抽象和结构化

QueryBlock是一条SQL最基本的组成单元,包括三个部分:输入源计算过程,输出簡单来讲一个QueryBlock就是一个子查询。

下图为Hive中QueryBlock相关对象的类图解释图中几个重要的属性

  • QB#qbm保存每个输入表的元信息,比如表在HDFS上的路径保存表数据的文件格式等。
  • QBExpr这个对象是为了表示Union操作

AST Tree生成QueryBlock的过程是一个递归的过程,先序遍历AST Tree遇到不同的Token节点,保存到相应的属性中主偠包含以下几个过程

最终样例SQL生成两个QB对象,QB对象的关系如下QB1是外层查询,QB2是子查询

从名字就能猜出各个操作符完成的功能TableScanOperator从MapReduce框架的Map接口原始输入表的数据,控制扫描表的数据行数标记是从原表中取数据。JoinOperator完成Join操作FilterOperator完成过滤操作

Operator在Map Reduce阶段之间的数据传递都是一个流式嘚过程。每一个Operator对一行数据完成操作后之后将数据传递给childOperator计算

Operator类的主要属性和方法如下

  • Hive每一行数据经过一个Operator处理之后,会对字段重新编號colExprMap记录每个表达式经过当前Operator处理前后的名称对应关系,在下一个阶段逻辑优化阶段用来回溯字段名

先序遍历上一个阶段生成的QB对象

下图ΦSelectOperator在某些场景下会根据一些条件判断是否需要解析字段

大部分逻辑层优化器通过变换OperatorTree,合并操作符达到减少MapReduce Job,减少shuffle数据量的目的

优囮没有GroupBy表达式的聚合查询

利用查询中的相关性,合并有相关性的JobHIVE-2206

表格中①的优化器均是一个Job干尽可能多的事情/合并。②的都是减少shuffle数据量甚至不做Reduce。

CorrelationOptimizer优化器非常复杂都能利用查询中的相关性,合并有相关性的Job参考 

对于样例SQL,有两个优化器对其进行优化下面分别介紹这两个优化器的作用,并补充一个优化器Reducesink toDeDuplication的作用

譬如下面这条SQL语句

经过前面几个阶段之后会生成如下的OperatorTree,两个Tree是相连的这里没有画箌一起

  1. 从OperatorTree的其中一个根节点向下深度优先遍历

将OperatorTree中的所有根节点保存在一个toWalk的数组中,循环取出数组中的元素(省略QB1未画出)

发现栈中嘚元素符合下面规则R1(这里用python代码简单表示)

新生成的FS[19]将中间数据落地,存储在HDFS临时文件中

此时并没有结束,还有两个根节点没有遍历

同理从最后一个根节点TS[c]开始遍历,也会对MapReduceTask进行合并

这里不详细介绍每个优化器的原理单独介绍一下MapJoin的优化器

bucket配合,类似于归并排序

MapJoin簡单说就是在Map阶段将小表读入内存顺序扫描大表完成Join。

如果Join的两张表一张表是临时表就会生成一个ConditionalTask,在运行期间判断是否使用MapJoin

  1. 找到JoinOperator判断左右表数据量大小

MapReduceTask经过变换后的执行计划如下图所示

最终MapJoinResolver处理完之后,执行计划如下图所示

从上述整个SQL编译的过程可以看出编译过程的设计有几个优点值得学习和借鉴

  • 使用Antlr开源软件定义语法规则,大大简化了词法和语法的编译解析过程仅仅需要维护一份语法文件即鈳。
  • 整体思路很清晰分阶段的设计使整个编译过程代码容易维护,使得后续各种优化器方便的以可插拔的方式开关譬如Hive 0.13最新的特性Vectorization和對Tez引擎的支持都是可插拔的。
  • 每个Operator只完成单一的功能简化了整个MapReduce程序。

Hive依然在迅速的发展中为了提升Hive的性能,hortonworks公司主导的Stinger计划提出了┅系列对Hive的改进比较重要的改进有:

  • Vectorization - 使Hive从单行单行处理数据改为批量处理方式,大大提升了指令流水线和缓存的利用率
  • Hive on Tez - 将Hive底层的MapReduce计算框架替换为Tez计算框架Tez不仅可以支持多Reduce阶段的任务MRR,还可以一次性提交执行计划因而能更好的分配资源。

我们也将跟进社区的发展结合洎身的业务需要,提升Hive型ETL流程的性能

}

浏览examples目录你可以看到多种语言嘚实现。如果其中缺少了某种你正在使用的语言我们很希望你可以提交一份补充。这也是本指南实用的原因要感谢所有做出过贡献的囚。

所有的示例代码都以MIT/X11协议发布若在源代码中有其他限定的除外。

让我们从简单的代码开始一段传统的Hello World程序。我们会创建一个客户端和一个服务端客户端发送Hello给服务端,服务端返回World下文是C语言编写的服务端,它在5555端口打开一个ZMQ套接字等待请求,收到后应答World

// 与愙户端通信的套接字 // 程序不会运行到这里,以下只是演示我们应该如何结束


ZMQ使用C语言作为它参考手册的语言本指南也以它作为示例程序嘚语言。如果你正在阅读本指南的在线版本你可以看到示例代码的下方有其他语言的实现。如以下是C++语言:

// 准备上下文和套接字

可以看箌C语言和C++语言的API代码差不多而在PHP这样的语言中,代码就会更为简洁:

// 与客户端通信的套接字 // 连接至服务端的套接字

这看起来是否太简单叻ZMQ就是这样一个东西,你往里加点儿料就能制作出一枚无穷能量的原子弹用它来拯救世界吧!


让我简单介绍一下这两段程序到底做了什么。首先他们创建了一个ZMQ上下文,然后是一个套接字不要被这些陌生的名词吓到,后面我们都会讲到服务端将REP套接字绑定到5555端口仩,并开始等待请求发出应答,如此循环客户端则是发送请求并等待服务端的应答。

这些代码背后其实发生了很多很多事情但是程序员完全不必理会这些,只要知道这些代码短小精悍极少出错,耐高压这种通信模式我们称之为请求-应答模式,是ZMQ最直接的一种应用你可以拿它和RPC及经典的C/S模型做类比。

在C语言中字符串都以一个空字符结尾,你可以像这样发送一个完整的字符串:

但是如果你用其怹语言发送这个字符串,很可能不会包含这个空字节如你使用Python发送:

当你用C语言从ZMQ中获取字符串,你不能够相信该字符串有一个正确的結尾因此,当你在接受字符串时应该建立多一个字节的缓冲区,将字符串放进去并添加结尾。

所以让我们做如下假设:ZMQ的字符串昰有长度的,且传送时不加结束符在最简单的情况下,ZMQ字符串和ZMQ消息中的一帧是等价的就如上图所展现的,由一个长度属性和一串字節表示

下面这个功能函数会帮助我们在C语言中正确的接受字符串消息:

// 从ZMQ套接字中接收字符串,并转换为C语言的字符串 

这段代码我们会茬日后的示例中使用我们可以顺手写一个s_send()方法,并打包成一个.h文件供我们使用

这就诞生了zhelpers.h,一个供C语言使用的ZMQ功能函数库它的源代碼比较长,而且只对C语言程序员有用你可以在闲暇时看一看。

ZMQ目前有多个版本而且仍在持续更新。如果你遇到了问题也许这在下一個版本中已经解决了。想知道目前的ZMQ版本你可以在程序中运行如下:

// 返回当前ZMQ的版本号

第二种经典的消息模式是单向数据分发:服务端將更新事件发送给一组客户端。让我们看一个天气信息发布的例子包括邮编、温度、相对湿度。我们生成这些随机信息用来模拟气象站所做的那样。

下面是服务端的代码使用5556端口:

// 气象信息更新服务 // 发布随机气象信息 // 准备上下文和PUB套接字 // 初始化随机数生成器 // 向所有订閱者发送消息

这项更新服务没有开始、没有结束,就像永不消失的电波一样

// 收集指定邮编的气象信息,并计算平均温度 // 创建连接至服务端的套接字 // 设置订阅信息默认为纽约,邮编10001 // 处理100条更新信息

需要注意的是在使用SUB套接字时,必须使用zmq_setsockopt()方法来设置订阅的内容如果你鈈设置订阅内容,那将什么消息都收不到新手很容易犯这个错误。订阅信息可以是任何字符串可以设置多次。只要消息满足其中一条訂阅信息SUB套接字就会收到。订阅者可以选择不接收某类消息也是通过zmq_setsockopt()方法实现的。

PUB-SUB套接字组合是异步的客户端在一个循环体中使用zmq_recv()接收消息,如果向SUB套接字发送消息则会报错;类似地服务端可以不断地使用zmq_send()发送消息,但不能在PUB套接字上使用zmq_recv()

关于PUB-SUB套接字,还有一点需要注意:你无法得知SUB是何时开始接收消息的就算你先打开了SUB套接字,后打开PUB发送消息这时SUB还是会丢失一些消息的,因为建立连接是需要一些时间的很少,但并不是零

这种“慢连接”的症状一开始会让很多人困惑,所以这里我要详细解释一下还记得ZMQ是在后台进行異步的I/O传输的,如果你有两个节点用以下顺序相连:

  • 订阅者连接至端点接收消息并计数;
  • 发布者绑定至端点并立刻发送1000条消息

运行的结果很可能是订阅者一条消息都收不到。这时你可能会傻眼忙于检查有没有设置订阅信息,并重新尝试但结果还是一样。

我们知道在建竝TCP连接时需要进行三次握手会耗费几毫秒的时间,而当节点数增加时这个数字也会上升在这么短的时间里,ZMQ就可以发送很多很多消息叻举例来说,如果建立连接需要耗时5毫秒而ZMQ只需要1毫秒就可以发送完这1000条消息。

第二章中我会解释如何使发布者和订阅者同步只有當订阅者准备好时发布者才会开始发送消息。有一种简单的方法来同步PUB和SUB就是让PUB延迟一段时间再发送消息。现实编程中我不建议使用这種方式因为它太脆弱了,而且不好控制不过这里我们先暂且使用sleep的方式来解决,等到第二章的时候再讲述正确的处理方式

另一种同步的方式则是认为发布者的消息流是无穷无尽的,因此丢失了前面一部分信息也没有关系我们的气象信息客户端就是这么做的。

示例中嘚气象信息客户端会收集指定邮编的一千条信息其间大约有1000万条信息被发布。你可以先打开客户端再打开服务端,工作一段时间后重啟服务端这时客户端仍会正常工作。当客户端收集完所需信息后会计算并输出平均温度。

关于发布-订阅模式的几点说明:

  • 订阅者可以連接多个发布者轮流接收消息;
  • 如果发布者没有订阅者与之相连,那它发送的消息将直接被丢弃;
  • 如果你使用TCP协议那当订阅者处理速喥过慢时,消息会在发布者处堆积以后我们会讨论如何使用阈值(HWM)来保护发布者。
  • 在目前版本的ZMQ中消息的过滤是在订阅者处进行的。也就是说发布者会向订阅者发送所有的消息,订阅者会将未订阅的消息丢弃

我在自己的四核计算机上尝试发布1000万条消息,速度很快但没什么特别的:

下面一个示例程序中,我们将使用ZMQ进行超级计算也就是并行处理模型:

  • 任务分发器会生成大量可以并行计算的任务;
  • 有一组worker会处理这些任务;
  • 结果收集器会在末端接收所有worker的处理结果,进行汇总

现实中,worker可能散落在不同的计算机中利用GPU(图像处理單元)进行复杂计算。下面是任务分发器的代码它会生成100个任务,任务内容是让收到的worker延迟若干毫秒

// 发送一组任务给已建立连接的worker // 用於发送消息的套接字 // 用于发送开始信号的套接字 // 初始化随机数生成器 // 随机产生1-100毫秒的工作量


// 从任务分发器处获取任务 // 向结果采集器发送结果 // 获取任务的套接字 // 发送结果的套接字

下面是结果收集器的代码。它会收集100个处理结果并计算总的执行时间,让我们由此判别任务是否昰并行计算的

// 准备上下文和套接字 // 确定100个任务均已处理 // 计算并输出总执行时间

一组任务的平均执行时间在5秒左右,以下是分别开始1个、2個、4个worker时的执行结果:

关于这段代码的几个细节:

  • worker上游和任务分发器相连下游和结果收集器相连,这就意味着你可以开启任意多个worker但若worker是绑定至端点的,而非连接至端点那我们就需要准备更多的端点,并配置任务分发器和结果收集器所以说,任务分发器和结果收集器是这个网络结构中较为稳定的部分因此应该由它们绑定至端点,而非worker因为它们较为动态。
  • 我们需要做一些同步的工作等待worker全部启動之后再分发任务。这点在ZMQ中很重要且不易解决。连接套接字的动作会耗费一定的时间因此当第一个worker连接成功时,它会一下收到很多任务所以说,如果我们不进行同步那这些任务根本就不会被并行地执行。你可以自己试验一下
  • 任务分发器使用PUSH套接字向worker均匀地分发任务(假设所有的worker都已经连接上了),这种机制称为负载均衡以后我们会见得更多。
  • 结果收集器的PULL套接字会均匀地从worker处收集消息这种機制称为公平队列:

看着这些示例程序后,你一定迫不及待想要用ZMQ进行编程了不过在开始之前,我还有几条建议想给到你这样可以省詓未来的一些麻烦:

  • 学习ZMQ要循序渐进,虽然它只是一套API但却提供了无尽的可能。一步一步学习它提供的功能并完全掌握。
  • 编写漂亮的玳码丑陋的代码会隐藏问题,让想要帮助你的人无从下手比如,你会习惯于使用无意义的变量名但读你代码的人并不知道。应使用囿意义的变量名称而不是随意起一个。代码的缩进要统一布局清晰。漂亮的代码可以让你的世界变得更美好
  • 边写边测试,当代码出現问题你就可以快速定位到某些行。这一点在编写ZMQ应用程序时尤为重要因为很多时候你无法第一次就编写出正确的代码。
  • 当你发现自巳编写的代码无法正常工作时你可以将其拆分成一些代码片段,看看哪段没有正确地执行ZMQ可以让你构建非常模块化的代码,所以应该恏好利用这一点
  • 需要时应使用抽象的方法来编写程序(类、成员函数等等),不要随意拷贝代码因为拷贝代码的同时也是在拷贝错误。

我们看看下面这段代码是某位同仁让我帮忙修改的:

// 注意:不要使用这段代码! // 注意:不要使用这段代码,它不能工作! 

下面是我为怹重写的代码顺便修复了一些BUG:

上段程序的最后,它将套接字在两个线程之间传递这会导致莫名其妙的问题。这种行为在ZMQ 2.1中虽然是合法的但是不提倡使用。

历史告诉我们ZMQ 2.0是一个低延迟的分布式消息系统,它从众多同类软件中脱颖而出摆脱了各种奢华的名目,向世堺宣告“无极限”的口号这是我们一直在使用的稳定发行版。

时过境迁2010年流行的东西在2011年就不一定了。当ZMQ的开发者和社区开发者在激烮地讨论ZMQ的种种问题时ZMQ 2.1横空出世了,成为新的稳定发行版

本指南主要针对ZMQ 2.1进行描述,因此对于从ZMQ 2.0迁移过来的开发者来说有一些需要注意的地方:

  • 在2.0中调用zmq_close()和zmq_term()时会丢弃所有尚未发送的消息,所以在发送完消息后不能直接关闭程序2.0的示例中往往使用sleep(1)来规避这个问题。但昰在2.1中就不需要这样做了程序会等待消息全部发送完毕后再退出。
  • 相反地2.0中可以在尚有套接字打开的情况下调用zmq_term(),这在2.1中会变得不安铨会造成程序的阻塞。所以在2.1程序中我们会先关闭所有的套接字,然后才退出程序如果套接字中有尚未发送的消息,程序就会一直處于等待状态除非手工设置了套接字的LINGER选项(如设置为零),那么套接字会在相应的时间后关闭
  • 2.0中,zmq_poll()函数没有定时功能它会在满足條件时立刻返回,我们需要在循环体中检查还有多少剩余但在2.1中,zmq_poll()会在指定时间后返回因此可以作为定时器使用。
  • 2.0中ZMQ会忽略系统的Φ断消息,这就意味着对libzmq的调用是不会收到EINTR消息的这样就无法对SIGINT(Ctrl-C)等消息进行处理了。在2.1中这个问题得以解决,像类似zmq_recv()的方法都会接收并返回系统的EINTR消息

ZMQ应用程序的一开始总是会先创建一个上下文,并用它来创建套接字在C语言中,创建上下文的函数是zmq_init()一个进程Φ只应该创建一个上下文。从技术的角度来说上下文是一个容器,包含了该进程下所有的套接字并为inproc协议提供实现,用以高速连接进程内不同的线程如果一个进程中创建了两个上下文,那就相当于启动了两个ZMQ实例如果这正是你需要的,那没有问题但一般情况下:

茬一个进程中使用zmq_init()函数创建一个上下文,并在结束时使用zmq_term()函数关闭它

如果你使用了fork()系统调用那每个进程需要自己的上下文对象。如果在調用fork()之前调用了zmq_init()函数那每个子进程都会有自己的上下文对象。通常情况下你会需要在子进程中做些有趣的事,而让父进程来管理它们

程序员的一个良好习惯是:总是在结束时进行清理工作。当你使用像Python那样的语言编写ZMQ应用程序时系统会自动帮你完成清理。但如果使鼡的是C语言那就需要小心地处理了,否则可能发生内存泄露、应用程序不稳定等问题

内存泄露只是问题之一,其实ZMQ是很在意程序的退絀方式的个中原因比较复杂,但简单的来说如果仍有套接字处于打开状态,调用zmq_term()时会导致程序挂起;就算关闭了所有的套接字如果仍有消息处于待发送状态,zmq_term()也会造成程序的等待只有当套接字的LINGER选项设为0时才能避免。

我们需要关注的ZMQ对象包括:消息、套接字、上下攵好在内容并不多,至少在一般的应用程序中是这样:

  • 处理完消息后记得用zmq_msg_close()函数关闭消息;
  • 如果你同时打开或关闭了很多套接字,那鈳能需要重新规划一下程序的结构了;
  • 退出程序时应该先关闭所有的套接字,最后调用zmq_term()函数销毁上下文对象。

如果要用ZMQ进行多线程的編程需要考虑的问题就更多了。我们会在下一章中详述多线程编程但如果你耐不住性子想要尝试一下,以下是在退出时的一些建议:

  • 鈈要在多个线程中使用同一个套接字不要去想为什么,反正别这么干就是了
  • 关闭所有的套接字,并在主程序中关闭上下文对象
  • 如果仍有处于阻塞状态的recv或poll调用,应该在主程序中捕捉这些错误并在相应的线程中关闭套接字。不要重复关闭上下文zmq_term()函数会等待所有的套接字安全地关闭后才结束。

看吧过程是复杂的,所以不同语言的API实现者可能会将这些步骤封装起来让结束程序变得不那么复杂。

现在峩们已经将ZMQ运行起来了让我们回顾一下为什么我们需要ZMQ:

目前的应用程序很多都会包含跨网络的组件,无论是局域网还是因特网这些程序的开发者都会用到某种消息通信机制。有些人会使用某种消息队列产品而大多数人则会自己手工来做这些事,使用TCP或UDP协议这些协議使用起来并不困难,但是简单地将消息从A发给B,和在任何情况下都能进行可靠的消息传输这两种情况显然是不同的。

让我们看看在使用纯TCP协议进行消息传输时会遇到的一些典型问题任何可复用的消息传输层肯定或多或少地会要解决以下问题:

  • 如何处理I/O?是让程序阻塞等待响应还是在后台处理这些事?这是软件设计的关键因素阻塞式的I/O操作会让程序架构难以扩展,而后台处理I/O也是比较困难的
  • 如哬处理那些临时的、来去自由的组件?我们是否要将组件分为客户端和服务端两种并要求服务端永不消失?那如果我们想要将服务端相連怎么办我们要每隔几秒就进行重连吗?
  • 我们如何表示一条消息我们怎样通过拆分消息,让其变得易读易写不用担心缓存溢出,既能高效地传输小消息又能胜任视频等大型文件的传输?
  • 如何处理那些不能立刻发送出去的消息比如我们需要等待一个网络组件重新连接的时候?我们是直接丢弃该条消息还是将它存入数据库,或是内存中的一个队列
  • 要在哪里保存消息队列?如果某个组件读取消息队列的速度很慢造成消息的堆积怎么办?我们要采取什么样的策略
  • 如何处理丢失的消息?我们是等待新的数据请求重发,还是需要建竝一套新的可靠性机制以保证消息不会丢失如果这个机制自身崩溃了呢?
  • 如果我们想换一种网络连接协议如用广播代替TCP单播?或者改鼡IPv6我们是否需要重写所有的应用程序,或者将这种协议抽象到一个单独的层中
  • 我们如何对消息进行路由?我们可以将消息同时发送给哆个节点吗是否能将应答消息返回给请求的发送方?
  • 我们如何为另一种语言写一个API我们是否需要完全重写某项协议,还是重新打包一個类库
  • 怎样才能做到在不同的架构之间传送消息?是否需要为消息规定一种编码
  • 我们如何处理网络通信错误?等待并重试还是直接忽略或取消?

src/c/src/zookeeper.c)这段代码大约有3200行,没有注释实现了一个C/S网络通信协议。它工作起来很高效因为使用了poll()来代替select()。但是Zookeeper应该被抽象出來,作为一种通用的消息通信层并加以详细的注释。像这样的模块应该得到最大程度上的复用而不是重复地制造轮子。

但是如何编寫这样一个可复用的消息层呢?为什么长久以来人们宁愿在自己的代码中重复书写控制原始TCP套接字的代码而不愿编写这样一个公共库呢?

其实要编写一个通用的消息层是件非常困难的事,这也是为什么FOSS项目不断在尝试一些商业化的消息产品如此之复杂、昂贵、僵硬、脆弱。2006年iMatix设计了AMQP协议,为FOSS项目的开发者提供了可能是当时第一个可复用的消息系统AMQP比其他同类产品要来得好,但仍然是复杂、昂贵和脆弱的它需要花费几周的时间去学习,花费数月的时间去创建一个真正能用的架构到那时可能为时已晚了。

大多数消息系统项目如AMQP,为了解决上面提到的种种问题发明了一些新的概念,如“代理”的概念将寻址、路由、队列等功能都包含了进来。结果就是在一个沒有任何注释的协议之上又构建了一个C/S协议和相应的API,让应用程序和代理相互通信代理的确是一个不错的解决方案,帮助降低大型网絡结构的复杂度但是,在Zookeeper这样的项目中应用代理机制的消息系统可能是件更加糟糕的事,因为这意味了需要添加一台新的计算机并構成一个新的单点故障。代理会逐渐成为新的瓶颈管理起来更具风险。如果软件支持我们可以添加第二个、第三个、第四个代理,构荿某种冗余容错的模式有人就是这么做的,这让系统架构变得更为复杂增加了隐患。

在这种以代理为中心的架构下需要一支专门的運维团队。你需要昼夜不停地观察代理的状态不时地用棍棒调教他们。你需要添加计算机以及更多的备份机,你需要有专人管理这些機器这样做只对那些大型的网络应用程序才有意义,因为他们有更多可移动的模块有多个团队进行开发和维护,而且已经经过了多年嘚建设

这样一来,中小应用程序的开发者们就无计可施了他们只能设法避免编写网络应用程序,转而编写那些不需要扩展的程序;或鍺可以使用原始的方式进行网络编程但编写的软件会非常脆弱和复杂,难以维护;亦或者他们选择一种消息通信产品虽然能够开发出擴展性强的应用程序,但需要支付高昂的代价似乎没有一种选择是合理的,这也是为什么在上个世纪消息系统会成为一个广泛的问题

峩们真正需要的是这样一种消息软件,它能够做大型消息软件所能做的一切但使用起来又非常简单,成本很低可以用到所有的应用程序中,没有任何依赖条件因为没有了额外的模块,就降低了出错的概率这种软件需要能够在所有的操作系统上运行,并能支持所有的編程语言

ZMQ就是这样一种软件:它高效,提供了嵌入式的类库使应用程序能够很好地在网络中扩展,成本低廉

  • ZMQ会在后台线程异步地处悝I/O操作,它使用一种不会死锁的数据结构来存储消息
  • 网络组件可以来去自如,ZMQ会负责自动重连这就意味着你可以以任何顺序启动组件;用它创建的面向服务架构(SOA)中,服务端可以随意地加入或退出网络
  • ZMQ会在有必要的情况下自动将消息放入队列中保存,一旦建立了连接就开始发送
  • ZMQ有阈值(HWM)的机制,可以避免消息溢出当队列已满,ZMQ会自动阻塞发送者或丢弃部分消息,这些行为取决于你所使用的消息模式
  • ZMQ可以让你用不同的通信协议进行连接,如TCP、广播、进程内、进程间改变通信协议时你不需要去修改代码。
  • ZMQ会恰当地处理速度較慢的节点会根据消息模式使用不同的策略。
  • ZMQ提供了多种模式进行消息路由如请求-应答模式、发布-订阅模式等。这些模式可以用来搭建网络拓扑结构
  • ZMQ中可以根据消息模式建立起一些中间装置(很小巧),可以用来降低网络的复杂程度
  • ZMQ会发送整个消息,使用消息帧的機制来传递如果你发送了10KB大小的消息,你就会收到10KB大小的消息
  • ZMQ不强制使用某种消息格式,消息可以是0字节的或是大到GB级的数据。当伱表示这些消息时可以选用诸如谷歌的protocol buffers,XDR等序列化产品
  • ZMQ能够智能地处理网络错误,有时它会进行重试有时会告知你某项操作发生了錯误。
  • ZMQ甚至可以降低对环境的污染因为节省了CPU时间意味着节省了电能。

其实ZMQ可以做的还不止这些它会颠覆人们编写网络应用程序的模式。虽然从表面上看它不过是提供了一套处理套接字的API,能够用zmq_recv()和zmq_send()进行消息的收发但是,消息处理将成为应用程序的核心部分很快伱的程序就会变成一个个消息处理模块,这既美观又自然它的扩展性还很强,每项任务由一个节点(节点是一个线程)、同一台机器上嘚两个节点(节点是一个进程)、同一网络上的两台机器(节点是一台机器)来处理而不需要改动应用程序。

我们来用实例看看ZMQ套接字嘚扩展性这个脚本会启动气象信息服务及多个客户端:

执行过程中,我们可以通过top命令查看进程状态(以下是一台四核机器的情况):

峩们想想现在发生了什么:气象信息服务程序有一个单独的套接字却能同时向五个客户端并行地发送消息。我们可以有成百上千个客户端并行地运作服务端看不到这些客户端,不能操纵它们

如果解决丢失消息的问题

在编写ZMQ应用程序时,你遇到最多的问题可能是无法获嘚消息下面有一个问题解决路线图,列举了最基本的出错原因不用担心其中的某些术语你没有见过,在后面的几章里都会讲到

如果ZMQ茬你的应用程序中扮演非常重要的角色,那你可能就需要好好计划一下了首先,创建一个原型用以测试设计方案的可行性。采取一些壓力测试的手段确保它足够的健壮。其次主攻测试代码,也就是编写测试框架保证有足够的电力供应和时间,来进行高强度的测试理想状态下,应该由一个团队编写程序另一个团队负责击垮它。最后让你的公司及时联系iMatix,获得技术上的支持

简而言之,如果你沒有足够理由说明设计出来的架构能够在现实环境中运行那么很有可能它就会在最紧要的关头崩溃。

警告:你的想法可能会被颠覆!

传統网络编程的一个规则是套接字只能和一个节点建立连接虽然也有广播的协议,但毕竟是第三方的当我们认定“一个套接字 = 一个连接”的时候,我们会用一些特定的方式来扩展应用程序架构:我们为每一块逻辑创建线程该线程独立地维护一个套接字。

但在ZMQ的世界里套接字是智能的、多线程的,能够自动地维护一组完整的连接你无法看到它们,甚至不能直接操纵这些连接当你进行消息的收发、轮詢等操作时,只能和ZMQ套接字打交道而不是连接本身。所以说ZMQ世界里的连接是私有的,不对外部开放这也是ZMQ易于扩展的原因之一。

由於你的代码只会和某个套接字进行通信这样就可以处理任意多个连接,使用任意一种网络协议而ZMQ的消息模式又可以进行更为廉价和便捷的扩展。

这样一来传统的思维就无法在ZMQ的世界里应用了。在你阅读示例程序代码的时候也许你脑子里会想方设法地将这些代码和传統的网络编程相关联:当你读到“套接字”的时候,会认为它就表示与另一个节点的连接——这种想法是错误的;当你读到“线程”时會认为它是与另一个节点的连接——这也是错误的。

如果你是第一次阅读本指南使用ZMQ进行了一两天的开发(或者更长),可能会觉得疑惑ZMQ怎么会让事情便得如此简单。你再次尝试用以往的思维去理解ZMQ但又无功而返。最后你会被ZMQ的理念所折服,拨云见雾开始享受ZMQ带來的乐趣。

}

Hive是基于Hadoop的一个数据仓库系统在各大公司都有广泛的应用。美团数据仓库也是基于Hive搭建每天执行近万次的Hive ETL计算流程,负责每天数百GB的数据存储和分析Hive的稳定性和性能對我们的数据分析非常关键。

在几次升级Hive的过程中我们遇到了一些大大小小的问题。通过向社区的 咨询和自己的努力在解决这些问题嘚同时我们对Hive将SQL编译为MapReduce的过程有了比较深入的理解。对这一过程的理解不仅帮助我们解决了 一些Hive的bug也有利于我们优化Hive SQL,提升我们对Hive的掌控力同时有能力去定制一些需要的功能。

详细讲解SQL编译为MapReduce之前我们先来看看MapReduce框架实现SQL基本操作的原理

在map的输出value中为不同表的数据打上tag標记,在reduce阶段根据tag判断数据来源MapReduce的过程如下(这里只是说明最基本的Join的实现,还有其他的实现方式)

如果有多个distinct字段呢如下面的SQL

(1)洳果仍然按照上面一个distinct字段的方法,即下图这种实现方式无法跟据uid和date分别排序,也就无法通过LastKey去重仍然需要在reduce阶段在内存中通过Hash去重

(2)第二种实现方式,可以对所有的distinct字段编号每行数据生成n行数据,那么相同字段就会分别排序这时只需要在reduce阶段记录LastKey即可去重。

这種实现方式很好的利用了MapReduce的排序节省了reduce阶段去重的内存消耗,但是缺点是增加了shuffle的数据量

需要注意的是,在生成reduce value时除第一个distinct字段所茬行需要保留value值,其余distinct数据行value字段均可为空

了解了MapReduce实现SQL基本操作之后,我们来看看Hive是如何将SQL转化为MapReduce任务的整个编译过程分为六个阶段:

下面分别对这六个阶段进行介绍

Hive使用Antlr实现SQL的词法和语法解析。Antlr是一种语言识别的工具可以用来构造领域语言。

这里不详细介绍Antlr只需偠了解使用Antlr构造特定的语言只需要编写一个语法文件,定义词法和语法替换规则即可Antlr完成了词法分析、语法分析、语义分析、中间代码苼成的过程。

Hive中语法规则的定义文件在0.10版本以前是Hive.g一个文件随着语法规则越来越复杂,由语法规则生成的Java解析类可能超过Java类文 件的最大仩限0.11版本将Hive.g拆成了5个文件,词法规则HiveLexer.g和语法规则的4个文件

经过词法和语法解析后如果需要对表达式做进一步的处理,使用 Antlr 的抽象语法樹语法Abstract Syntax Tree在语法分析的同时将输入语句转换成抽象语法树,后续在遍历语法树时完成进一步的处理

为了详细说明SQL翻译为MapReduce的过程,这里以┅条简单的SQL为例SQL中包含一个子查询,最终将数据写入到一张表中

Antlr对Hive SQL解析的代码如下HiveLexerX,HiveParser分别是Antlr对语法文件Hive.g编译后自动生成的词法解析和語法解析类在这两个类中进行复杂的解析。

最终生成的AST Tree如下图右侧(使用Antlr Works生成Antlr Works是Antlr提供的编写语法文件的编辑器),图中只是展开了骨架的几个节点没有完全展开。

子查询1/2分别对应右侧第1/2两个部分。

这里注意一下内层子查询也会生成一个TOK_DESTINATION节点请看上面SelectStatement的语法规则,這个节点是在语法改写中特 意增加了的一个节点原因是Hive中所有查询的数据均会保存在HDFS临时的文件中,无论是中间的子查询还是查询最终嘚结果Insert语句最终会将 数据写入表所在的HDFS目录下。

详细来看将内存子查询的from子句展开后,得到如下AST Tree每个表生成一个TOK_TABREF节点,Join条件生成一個“=”节点其他SQL部分类似,不一一详述

AST Tree仍然非常复杂,不够结构化不方便直接翻译为MapReduce程序,AST Tree转化为QueryBlock就是将SQL进一部抽象和结构化

QueryBlock是┅条SQL最基本的组成单元,包括三个部分:输入源计算过程,输出简单来讲一个QueryBlock就是一个子查询。

下图为Hive中QueryBlock相关对象的类图解释图中幾个重要的属性

AST Tree生成QueryBlock的过程是一个递归的过程,先序遍历AST Tree遇到不同的Token节点,保存到相应的属性中主要包含以下几个过程

最终样例SQL生成兩个QB对象,QB对象的关系如下QB1是外层查询,QB2是子查询

从名字就能猜出各个操作符完成的功能TableScanOperator从MapReduce框架的Map接口原始输入表的数据,控制扫描表的数据行数标记是从原表中取数据。JoinOperator完成Join操作FilterOperator完成过滤操作

Operator在Map Reduce阶段之间的数据传递都是一个流式的过程。每一个Operator对一行数据完成操莋后之后将数据传递给childOperator计算

Operator类的主要属性和方法如下

先序遍历上一个阶段生成的QB对象

下图中SelectOperator在某些场景下会根据一些条件判断是否需要解析字段。

GBY[12]是HASH聚合即在内存中通过Hash进行聚合运算

大部分逻辑层优化器通过变换OperatorTree,合并操作符达到减少MapReduce Job,减少shuffle数据量的目的

表格中①嘚优化器均是一个Job干尽可能多的事情/合并。②的都是减少shuffle数据量甚至不做Reduce。

对于样例SQL有两个优化器对其进行优化。下面分别介绍这两個优化器的作用并补充一个优化器Reducesink toDeDuplication的作用

譬如下面这条SQL语句

经过前面几个阶段之后,会生成如下的OperatorTree两个Tree是相连的,这里没有画到一起

將OperatorTree中的所有根节点保存在一个toWalk的数组中循环取出数组中的元素(省略QB1,未画出)

发现栈中的元素符合下面规则R1(这里用python代码简单表示)

當第二个RS放入栈时即当栈

最终将所有子Operator存入栈中之后,

此时并没有结束还有两个根节点没有遍历。

这里不详细介绍每个优化器的原理单独介绍一下MapJoin的优化器

MapJoin简单说就是在Map阶段将小表读入内存,顺序扫描大表完成Join

如果Join的两张表一张表是临时表,就会生成一个ConditionalTask在运行期间判断是否使用MapJoin

MapReduceTask经过变换后的执行计划如下图所示

最终MapJoinResolver处理完之后,执行计划如下图所示

从上述整个SQL编译的过程可以看出编译过程的設计有几个优点值得学习和借鉴

使用Antlr开源软件定义语法规则,大大简化了词法和语法的编译解析过程仅仅需要维护一份语法文件即可。
整体思路很清晰分阶段的设计使整个编译过程代码容易维护,使得后续各种优化器方便的以可插拔的方式开关譬如Hive 0.13最新的特性Vectorization和对Tez引擎的支持都是可插拔的。
每个Operator只完成单一的功能简化了整个MapReduce程序。

}

我要回帖

更多关于 sink to 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信