How Hadoop's native raw comparator works

Hadoop optimizes its serialization, and type comparison is critical to MapReduce: keys are compared against each other during the sort phase. For comparing serialized bytes Hadoop provides the native comparator interface RawComparator<T>, which allows its implementations to compare records directly in the data stream without first deserializing them into objects. RawComparator is a small optimization-oriented interface that simply exposes a method for comparing raw bytes in the stream:
public interface RawComparator<T> extends Comparator<T> {
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
This interface is not implemented directly by most classes. Its concrete subclass is WritableComparator, which in most cases appears as a nested class of types that implement Writable and provides comparison of their serialized bytes.
First, let's look at RawComparator's concrete implementation, WritableComparator.
WritableComparator acts like a registry that records the set of all Comparator classes.
Its comparators member is a hash map whose entries use key = Class and value = WritableComparator.
WritableComparator mainly provides two functions.
1. A default implementation of the raw compare() method.
The default implementation first deserializes the bytes into objects and then compares the objects, which has an obvious cost:
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
  try {
    buffer.reset(b1, s1, l1);   // parse key1
    key1.readFields(buffer);

    buffer.reset(b2, s2, l2);   // parse key2
    key2.readFields(buffer);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
  return compare(key1, key2);   // compare them
}
The object-level compare() for the basic data types, on the other hand, neatly relies on the type's own ordering (it delegates to WritableComparable's compareTo() method):
public int compare(WritableComparable a, WritableComparable b) {
  return a.compareTo(b);
}
For IntWritable instances, for example, this ends up calling IntWritable's own compareTo() method:
public int compareTo(Object o) {
  int thisValue = this.value;
  int thatValue = ((IntWritable) o).value;
  return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
}
2. A factory for RawComparator instances, with which Writable implementations register themselves.
For example, to obtain the comparator registered for IntWritable you can simply call WritableComparator's get() method, as sketched below.
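As a quick illustration (this sketch is not from the original post; the serialize() helper and the demo class name are made up), the following fetches the comparator registered for IntWritable and compares two values both as objects and as serialized bytes:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparator;

public class IntComparatorDemo {

  // illustrative helper: serialize an IntWritable into a byte array
  private static byte[] serialize(IntWritable w) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    w.write(new DataOutputStream(bos));
    return bos.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    // look up the comparator registered for IntWritable
    WritableComparator comparator = WritableComparator.get(IntWritable.class);

    IntWritable a = new IntWritable(10);
    IntWritable b = new IntWritable(42);

    // object-level comparison: delegates to IntWritable.compareTo()
    System.out.println(comparator.compare(a, b));             // negative: 10 < 42

    // byte-level comparison: no deserialization of the operands is needed here
    byte[] ba = serialize(a);
    byte[] bb = serialize(b);
    System.out.println(comparator.compare(ba, 0, ba.length,
                                          bb, 0, bb.length)); // negative: 10 < 42
  }
}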
WritableComparator:
Key code:
Code 1: the registry
----------------------------------------------------------------
// the registry: records the set of registered WritableComparator instances
private static HashMap<Class, WritableComparator> comparators =
    new HashMap<Class, WritableComparator>();
Code 2: obtaining a WritableComparator instance
Note: HashMap is not a thread-safe container, so access has to be synchronized. get() looks up the WritableComparator registered under key = Class; if the result is null it constructs one instead, and the two protected constructors in turn call newKey() to create new instances.
public static synchronized WritableComparator get(Class<? extends WritableComparable> c) {
  WritableComparator comparator = comparators.get(c);
  if (comparator == null)
    comparator = new WritableComparator(c, true);
  return comparator;
}
Code 3: the constructor
---------------------------------------------------------------
new WritableComparator(c, true)
The source of WritableComparator's constructor is as follows:
// keyClass, key1, key2 and buffer are all used by WritableComparator's constructor
private final Class<? extends WritableComparable> keyClass;
private final WritableComparable key1;   // instances of the WritableComparable key type
private final WritableComparable key2;
private final DataInputBuffer buffer;    // input buffer stream

protected WritableComparator(Class<? extends WritableComparable> keyClass,
                             boolean createInstances) {
  this.keyClass = keyClass;
  if (createInstances) {
    key1 = newKey();
    key2 = newKey();
    buffer = new DataInputBuffer();
  } else {
    key1 = key2 = null;
    buffer = null;
  }
}
The keyClass, key1, key2 and buffer fields above (keyClass being the class used as the registry key) are what the constructor initializes. As the constructor shows, WritableComparator uses the boolean createInstances flag to decide whether to actually instantiate key1, key2 and buffer. key1 and key2 are instances of the WritableComparable key type; inside the constructor they are created by newKey(), whose source is shown below: it uses Hadoop's own reflection utility to instantiate an object of the WritableComparable key class.
public WritableComparable newKey() {
  return ReflectionUtils.newInstance(keyClass, null);
}
Code 4: the compare() methods
---------------------------------------------------------------------
public int compare(Object a, Object b);
public int compare(WritableComparable a, WritableComparable b);
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
Of the three compare() overloads, compare(Object a, Object b) casts its arguments to WritableComparable and calls the second overload; the second overload in turn calls WritableComparable.compareTo(); and the last one, compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2), has the following source:
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
  try {
    buffer.reset(b1, s1, l1);   // parse key1
    key1.readFields(buffer);

    buffer.reset(b2, s2, l2);   // parse key2
    key2.readFields(buffer);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
  return compare(key1, key2);   // compare them
}
This is the default implementation of the raw compare() method: key1 and key2 are deserialized from the byte arrays and then compared as objects.
The buffer serves as the bridge: each byte array is loaded into the buffer, key1's (a WritableComparable's) readFields() deserializes from it, and then key1 and key2 are compared. In other words, this compare() turns the two binary streams back into objects and delegates to the second overload.
Code 5: the define() method
This method registers a WritableComparator for a class in the registry; note that it too must be synchronized:
public static synchronized void define(Class c,
                                       WritableComparator comparator) {
  comparators.put(c, comparator);
}
Code 6: the remaining static helpers such as readInt()
---------------------------------------------------------------------
These helpers serve the various WritableComparable implementations. Take IntWritable as an example: its nested Comparator class has to override WritableComparator's compare() for its own IntWritable byte layout. Put differently, WritableComparator's compare() is only a default implementation; the real compare() must be overridden per type, such as IntWritable, and static methods like readInt() are just low-level byte-decoding helpers wrapped up for those nested Comparators to call.
Now let's walk through how BooleanWritable's nested RawComparator<T> is implemented:
/** A Comparator optimized for BooleanWritable. */
public static class Comparator extends WritableComparator {
  public Comparator() {
    // call the parent constructor to initialize keyClass = BooleanWritable.class
    super(BooleanWritable.class);
  }

  // override the parent's raw compare(); the byte decoding uses the parent's helpers
  public int compare(byte[] b1, int s1, int l1,
                     byte[] b2, int s2, int l2) {
    boolean a = (readInt(b1, s1) == 1) ? true : false;
    boolean b = (readInt(b2, s2) == 1) ? true : false;
    return ((a == b) ? 0 : (a == false) ? -1 : 1);
  }
}

static {
  // register the comparator for BooleanWritable in WritableComparator's registry
  WritableComparator.define(BooleanWritable.class, new Comparator());
}
Hadoop thus mirrors the Java class library: it provides the WritableComparable interface (the counterpart of java.lang.Comparable) and the RawComparator/WritableComparator types (the counterpart of java.util.Comparator) for comparing serialized data, and its io package already wraps the Java primitive types for serialization and deserialization. For your own class, the usual pattern is to implement WritableComparable and register a nested Comparator (extending WritableComparator) for it.
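Here is a minimal sketch of that pattern, assuming a hypothetical key class MyKey with a single long field; the names are illustrative and not from the original post:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// hypothetical key type: a single long id, ordered numerically
public class MyKey implements WritableComparable<MyKey> {

  private long id;

  public MyKey() {}                  // no-arg constructor required for reflection (newKey())

  public MyKey(long id) { this.id = id; }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeLong(id);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    id = in.readLong();
  }

  @Override
  public int compareTo(MyKey other) {
    return Long.compare(id, other.id);
  }

  /** Raw comparator: compares the serialized long directly, no deserialization. */
  public static class Comparator extends WritableComparator {
    public Comparator() {
      super(MyKey.class);            // keyClass = MyKey.class
    }

    @Override
    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
      long x = readLong(b1, s1);     // WritableComparator's byte-decoding helper
      long y = readLong(b2, s2);
      return Long.compare(x, y);
    }
  }

  static {
    // register the raw comparator so WritableComparator.get(MyKey.class) returns it
    WritableComparator.define(MyKey.class, new Comparator());
  }
}

With the comparator registered, the shuffle sort can order MyKey records directly on the serialized bytes instead of deserializing each record first.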
Recently a project required adding more functionality to Sqoop. One item is exporting HBase data to HDFS or Hive, and the key requirement is that the data coming out of HBase must support conditional filtering, similar to SQL's >, <, = operators, mainly for numeric columns.
Filtering in HBase can only be done through Filters, and the one that fits our requirement is:
SingleColumnValueFilter
This filter supports filtering rows based on a column's value.
However, none of the stock Filter comparators handles numeric comparison: BinaryComparator and BitComparator cannot express what we need, and the data filtered with them is inaccurate. That left two options:
1. Scan the data out and filter every row against the condition ourselves (ugly).
2. Write a custom Comparator that satisfies the condition.
We ended up choosing the custom Comparator approach.
After searching around and reading the source of HBase's existing comparators, a custom Comparator involves the following steps:
1. Define a protobuf file
The protobuf definition can be modeled on Comparator.proto under the protobuf folder of the hbase-protocol module in the HBase source tree; I simply copied it over and modified it.
The reason a .proto file is needed at all is that every piece of RPC data HBase exchanges goes through protobuf.
Here is the .proto file I defined:
---------------------------------------------------------------------------------------------------------------
// This file contains protocol buffers that are used for filters
option java_package = "com.star.hbase.defined.comparator";
option java_outer_classname = "ComparatorProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

// This file contains protocol buffers that are used for comparators (e.g. in filters)
message NumberComparator {
  required bytes value = 1;
  required string fieldType = 2;
}
---------------------------------------------------------------------------------------------------------------
Generate the Java class with the following command (protoc.exe must be in the current directory):
protoc --java_out=D:\proto NumberComparator.proto
I won't go into protobuf usage in any more detail here; it is easy to look up, and a quick round trip with the generated class is sketched below.
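For reference, a minimal sketch of how the generated message can be built, serialized, and parsed back, assuming the ComparatorProtos class generated from the .proto above:

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.util.Bytes;

import com.star.hbase.defined.comparator.ComparatorProtos;

public class NumberComparatorProtoDemo {
  public static void main(String[] args) throws InvalidProtocolBufferException {
    // build the message: the raw comparison value plus the declared field type
    ComparatorProtos.NumberComparator msg = ComparatorProtos.NumberComparator.newBuilder()
        .setValue(ByteString.copyFrom(Bytes.toBytes(1500)))
        .setFieldType("int")
        .build();

    // serialize to bytes (this is what travels over HBase's RPC)...
    byte[] wire = msg.toByteArray();

    // ...and parse it back on the other side
    ComparatorProtos.NumberComparator parsed =
        ComparatorProtos.NumberComparator.parseFrom(wire);
    System.out.println(parsed.getFieldType());                        // int
    System.out.println(Bytes.toInt(parsed.getValue().toByteArray())); // 1500
  }
}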
The NumberComparator message I defined is a small value object with two fields: the first is the value to filter against, and the second is the type (for example int or double; only numeric types are supported) that the specified HBase column should be converted to for the comparison.
2. Create the comparator Java class, extending ByteArrayComparable
The code is as follows:
-------------------------------------------------------------------------------------------------------------------
package com.star.hbase.defined.comparator;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Created with IntelliJ IDEA.
 * User: star
 * Time: 17:10
 * function: numeric comparator for SingleColumnValueFilter
 */
public class NumberComparator extends ByteArrayComparable {

    private String fieldType;
    private byte[] data;

    /**
     * Constructor
     * @param value     the value to compare against
     * @param fieldType the numeric type of the column (int, long, float, double, short)
     */
    public NumberComparator(byte[] value, String fieldType) {
        super(value);
        this.fieldType = fieldType;
        this.data = value;
    }

    // override: serialize this comparator to protobuf bytes for the RPC
    @Override
    public byte[] toByteArray() {
        ComparatorProtos.NumberComparator.Builder builder =
                ComparatorProtos.NumberComparator.newBuilder();
        builder.setValue(ByteString.copyFrom(this.data));
        builder.setFieldType(this.fieldType);
        return builder.build().toByteArray();
    }

    // required static factory: deserialize the comparator on the region server side
    public static NumberComparator parseFrom(final byte[] bytes) throws DeserializationException {
        ComparatorProtos.NumberComparator proto;
        try {
            proto = ComparatorProtos.NumberComparator.parseFrom(bytes);
        } catch (InvalidProtocolBufferException e) {
            throw new DeserializationException(e);
        }
        return new NumberComparator(proto.getValue().toByteArray(), proto.getFieldType());
    }

    // override the comparison method: this is where the custom ordering lives
    @Override
    public int compareTo(byte[] bytes, int offset, int length) {
        if (fieldType.equalsIgnoreCase("int") || fieldType.equalsIgnoreCase("integer")) {
            Integer paramValue = byteConvertObj(Integer.class, data);
            Integer currentValue = byteConvertObj(Integer.class, Bytes.copy(bytes, offset, length));
            return paramValue.compareTo(currentValue);
        } else if (fieldType.equalsIgnoreCase("long") || fieldType.equalsIgnoreCase("bigint")) {
            Long paramsValue = byteConvertObj(Long.class, data);
            Long currentValue = byteConvertObj(Long.class, Bytes.copy(bytes, offset, length));
            return paramsValue.compareTo(currentValue);
        } else if (fieldType.equalsIgnoreCase("float")) {
            Float paramsValue = byteConvertObj(Float.class, data);
            Float currentValue = byteConvertObj(Float.class, Bytes.copy(bytes, offset, length));
            return paramsValue.compareTo(currentValue);
        } else if (fieldType.equalsIgnoreCase("double")) {
            Double paramsValue = byteConvertObj(Double.class, data);
            Double currentValue = byteConvertObj(Double.class, Bytes.copy(bytes, offset, length));
            return paramsValue.compareTo(currentValue);
        } else if (fieldType.equalsIgnoreCase("short") || fieldType.equalsIgnoreCase("SMALLINT")) {
            Short paramsValue = byteConvertObj(Short.class, data);
            Short currentValue = byteConvertObj(Short.class, Bytes.copy(bytes, offset, length));
            return paramsValue.compareTo(currentValue);
        }
        // fallback for a field type we do not support
        throw new IllegalArgumentException("Unsupported fieldType: " + fieldType);
    }

    // decode a byte[] into the requested numeric wrapper type; fall back to parsing the
    // value as a string if it was not stored in HBase's fixed-width binary encoding
    @SuppressWarnings("unchecked")
    private <T> T byteConvertObj(Class<T> clazz, byte[] data) {
        String clazzName = clazz.getSimpleName();
        if (clazzName.equalsIgnoreCase("Integer")) {
            Integer paramValue;
            try {
                paramValue = Bytes.toInt(data);
            } catch (IllegalArgumentException e) {
                paramValue = Integer.valueOf(Bytes.toString(data));
            }
            return (T) paramValue;
        } else if (clazzName.equalsIgnoreCase("Long")) {
            Long paramValue;
            try {
                paramValue = Bytes.toLong(data);
            } catch (IllegalArgumentException e) {
                paramValue = Long.valueOf(Bytes.toString(data));
            }
            return (T) paramValue;
        } else if (clazzName.equalsIgnoreCase("Float")) {
            Float paramValue;
            try {
                paramValue = Bytes.toFloat(data);
            } catch (IllegalArgumentException e) {
                paramValue = Float.valueOf(Bytes.toString(data));
            }
            return (T) paramValue;
        } else if (clazzName.equalsIgnoreCase("Double")) {
            Double paramValue;
            try {
                paramValue = Bytes.toDouble(data);
            } catch (IllegalArgumentException e) {
                paramValue = Double.valueOf(Bytes.toString(data));
            }
            return (T) paramValue;
        } else if (clazzName.equalsIgnoreCase("Short")) {
            Short paramValue;
            try {
                paramValue = Bytes.toShort(data);
            } catch (IllegalArgumentException e) {
                paramValue = Short.valueOf(Bytes.toString(data));
            }
            return (T) paramValue;
        }
        // fallback for a wrapper type we do not support
        throw new IllegalArgumentException("Unsupported type: " + clazzName);
    }
}
---------------------------------------------------------------------------------------------------------------------------------
That completes the comparator. Next, package the protobuf-generated class and our Comparator class into a jar and drop it into HBase's lib directory, so the class can actually be found at run time; after that, restart the HBase cluster.
Finally, a test to check whether the custom comparator takes effect (partial code):
-------------------------------------------------------------------------------------------------------------
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("info"));
FilterList filterList = new FilterList();
NumberComparator comparator = new NumberComparator(Bytes.toBytes(1500),"int"); // our custom comparator, constructed with its two arguments
SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("info"),Bytes.toBytes("id"),
CompareFilter.CompareOp.GREATER,comparator);
filterList.addFilter(filter);
scan.setFilter(filterList);
-------------------------------------------------------------------------------------------------------------
The feature was finally implemented successfully.
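For completeness, here is a minimal sketch of executing the scan above and reading the filtered rows; the table name "mytable" is hypothetical, and it assumes the HTable client API of that HBase generation:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

import com.star.hbase.defined.comparator.NumberComparator;

public class ScanWithNumberComparator {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "mytable");   // hypothetical table name
    try {
      // same filter setup as in the snippet above: keep rows where info:id > 1500
      Scan scan = new Scan();
      scan.addFamily(Bytes.toBytes("info"));
      NumberComparator comparator = new NumberComparator(Bytes.toBytes(1500), "int");
      SingleColumnValueFilter filter = new SingleColumnValueFilter(
          Bytes.toBytes("info"), Bytes.toBytes("id"),
          CompareFilter.CompareOp.GREATER, comparator);
      FilterList filterList = new FilterList();
      filterList.addFilter(filter);
      scan.setFilter(filterList);

      // execute the scan and print the surviving rows
      ResultScanner scanner = table.getScanner(scan);
      for (Result row : scanner) {
        int id = Bytes.toInt(row.getValue(Bytes.toBytes("info"), Bytes.toBytes("id")));
        System.out.println(Bytes.toString(row.getRow()) + " -> id = " + id);
      }
      scanner.close();
    } finally {
      table.close();
    }
  }
}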
Here is one exception I ran into along the way; it left a deep impression:
The exception shows up when the custom Comparator itself throws, which breaks the data transfer so the results never make it back to the client. My explanation may not be perfect and corrections are welcome, since I'm still not especially familiar with HBase.
--------------------------------------------------------------------------------------------------------------------------
java.lang.RuntimeException: org.apache.hadoop.hbase.DoNotRetryIOException: Failed after retry of OutOfOrderScannerNextException: was there a rpc timeout?
at org.apache.hadoop.hbase.client.AbstractClientScanner$1.hasNext(AbstractClientScanner.java:94)
at org.springframework.data.hadoop.hbase.RowMapperResultsExtractor.extractData(RowMapperResultsExtractor.java:46)
at org.springframework.data.hadoop.hbase.RowMapperResultsExtractor.extractData(RowMapperResultsExtractor.java:30)
at org.springframework.data.hadoop.hbase.HbaseTemplate$1.doInTable(HbaseTemplate.java:131)
at org.springframework.data.hadoop.hbase.HbaseTemplate.execute(HbaseTemplate.java:58)
at org.springframework.data.hadoop.hbase.HbaseTemplate.find(HbaseTemplate.java:126)
at org.springframework.data.hadoop.hbase.HbaseTemplate.find(HbaseTemplate.java:155)
at com.csx.hbase.HBaseServiceImpl.scan(HBaseServiceImpl.java:116)
at com.csx.hbase.TestHbaseServiceImpl.test(TestHbaseServiceImpl.java:24)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:74)
at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:82)
at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:72)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:240)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:180)
at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: org.apache.hadoop.hbase.DoNotRetryIOException: Failed after retry of OutOfOrderScannerNextException: was there a rpc timeout?
at org.apache.hadoop.hbase.client.ClientScanner.next(ClientScanner.java:402)
at org.apache.hadoop.hbase.client.AbstractClientScanner$1.hasNext(AbstractClientScanner.java:91)
... 39 more
Caused by: org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException: org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException: Expected nextCallSeq: 1 But the nextCallSeq got from client: 0; request=scanner_id: 14 number_of_rows: 100 close_scanner: false next_call_seq: 0
at org.apache.hadoop.hbase.regionserver.HRegionServer.scan(HRegionServer.java:3098)
at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:29497)
at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2012)
at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:98)
at org.apache.hadoop.hbase.ipc.SimpleRpcScheduler.consumerLoop(SimpleRpcScheduler.java:168)
at org.apache.hadoop.hbase.ipc.SimpleRpcScheduler.access$000(SimpleRpcScheduler.java:39)
at org.apache.hadoop.hbase.ipc.SimpleRpcScheduler$1.run(SimpleRpcScheduler.java:111)
at java.lang.Thread.run(Thread.java:745)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:95)
at org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRemoteException(ProtobufUtil.java:285)
at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:204)
at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:59)
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:114)
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:90)
at org.apache.hadoop.hbase.client.ClientScanner.next(ClientScanner.java:354)
... 40 more
Caused by: org.apache.hadoop.hbase.ipc.RemoteWithExtrasException(org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException): org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException: Expected nextCallSeq: 1 But the nextCallSeq got from client: 0; request=scanner_id: 14 number_of_rows: 100 close_scanner: false next_call_seq: 0
at org.apache.hadoop.hbase.regionserver.HRegionServer.scan(HRegionServer.java:3098)
at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:29497)
at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2012)
at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:98)
at org.apache.hadoop.hbase.ipc.SimpleRpcScheduler.consumerLoop(SimpleRpcScheduler.java:168)
at org.apache.hadoop.hbase.ipc.SimpleRpcScheduler.access$000(SimpleRpcScheduler.java:39)
at org.apache.hadoop.hbase.ipc.SimpleRpcScheduler$1.run(SimpleRpcScheduler.java:111)
at java.lang.Thread.run(Thread.java:745)
at org.apache.hadoop.hbase.ipc.RpcClient.call(RpcClient.java:1453)
at org.apache.hadoop.hbase.ipc.RpcClient.callBlockingMethod(RpcClient.java:1657)
at org.apache.hadoop.hbase.ipc.RpcClient$BlockingRpcChannelImplementation.callBlockingMethod(RpcClient.java:1715)
at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.scan(ClientProtos.java:29900)
at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:174)
... 44 more