Hadoop执行示例C编译系统对程序中的宏展开是在问题问题,怎么解决

博客分类:
(1)hadoop2.7.1源码编译
(2)hadoop2.7.1安装准备
(3)1.x和2.x都支持的集群安装
(4)hbase安装准备
(5)hbase安装
(6)snappy安装
(7)hbase性能优化
(8)雅虎YCSB测试hbase性能测试
(9)spring-hadoop实战
(10)基于ZK的Hadoop HA集群安装
很荣幸博客被ITEYE知识库收录,在这里,也不希望本篇内容过于简单,会不断进行更新......
这里主要借助Yahoo开源的一款通用的性能测试工具YCSB对已经安装的hadoop2.7.1+hbase1.2.1进行性能测试。
写这篇文章参考了以下三篇文章:
1.hbase性能测试
2.Hadoop参考设计的实现及性能:HBase应用性能测试方法
3.Linux环境下Python的安装过程
4.Runing a Workload
1.Yahoo开源测试工具YCSB介绍
1.1 简单介绍
英文全称:Yahoo! Cloud Serving Benchmark (YCSB) 。是 Yahoo 公司的一个用来对云服务进行基础测试的工具。目标是促进新一代云数据服务系统的性能比较。为四个广泛使用的系统:Cassandra,、HBase、PNUTS和一个简单的片式MySQL执行,订了套核心基准测试和结果报告。
目前项目开源地址:
1.2 可以测试哪些技术?每个的详细测试介绍?
https://github.com/brianfrankcooper/YCSB/tree/master/accumulo
https://github.com/brianfrankcooper/YCSB/tree/master/aerospike
asynchbase
https://github.com/brianfrankcooper/YCSB/tree/master/asynchbase
https://github.com/brianfrankcooper/YCSB/tree/master/cassandra
cassandra2
https://github.com/brianfrankcooper/YCSB/tree/master/cassandra2
https://github.com/brianfrankcooper/YCSB/tree/master/couchbase
couchbase2
https://github.com/brianfrankcooper/YCSB/tree/master/couchbase2
https://github.com/brianfrankcooper/YCSB/tree/master/dynamodb
elasticsearch
https://github.com/brianfrankcooper/YCSB/tree/master/elasticsearch
https://github.com/brianfrankcooper/YCSB/tree/master/geode
googlebigtable
https://github.com/brianfrankcooper/YCSB/tree/master/googlebigtable
googledatastore
https://github.com/brianfrankcooper/YCSB/tree/master/googledatastore
hypertable
https://github.com/brianfrankcooper/YCSB/tree/master/infinispan
https://github.com/brianfrankcooper/YCSB/tree/master/jdbc
https://github.com/brianfrankcooper/YCSB/tree/master/kudu
https://github.com/brianfrankcooper/YCSB/tree/master/mapkeeper
https://github.com/brianfrankcooper/YCSB/tree/master/nosqldb
https://github.com/brianfrankcooper/YCSB/tree/master/orientdb
https://github.com/brianfrankcooper/YCSB/tree/master/rados
https://github.com/brianfrankcooper/YCSB/tree/master/riak
https://github.com/brianfrankcooper/YCSB/tree/master/s3
https://github.com/brianfrankcooper/YCSB/tree/master/tarantool
https://github.com/brianfrankcooper/YCSB/tree/master/voldemort
2.安装Yahoo开源的性能测试工具YCSB
2.1 首先安装python2.7.10
1)在https://www.python.org/ftp/python/2.7.10/Python-2.7.10.tgz下载Python-2.7.10.tgz
2)通过tar -xzf Python-2.7.10.tgz解压到/opt/Python-2.7.10
3)通过cd /opt/Python-2.7.10,依次执行./configue,make,make install这三个命令进行安装
4)如果不安装或者python版本过老,在后面会报错如下:
No module named argparse
2.2 安装YCSB
1)在下载ycsb-0.3.1.tar.gz,解压到/opt/ycsb-0.3.1
2)复制/opt/hbase-1.2.1/conf/hbase-site.xml到/opt/ycsb-0.3.1/hbase-binding/;
复制/opt/hbase-1.2.1/lib/*到/opt/ycsb-0.3.1/hbase-binding/lib/
我的hbase安装根目录是 /opt/hbase-1.2.1/
我的YCSB安装目录是
/opt/ycsb-0.3.1/
cp /opt/hbase-1.2.1/conf/hbase-site.xml
/opt/ycsb-0.3.1/hbase-binding/
cp /opt/hbase-1.2.1/lib/*
/opt/ycsb-0.3.1/hbase-binding/lib/
3)安装YCSB完成
3.YCSB使用介绍
3.1 YCSB命令
3.2 YCSB命令参数
Specify workload file
Additional Java classpath entries
-jvm-args args Additional arguments to the JVM
-p key=value
Override workload property
Print status to stderr
Target ops/sec (default: unthrottled)
-threads n
Number of client threads (default: 1)
3.3 默认配置介绍
在YCSB安装完成后,默认在安装目录../ycsb-0.3.1/workloads下有如下默认配置脚本,截图如下:
这里我以配置文件../ycsb-0.3.1/workloads/workloada为例进行介绍,首先贴出其内容:
# Copyright (c) 2010 Yahoo! Inc. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you # may not use this file except in compliance with the License. You # may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. See the License for the specific language governing # permissions and limitations under the License. See accompanying # LICENSE file. # Yahoo! Cloud System Benchmark# Workload A: Update heavy workload# Application example: Session store recording recent actions# # Read/update ratio: 50/50# Default data size: 1 KB records (10 fields, 100 bytes each, plus key)# Request distribution: zipfianrecordcount=1000operationcount=1000workload=com.yahoo.ycsb.workloads.CoreWorkloadreadallfields=truereadproportion=0.5updateproportion=0.5scanproportion=0insertproportion=0requestdistribution=zipfian
recordcount=1000
默认操作数据总记录数为1000
operationcount=1000
workload=com.yahoo.ycsb.workloads.CoreWorkload
readallfields=true
readproportion=0.5
读取操作占比50%
updateproportion=0.5
更新操作占比50%
scanproportion=0
hbase scan占比0%
insertproportion=0
插入操作占比0%
requestdistribution=zipfian
上面是YCSB给定的几个默认压测场景之一,其上配置不指定的话就使用该默认值,如果指定的话,传入响应参数就行了,例如:
./ycsb load hbase -P ../workloads/workloada -p threads=10 -p columnfamily=f1 -p recordcount=10000 -s
上述意思是,调用ycsb中加载数据命令load对hbase进行测试,配置文件采用../ycsb-0.3.1/workloads/workloada,客户端线程数设置为10,默认插入到hbase的数据表usertable中的列簇f1中,记录数为100000,这里的记录数就是传入进来的,会覆盖../ycsb-0.3.1/workloads/workloada里面的默认值recordcount
4.YCSB测试HBASE性能实战
测试过程分为两个阶段,首先是加载数据,其次是执行事务。
4.1 加载数据
cd /opt/ycsb-0.3.1/bin
./ycsb load hbase -P ../workloads/workloada -p threads=10 -p columnfamily=f1 -p recordcount=10000 -s & load.log
命令分析:
load:表明为加载数据测试。
hbase:表明测试的是hbase性能
../workloads/workloada ,指定配置文件为/opt/ycsb-0.3.1/workloads/workloada
threads=10
客户端线程数为10
-p columnfamily=f1 指定数据插入到表usertable的列簇f1,所以你得首先在hbase shell中执行create 'usertable','f1','f2','f3'
-p recordcount=10000 插入10000行记录
执行后向HBase Server下的usertable,f1插入10000条数据,并将执行的情况打印到屏幕上。查看load.log,信息如下:
4.2 执行事务
cd /opt/ycsb-0.3.1/bin
./ycsb run hbase -P ../workloads/workloada -threads 10 -p measurementtype=timeseries -p columnfamily=f1 -p timeseries.granularity=2000 & transactions.log
执行后查看transactions.log,信息如下:
总执行时间为1568毫秒
吞吐量为637.755 ops/s
530个读操作,其中532个读不超过1s,532个更新操作,其中468个更新不超过1s
读操作最小延迟692微秒,最大延迟109651微秒,平均延迟2447微秒
更新操作最小延迟4微秒,最大延迟177337微秒,平均5延迟174微秒
因为之前我只用两台机器做hbase集群,实际上其中一台作为HMASTER,另一台作为HREGION,所以性能不是很好,后面需要增加HREGION,还需要做的是对Hadoop和hbase进行优化。
4.3 另一个例子
首先在hbase创建测试需要的数据表,并进行预分区(建表usertable 列簇为cf 分50区)
cd /home/hadoop/hbase-1.2.1/bin/
./hbase shell
n_splits = 49
create 'usertable', 'cf', {SPLITS =& (1..n_splits).map {|i| "user#{1000+i*()/n_splits}"}}
接着开启测试
cd /home/hadoop/ycsb-0.3.1
bin/ycsb load hbase-10 -P workloads/workloada -p columnfamily=cf -p recordcount= -jvm-args "-Xms2048m -Xmx2048m -XX:PermSize=128m -XX:MaxPermSize=256m" -p threadcount=100 -s |tee -a hbase_100thread_record.log
上面参数含义:
说明测试的是针对hbase1.x版本
-p columnfamily=cf
插入hbase的列簇cf
-p recordcount=
总计插入1亿条数据
-jvm-args '-Xms2048m -Xmx2048m -XX:PermSize=128m -XX:MaxPermSize=256m'
设置JVM参数
-p threadcount=100 客户端线程数为100,并发100
-s |tee -a hbase10_test_result.log
记录测试结果到hbase10_test_result.log
-P workloads/workloada
采用默认配置workloads/workloada,里面设置了插入占比50%,更新占比50%,默认往cf列簇插入10个列的数据,每个列的数据大小为100bytes,10个列的话总计大约1K大小数据
zilongzilong
浏览: 150266 次
来自: 上海
写的很好!!!
furyamber 写道你好,我们也遇到了一模一样的错误,之前 ...
恩,是的,升级内核就可以了,这个是内核级BUG
你好,我们也遇到了一模一样的错误,之前看了个https://b ...
(window.slotbydup=window.slotbydup || []).push({
id: '4773203',
container: s,
size: '200,200',
display: 'inlay-fix'如何编译 hadoop.dll 方法_百度知道
如何编译 hadoop.dll 方法
答题抽奖
首次认真答题后
即可获得3次抽奖机会,100%中奖。
xiangjuan314
xiangjuan314
采纳数:23843
获赞数:20465
环境变量(工具软件:3,4,5,6,7)安装过程不再详述,一路Next就行(JavaSE推荐根目录:c:\java)。重点在环境变量的配置部分,增加以下环境变量到“系统变量”中(Java路径不能有空格):JAVA_HOME=C:\Java\jdk1.7.0_45Platform=Win32M2_HOME=C:\apache-maven-3.2.1Path=;C:\cygwin\C:\apache-maven-3.2.1\C:\protoc-2.5.0-win32;配置示例(别忘了设置Path哦):解压源代码将hadoop-2.2.0-src.tar.gz源代码解压到D盘根目录,看上去路径如下:D:\hadoop-2.2.0\Apache Hadoop svn 代码库地址:需要手工修正源代码的几处编译错误:第一处:修改文件:\hadoop-common-project\hadoop-auth\pom.xml修改内容:在大约56行的位置增加一个Xml配置节点。&dependency&  &groupId&org.mortbay.jetty&/groupId&  &artifactId&jetty-util&/artifactId&  &scope&test&/scope&&/dependency&修改示例:第二处:修改文件:hadoop-common-project\hadoop-common\src\main\native\native.sln修改内容:用记事本打开文件。替换内容:GlobalSection(ProjectConfigurationPlatforms) = postSolution{4C0C12D2-3CB0-47F8-BCD0-55BD5732DFA7}.Debug|Mixed Platforms.ActiveCfg = Release|x64{4C0C12D2-3CB0-47F8-BCD0-55BD5732DFA7}.Debug|Mixed Platforms.Build.0 = Release|x64{4C0C12D2-3CB0-47F8-BCD0-55BD5732DFA7}.Debug|Win32.ActiveCfg = Release|x64{4C0C12D2-3CB0-47F8-BCD0-55BD5732DFA7}.Debug|Win32.Build.0 = Release|x64{4C0C12D2-3CB0-47F8-BCD0-55BD5732DFA7}.Debug|x64.ActiveCfg = Release|x64{4C0C12D2-3CB0-47F8-BCD0-55BD5732DFA7}.Debug|x64.Build.0 = Release|x64{4C0C12D2-3CB0-47F8-BCD0-55BD5732DFA7}.Release|Mixed Platforms.ActiveCfg = Release|x64{4C0C12D2-3CB0-47F8-BCD0-55BD5732DFA7}.Release|Mixed Platforms.Build.0 = Release|x64{4C0C12D2-3CB0-47F8-BCD0-55BD5732DFA7}.Release|Win32.ActiveCfg = Release|x64{4C0C12D2-3CB0-47F8-BCD0-55BD5732DFA7}.Release|Win32.Build.0 = Release|x64{4C0C12D2-3CB0-47F8-BCD0-55BD5732DFA7}.Release|x64.ActiveCfg = Release|x64{4C0C12D2-3CB0-47F8-BCD0-55BD5732DFA7}.Release|x64.Build.0 = Release|x64EndGlobalSection新内容:GlobalSection(ProjectConfigurationPlatforms) = postSolution{4C0C12D2-3CB0-47F8-BCD0-55BD5732DFA7}.Debug|Win32.ActiveCfg = Release|Win32{4C0C12D2-3CB0-47F8-BCD0-55BD5732DFA7}.Debug|Win32.Build.0 = Release|Win32{4C0C12D2-3CB0-47F8-BCD0-55BD5732DFA7}.Debug|x64.ActiveCfg = Release|x64{4C0C12D2-3CB0-47F8-BCD0-55BD5732DFA7}.Debug|x64.Build.0 = Release|x64{4C0C12D2-3CB0-47F8-BCD0-55BD5732DFA7}.Release|Win32.ActiveCfg = Release|Win32{4C0C12D2-3CB0-47F8-BCD0-55BD5732DFA7}.Release|Win32.Build.0 = Release|Win32{4C0C12D2-3CB0-47F8-BCD0-55BD5732DFA7}.Release|x64.ActiveCfg = Release|x64{4C0C12D2-3CB0-47F8-BCD0-55BD5732DFA7}.Release|x64.Build.0 = Release|x64EndGlobalSection修改示例:第三处:修改文件:hadoop-common-project\hadoop-common\src\main\native\native.vcxproj修改内容:查找替换”Release|x64“为”Release|Win32“查找替换”&Platform&x64&/Platform&“为”&Platform&Win32&/Platform&“修改示例:第四处:右键单击”D:\hadoop-2.2.0“文件夹,选择”管理员取得所有权“。否则编译过程中可能会发生”拒绝访问“错误(右键没有显示该菜单的,自行网上查找注册表修改方法)。 编译过程 打开“开始”--“所有程序”--“Microsoft Windows SDK v7.1”--“Windows SDK 7.1 Command Prompt”,进入VC++的命令行工具(一定要从此处进入方可顺利编译Hadoop源代码,记着是以管理员身份运行)。命令如下:切换至源代码根目录,执行编译命令:mvn package -Pdist,native-win -DskipTests -Dtar示例运行结果Setting SDK environment relative to C:\Program Files\Microsoft SDKs\Windows\v7.1\.Targeting Windows 7 x86 DebugC:\Windows\system32&d:D:\&cd D:\hadoop-2.2.0D:\hadoop-2.2.0&mvn package -Pdist,native-win -DskipTests -Dtar................................2.2.1版本编译完成大约需要16分钟左右................................2.4.0版本编译完成大约需要22分钟左右(觉得编译过程慢的话:自己动手配置镜像服务器)配置运行Hadoop编译成功后,程序集输出在: hadoop-common-project\hadoop-common\target\hadoop-common-2.2.0目录下。1.解压缩”hadoop-2.2.0.tar.gz“至D盘如下目录:D:\hadoop-common-2.2.02.合并替换发行版本的64动态链接库:主要是是以下几个文件(bin\hadoop.dll、bin\hadoop.exp、bin\hadoop.lib、bin\hadoop.pdb、bin\libwinutils.lib、bin\winutils.exe、bin\winutils.pdb),从编译成功后的输出目录Copy到Apache发行版形同目录下替换即可。3.修改配置文件core-site.xml&?xml version=&1.0& encoding=&UTF-8&?&&?xml-stylesheet type=&text/xsl& href=&configuration.xsl&?&&configuration& &property&
&name&fs.defaultFS&/name&
&value&hdfs://localhost:9000&/value& &/property&&/configuration&hdfs-site.xml&?xml version=&1.0& encoding=&UTF-8&?&&?xml-stylesheet type=&text/xsl& href=&configuration.xsl&?&&configuration& &property&
&name&dfs.replication&/name&
&value&1&/value&
&/property&
&property&
&name&dfs.namenode.name.dir&/name&
&value&file:/hadoop-bin/data/namenode&/value&
&/property&
&property&
&name&dfs.datanode.data.dir&/name&
&value&file:/hadoop-bin/data/datanode&/value&
&/property&
&property&
&name&dfs.webhdfs.enabled&/name&
&value&true&/value&
&/property&
&property&
&name&dfs.permissions&/name&
&value&false&/value&
&/property&&/configuration&其他配置文件保持默认即可。4.配置Hadoop Hdfs运行环境变量(重要)HADOOP_HOME=D:\hadoop-common-2.2.0Path=D:\hadoop-common-2.2.0\bin5.格式化hdfs文件系统以管理员身份打开命令行,并切换到:D:\hadoop-common-2.2.0\bin目录下,执行命令:hadoop namenode -format如果不出意外,hdfs文件系统将格式化成功,你会在D:\hadoop-bin\data看到已经生成了namenode文件夹。6.启动Hadoop HDFS服务器同样管理员身份命令行,切换到:D:\hadoop-common-2.2.0\sbin目录下,执行命令:start-all.cmd不出意外,用浏览器打开: 或者
会有惊喜哦!记得别把本地的端口占用了。7.上传文件到HDFS如果你对命令行熟悉的话,可以采用命令行的方式推送的HDFS。如果你的hdfs-site.xml配置文件时Copy我以上的示例的话,那么WEBHDFS默认是开启的,什么意思?就是通过Http RestFull风格API管理文件哦!另外还要记得把dfs.permissions设置为false,要不你没有权限上传文件的哦!推荐个HDFS文件管理工具:Red Gate Software Ltd(大名鼎鼎的Red Gate)上传文件到HDFS8.停止Hadoop HDFS服务器同样管理员身份命令行,切换到:D:\hadoop-common-2.2.0\sbin目录下,执行命令:stop-all.cmd编后语Windows 64 Bit是有它自己的道理的,虽然通过一些努力编译出了32Bit环境下的程序。。。
为你推荐:
其他类似问题
个人、企业类
违法有害信息,请在下方选择后提交
色情、暴力
我们会通过消息、邮箱等方式尽快将举报结果通知您。&|&&|&&|&&|&&
当前位置: >
hadoop-Streaming 整合hbase执行C代码实例
作者:cp1985chenpeng & 来源:转载 &
这两天一直在弄hadoop-streaming整合hbase执行C语言的问题。
昨天在Eclipse上自己写了一个TextTableInputFormat类继承了InputFormat类去解析hbase,然后通过自己写的mapreduce测试没有问题。
下面这个类是分析InputFormat& Hbase的基类。
import java.io.IOException
这两天一直在弄hadoop-streaming整合hbase执行C语言的问题。
昨天在Eclipse上自己写了一个TextTableInputFormat类继承了InputFormat类去解析hbase,然后通过自己写的mapreduce测试没有问题。
下面这个类是分析InputFormat& Hbase的基类。
import java.io.IOE import java.util.ArrayL import java.util.L import org.apache.commons.logging.L import org.apache.commons.logging.LogF import org.apache.hadoop.conf.C import org.apache.hadoop.hbase.client.HT import org.apache.hadoop.hbase.client.R import org.apache.hadoop.hbase.client.S import org.apache.hadoop.hbase.io.ImmutableBytesW import org.apache.hadoop.hbase.mapred.TableInputF import org.apache.hadoop.hbase.mapred.TableS import org.apache.hadoop.hbase.util.B import org.apache.hadoop.hbase.util.P import org.apache.hadoop.io.T import org.apache.hadoop.mapred.FileInputF import org.apache.hadoop.mapred.InputF import org.apache.hadoop.mapred.JobC import org.apache.hadoop.mapred.R import org.apache.hadoop.mapred.InputS import org.apache.hadoop.mapred.JobC import org.apache.hadoop.mapred.RecordR public abstract class TextTableInputFormatBase implements &&& &&& InputFormat&Text, Text&, Configurable { &&& final Log LOG = LogFactory.getLog(TextTableInputFormatBase.class); &&& /** Holds the details for the internal scanner. */ &&& private Scan scan = &&& /** The table to scan. */ &&& private HTable table = &&& public InputSplit[] getSplits(JobConf job, int numSplits) &&& &&& &&& throws IOException { &&& &&& if (table == null) { &&& &&& &&& throw new IOException(&No table was provided.&); &&& &&& } &&& &&& Pair&byte[][], byte[][]& keys = table.getStartEndKeys(); &&& &&& if (keys == null || keys.getFirst() == null &&& &&& &&& &&& || keys.getFirst().length == 0) { &&& &&& &&& throw new IOException(&Expecting at least one region.&); &&& &&& } &&& &&& int count = 0; &&& &&& InputSplit[] splits = new InputSplit[keys.getFirst().length]; &&& &&& for (int i = 0; i & keys.getFirst(). i++) { &&& &&& &&& if (!includeRegionInSplit(keys.getFirst()[i], keys.getSecond()[i])) { &&& &&& &&& &&& &&& &&& &&& } &&& &&& &&& String regionLocation = table.getRegionLocation(keys.getFirst()[i]) &&& &&& &&& &&& &&& .getServerAddress().getHostname(); &&& &&& &&& byte[] startRow = scan.getStartRow(); &&& &&& &&& byte[] stopRow = scan.getStopRow(); &&& &&& &&& // determine if the given start an stop key fall into the region &&& &&& &&& if ((startRow.length == 0 || keys.getSecond()[i].length == 0 || Bytes &&& &&& &&& &&& &&& .compareTo(startRow, keys.getSecond()[i]) & 0) &&& &&& &&& &&& &&& && (stopRow.length == 0 || Bytes.compareTo(stopRow, &&& &&& &&& &&& &&& &&& &&& keys.getFirst()[i]) & 0)) { &&& &&& &&& &&& byte[] splitStart = startRow.length == 0 &&& &&& &&& &&& &&& &&& || Bytes.compareTo(keys.getFirst()[i], startRow) &= 0 ? keys &&& &&& &&& &&& &&& &&& .getFirst()[i] : startR &&& &&& &&& &&& byte[] splitStop = (stopRow.length == 0 || Bytes.compareTo( &&& &&& &&& &&& &&& &&& keys.getSecond()[i], stopRow) &= 0) &&& &&& &&& &&& &&& &&& && keys.getSecond()[i].length & 0 ? keys.getSecond()[i] &&& &&& &&& &&& &&& &&& : stopR &&& &&& &&& &&& InputSplit split = new TableSplit(table.getTableName(), &&& &&& &&& &&& &&& &&& splitStart, splitStop, regionLocation); &&& &&& &&& &&& splits[i] = &&& &&& &&& &&& if (LOG.isDebugEnabled()) &&& &&& &&& &&& &&& LOG.debug(&getSplits: split -& & + (count++) + & -& & &&& &&& &&& &&& &&& &&& &&& + split); &&& &&& &&& } &&& &&& } &&& &&& &&& } &&& public RecordReader&Text, Text& getRecordReader(InputSplit split, JobConf job, &&& &&& &&& Reporter reporter) throws IOException { &&& &&& if (table == null) { &&& &&& &&& throw new IOException( &&& &&& &&& &&& &&& &Cannot create a record reader because of a& &&& &&& &&& &&& &&& &&& &&& + & previous error. Please look at the previous logs lines from& &&& &&& &&& &&& &&& &&& &&& + & the task's full log for more details.&); &&& &&& } &&& &&& TableInputFormat inputFormat = new TableInputFormat(); &&& &&& return new TextTableRecordReader(inputFormat.getRecordReader(split, job, reporter)); &&& } &&& protected boolean includeRegionInSplit(final byte[] startKey, &&& &&& &&& final byte[] endKey) { &&& &&& &&& } &&& protected HTable getHTable() { &&& &&& return this. &&& } &&& protected void setHTable(HTable table) { &&& &&& this.table = &&& } &&& public Scan getScan() { &&& &&& if (this.scan == null) &&& &&& &&& this.scan = new Scan(); &&& &&& &&& } &&& public void setScan(Scan scan) { &&& &&& this.scan = &&& } &&& public abstract String formatRowResult(Result row); &&& public class TextTableRecordReader implements RecordReader&Text, Text& { &&& &&& private RecordReader&ImmutableBytesWritable, Result& tableRecordR &&&&&&& public TextTableRecordReader(RecordReader&ImmutableBytesWritable, Result& reader) { &&&&&&&&&&& tableRecordReader = &&&&&&& } &&&&&&& public void close() throws IOException { &&&&&&&&&&& tableRecordReader.close(); &&&&&&& } &&&&&&& public Text createKey() { &&&&&&&&&&& return new Text(&&); &&&&&&& } &&&&&&& public Text createValue() { &&&&&&&&&&& return new Text(&&); &&&&&&& } &&&&&&& public long getPos() throws IOException { &&&&&&&&&&& return tableRecordReader.getPos(); &&&&&&& } &&&&&&& public float getProgress() throws IOException { &&&&&&&&&&& return tableRecordReader.getProgress(); &&&&&&& } &&&&&&& public boolean next(Text key, Text value) throws IOException { &&&&&&&&&&& Result row = new Result(); &&&&&&&&&&& boolean hasNext = tableRecordReader.next(new ImmutableBytesWritable(key.getBytes()), row); &&&&&&&&&&& if (hasNext) { &&&&&&&&&&&&&&& key.set(row.getRow()); &&&&&&&&&&&&&&& value.set(formatRowResult(row)); &&&&&&&&&&& } &&&&&&&&&&& return hasN &&&&&&& } &&& } }
这个类主要是用来逐行获取hbase信息的。 import java.io.ByteArrayInputS import java.io.DataInputS import java.io.IOE import org.apache.hadoop.conf.C import org.apache.hadoop.hbase.KeyV import org.apache.hadoop.hbase.client.HT import org.apache.hadoop.hbase.client.R import org.apache.hadoop.hbase.client.S import org.apache.hadoop.hbase.io.ImmutableBytesW import org.apache.hadoop.hbase.mapreduce.TableInputF import org.apache.hadoop.hbase.mapreduce.TableMapReduceU import org.apache.hadoop.hbase.mapreduce.TableRecordR import org.apache.hadoop.hbase.mapreduce.TableS import org.apache.hadoop.hbase.util.Base64; import org.apache.hadoop.hbase.util.B import org.apache.hadoop.io.T import org.apache.hadoop.mapreduce.InputS import org.apache.hadoop.mapreduce.RecordR import org.apache.hadoop.mapreduce.TaskAttemptC import org.apache.hadoop.util.StringU public class StringTableInputFormat extends TextTableInputFormatBase { &&& /** Job parameter that specifies the input table. */ &&& public static final String INPUT_TABLE = &hbase.mapreduce.inputtable&; &&& /** &&& &* Base-64 encoded scanner. All other SCAN_ confs are ignored if this is &&& &* specified. See {@link TableMapReduceUtil#convertScanToString(Scan)} for &&& &* more details. &&& &*/ &&& public static final String SCAN = &hbase.mapreduce.scan&; &&& /** Column Family to Scan */ &&& public static final String SCAN_COLUMN_FAMILY = &hbase.mapreduce.scan.column.family&; &&& /** Space delimited list of columns to scan. */ &&& public static final String SCAN_COLUMNS = &hbase.mapreduce.scan.columns&; &&& /** The timestamp used to filter columns with a specific timestamp. */ &&& public static final String SCAN_TIMESTAMP = &hbase.mapreduce.scan.timestamp&; &&& /** &&& &* The starting timestamp used to filter columns with a specific range of &&& &* versions. &&& &*/ &&& public static final String SCAN_TIMERANGE_START = &hbase.mapreduce.scan.timerange.start&; &&& /** &&& &* The ending timestamp used to filter columns with a specific range of &&& &* versions. &&& &*/ &&& public static final String SCAN_TIMERANGE_END = &hbase.mapreduce.scan.timerange.end&; &&& /** The maximum number of version to return. */ &&& public static final String SCAN_MAXVERSIONS = &hbase.mapreduce.scan.maxversions&; &&& /** Set to false to disable server-side caching of blocks for this scan. */ &&& public static final String SCAN_CACHEBLOCKS = &hbase.mapreduce.scan.cacheblocks&; &&& /** The number of rows for caching that will be passed to scanners. */ &&& public static final String SCAN_CACHEDROWS = &hbase.mapreduce.scan.cachedrows&; &&& /** The configuration. */ &&& private Configuration conf = &&& @Override &&& public Configuration getConf() { &&& &&& &&& } &&&
&&& Scan convertStringToScan(String base64) throws IOException { &&& &&& ByteArrayInputStream bis = new ByteArrayInputStream(Base64.decode(base64)); &&& &&& DataInputStream dis = new DataInputStream(bis); &&& &&& Scan scan = new Scan(); &&& &&& scan.readFields(dis); &&& &&& &&& & } &&& @Override &&& public void setConf(Configuration configuration) { &&& &&& this.conf = &&& &&& String tableName = conf.get(INPUT_TABLE); &&& &&& try { &&& &&& &&& setHTable(new HTable(new Configuration(conf), tableName)); &&& &&& } catch (Exception e) { &&& &&& &&& LOG.error(StringUtils.stringifyException(e)); &&& &&& } &&& &&& Scan scan = &&& &&& if (conf.get(SCAN) != null) { &&& &&& &&& try { &&& &&& &&& &&& scan = this.convertStringToScan(conf.get(SCAN)); &&& &&& &&& } catch (IOException e) { &&& &&& &&& &&& LOG.error(&An error occurred.&, e); &&& &&& &&& } &&& &&& } else { &&& &&& &&& try { &&& &&& &&& &&& scan = new Scan(); &&& &&& &&& &&& if (conf.get(SCAN_COLUMNS) != null) { &&& &&& &&& &&& &&& scan.addColumns(conf.get(SCAN_COLUMNS)); &&& &&& &&& &&& } &&& &&& &&& &&& if (conf.get(SCAN_COLUMN_FAMILY) != null) { &&& &&& &&& &&& &&& scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY))); &&& &&& &&& &&& } &&& &&& &&& &&& if (conf.get(SCAN_TIMESTAMP) != null) { &&& &&& &&& &&& &&& scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP))); &&& &&& &&& &&& } &&& &&& &&& &&& if (conf.get(SCAN_TIMERANGE_START) != null &&& &&& &&& &&& &&& &&& && conf.get(SCAN_TIMERANGE_END) != null) { &&& &&& &&& &&& &&& scan.setTimeRange( &&& &&& &&& &&& &&& &&& &&& Long.parseLong(conf.get(SCAN_TIMERANGE_START)), &&& &&& &&& &&& &&& &&& &&& Long.parseLong(conf.get(SCAN_TIMERANGE_END))); &&& &&& &&& &&& } &&& &&& &&& &&& if (conf.get(SCAN_MAXVERSIONS) != null) { &&& &&& &&& &&& &&& scan.setMaxVersions(Integer.parseInt(conf &&& &&& &&& &&& &&& &&& &&& .get(SCAN_MAXVERSIONS))); &&& &&& &&& &&& } &&& &&& &&& &&& if (conf.get(SCAN_CACHEDROWS) != null) { &&& &&& &&& &&& &&& scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS))); &&& &&& &&& &&& } &&& &&& &&& &&& scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false))); &&& &&& &&& } catch (Exception e) { &&& &&& &&& &&& LOG.error(StringUtils.stringifyException(e)); &&& &&& &&& } &&& &&& } &&& &&& setScan(scan); &&& } &&& @Override &&& public String formatRowResult(Result row) { &&& &&& StringBuilder builder = new StringBuilder(); &&& &&& for (KeyValue kv : row.list()) { &&& &&& &&& builder.append(new String(kv.getRow())).append(&& &) &&& &&& &&& &&& &&& .append(new String(kv.getFamily())).append(&:&) &&& &&& &&& &&& &&& .append(new String(kv.getQualifier())).append(&& &) &&& &&& &&& &&& &&& .append(kv.getTimestamp()).append(&& &) &&& &&& &&& &&& &&& .append(new String(kv.getValue())); &&& &&& } &&& &&& return builder.toString(); &&& } }
然后通过这个简单的wordcount&&& mapreduce即可执行。
package com.cp.hbase. import java.io.ByteArrayOutputS import java.io.DataOutputS import java.io.IOE import java.util.StringT import org.apache.hadoop.conf.C import org.apache.hadoop.fs.P import org.apache.hadoop.hbase.HBaseC import org.apache.hadoop.hbase.client.S import org.apache.hadoop.hbase.mapreduce.TableInputF import org.apache.hadoop.hbase.util.Base64; import org.apache.hadoop.hbase.util.B import org.apache.hadoop.io.IntW import org.apache.hadoop.io.T import org.apache.hadoop.mapreduce.J import org.apache.hadoop.mapreduce.M import org.apache.hadoop.mapreduce.R import org.apache.hadoop.mapreduce.lib.output.FileOutputF import org.apache.hadoop.util.GenericOptionsP import org.cp.hadoop.hbase.mapreduce.StringTableInputF public class WordCount { &&& public static class TokenizerMapper extends &&& &&& &&& Mapper&Object, Text, Text, IntWritable& { &&& &&& private final static IntWritable one = new IntWritable(1); &&& &&& private Text word = new Text(); &&& &&& public void map(Object key, Text value, Context context) &&& &&& &&& &&& throws IOException, InterruptedException { &&& &&& &&& StringTokenizer itr = new StringTokenizer(value.toString()); &&& &&& &&& while (itr.hasMoreTokens()) { &&& &&& &&& &&& word.set(itr.nextToken()); &&& &&& &&& &&& context.write(word, one); &&& &&& &&& } &&& &&& } &&& } &&& public static class IntSumReducer extends &&& &&& &&& Reducer&Text, IntWritable, Text, IntWritable& { &&& &&& private IntWritable result = new IntWritable(); &&& &&& public void reduce(Text key, Iterable&IntWritable& values, &&& &&& &&& &&& Context context) throws IOException, InterruptedException { &&& &&& &&& int sum = 0; &&& &&& &&& for (IntWritable val : values) { &&& &&& &&& &&& sum += val.get(); &&& &&& &&& } &&& &&& &&& result.set(sum); &&& &&& &&& context.write(key, result); &&& &&& } &&& } &&& static String convertScanToString(Scan scan) throws IOException { &&& &&& ByteArrayOutputStream out = new ByteArrayOutputStream(); &&& &&& DataOutputStream dos = new DataOutputStream(out); &&& &&& scan.write(dos); &&& &&& return Base64.encodeBytes(out.toByteArray()); &&& } &&& public static void main(String[] args) throws Exception { &&& &&& Configuration conf = new Configuration(); &&& &&& String[] otherArgs = new GenericOptionsParser(conf, args) &&& &&& &&& &&& .getRemainingArgs(); &&& &&&
&&& &&& Scan scan = new Scan(); &&& &&& scan.addColumn(Bytes.toBytes(&author&),Bytes.toBytes(&nickname&)); &&& &&& scan.addColumn(Bytes.toBytes(&article&),Bytes.toBytes(&tags&)); &&& &&&
&&& &&& Job job = new Job(conf, &word count&); &&& &&& job.setJarByClass(WordCount.class); &&& &&& job.setMapperClass(TokenizerMapper.class); &&& &&& job.setCombinerClass(IntSumReducer.class); &&& &&& job.setReducerClass(IntSumReducer.class); &&& &&& job.setOutputKeyClass(Text.class); &&& &&& job.setOutputValueClass(IntWritable.class); &&& &&& job.setInputFormatClass(StringTableInputFormat.class); &&& &&& HBaseConfiguration.addHbaseResources(job.getConfiguration()); &&& &&& job.getConfiguration().set(TableInputFormat.INPUT_TABLE, &blog&); &&& &&& job.getConfiguration().set(TableInputFormat.SCAN, convertScanToString(scan)); &&& &&& // FileInputFormat.addInputPath(job, new Path(otherArgs[0])); &&& &&& FileOutputFormat.setOutputPath(job, new Path(&/home/cp/output&)); &&& &&& System.exit(job.waitForCompletion(true) ? 0 : 1); &&& } }
版权所有 IT知识库 CopyRight (C)
IT知识库 IT610.com , All Rights Reserved.}

我要回帖

更多关于 C编译用怎么选文件 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信