hbase数据库中的数据数据库导出文件格式

Sqoop安装、导入导出HDFS/HBase操作详解 - 简书
Sqoop安装、导入导出HDFS/HBase操作详解
一、安装Sqoop
Sqoop是一款基于Hadoop系统的数据转移工具,因此在安装Sqoop之前需要先安装好Hadoop。
本文使用的各软件版本如下:
操作系统:ubuntu-14.04.1-desktop-amd64
Hadoop版本:hadoop-2.7.1
MySQL版本:5.5.54
MySQL驱动:mysql-connector-java-5.1.20-bin.jar
Sqoop版本:sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz
1. 下载Sqoop
从Apache官网下载Sqoop的最新稳定版本,下载网址为:
本文例子使用的是sqoop-1.4.6版本。
2. 安装Sqoop
1)将下载的Sqoop安装文件上传到Ubuntu系统合适目录(例如usr/local),然后进行解压。解压命令如下:
tar -zxvf sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz
2)将解压后的文件夹sqoop-1.4.6.bin__hadoop-2.0.4-alpha进行重命名为sqoop-1.4.6,代码如下:
mv sqoop-1.4.6.bin__hadoop-2.0.4-alpha sqoop-1.4.6
3)为了以后的操作方便,对Sqoop的环境变量进行配置,在/etc/profile文件中加入以下内容:
export SQOOP_HOME=/usr/local/sqoop-1.4.6
export PATH=$SQOOP_HOME/bin:$PATH
加入后执行source /etc/profile命令对环境变量文件进行刷新。
4)拷贝${SQOOP_HOME}/conf/sqoop-env-template.sh 到${SQOOP_HOME}/conf/sqoop-env.sh,然后修改sqoop-env.sh,加入HADOOP_COMMON_HOME和HADOOP_MAPRED_HOME,指定Hadoop安装目录,如下:
export HADOOP_COMMON_HOME=/usr/local/hadoop-2.7.1
export HADOOP_MAPRED_HOME=/usr/local/hadoop-2.7.1
5)测试是否安装成功,执行列出mysql中的数据库命令:
sqoop list-databases --connect jdbc:mysql://192.168.1.69:3306/test --username root --password 123456
--connect参数为MySQL数据库的连接地址。
--username参数为MySQL数据库的用户名。
--password参数为MySQL数据库的密码。
本例执行的操作结果如下:
输出结果为:
information_schema
performance_schema
如果能成功列出MySQL中的数据库列表,说明安装成功。
二、使用Sqoop将MySQL中的表数据导入到HDFS
将MySQL 中数据库test中的表user_info导入到HDFS中,表user_info中有两条数据,截图如下:
表user_info
启动Hadoop
start-all.sh
执行导入命令
sqoop import \
--connect jdbc:mysql://192.168.1.69/test?characterEncoding=UTF-8 \
--username root --password 123456 \
--table user_info \
--columns userId,userName,password,trueName,addedTime \
--target-dir /sqoop/mysql
从命令中可以看出,导入的HDFS目录为/sqoop/mysql。如果HDFS不存在此目录,则会新建该目录。
执行过程如下图:
图中出现的警告信息我们先忽略不计。从图中可以看出,导入过程只使用了map任务,没有使用reduce。其中输出了一句SQL语句:SELECT MIN(userId), MAX(userId) FROM user_info,至于为什么会这样输出,将在后续讲解。
查看导入结果
执行下面命令,查看/sqoop/mysql目录下生成的文件:
hadoop fs -ls /sqoop/mysql
显示结果如下:
root@master:~# hadoop fs -ls /sqoop/mysql
Found 3 items
-rw-r--r--
2 root supergroup
17:23 /sqoop/mysql/_SUCCESS
-rw-r--r--
2 root supergroup
17:23 /sqoop/mysql/part-m-00000
-rw-r--r--
2 root supergroup
17:23 /sqoop/mysql/part-m-00001
说明在HDFS文件系统中生成了三个文件_SUCCESS、part-m-00000、part-m-00001,而我们的数据正是存在后两个文件中。分别查看生成的后两个文件中的内容,可以看到MySQL中的每一行数据生成了一个文件:
查看每一个文件中的数据
执行下面命令,查看/sqoop/mysql/文件夹下所有文件的内容:
hadoop fs -cat /sqoop/mysql/*
显示结果如下:
三、使用Sqoop将HDFS中的数据导出到MySQL表
在MySQL数据库test中新建表user_info_2,字段如下:
表user_info_2
将HDFS文件系统/sqoop/mysql/part-m-00000文件中的内容导出到表user_info_2中,导出命令如下:
sqoop export \
--connect jdbc:mysql://192.168.1.69:3306/test?characterEncoding=UTF-8 \
--username root \
--password 123456 \
--table user_info_2 \
--export-dir /sqoop/mysql/part-m-00000
导出成功后,刷新表user_info_2,查看其中的数据:
表user_info_2
可以看到,多了一条数据。当然,也可以将HDFS中/sqoop/mysql目录下的所有文件一起导出到表user_info_2中,只需将上述导出命令中的/sqoop/mysql/part-m-00000替换为/sqoop/mysql/*即可。
四、使用Sqoop将Mysql表数据导入到HBase
Mysql中的表user_info有两条数据,如下:
表user_info
1.启动hbase
start-hbase.sh
2.在hbase中新建表user_info,列族baseinfo
create 'user_info','baseinfo'
3.执行导入命令:
sqoop import \
--connect jdbc:mysql://192.168.1.69/test?characterEncoding=UTF-8 \
--username root --password 123456 \
--query "SELECT * FROM user_info WHERE 1=1 AND \$CONDITIONS" \
--hbase-table user_info \
--column-family baseinfo \
--hbase-row-key userId \
--split-by addedTime \
反斜杠后面紧跟回车,表示下一行是当前行的续行。
参数说明:
--hbase-table 指定要导入的HBase中的表名。
--column-family 指定要导入的HBase表的列族。
--hbase-row-key 指定mysql中的某一列作为HBase表中的rowkey。
--split-by 指定mysql中的某一列作为分区导入。默认是主键。
--m 指定复制过程使用的map作业的数量。
sqoop是如何根据--split-by进行分区的?
假设有一张表test,sqoop命令中--split-by id --m 10。首先,sqoop会去查表的元数据,sqoop会向关系型数据库比如mysql发送一个命令:select max(id),min(id) from test。然后会把max、min之间的区间平均分为10分,最后10个并行的map去找数据库,导数据就正式开始了。
查看导入结果
进入hbase shell,扫描user_info表中的数据,命令及执行效果如下:
hbase(main):001:0& scan 'user_info'
COLUMN+CELL
column=baseinfo:addedTime, timestamp=0, value=
column=baseinfo:password, timestamp=0, value=123456
column=baseinfo:trueName, timestamp=0, value=zhangsan
column=baseinfo:userName, timestamp=0, value=hello
column=baseinfo:addedTime, timestamp=0, value=
column=baseinfo:password, timestamp=0, value=123456
column=baseinfo:trueName, timestamp=0, value=lisi
column=baseinfo:userName, timestamp=0, value=hello2
2 row(s) in 0.6870 seconds
可以看到,两条数据已经成功导入。
如果修改user_info中的数据,重新导入,则会更新替换hbase中相同rowkey对应行的数据。
注意的问题:如果mysql表中存在中文,导入到hbase表中的结果如下:
表user_info
导入hbase后的结果:
中文变成了十六进制字符,我们把其中一个trueName值\xE5\xBC\xA0\xE4\xB8\x89转成中文,结果如下:
说明中文确实是转成了十六进制,我们从HBase中取数据时,将十六进制转成中文即可。
上方操作是提前在HBase中创建了相应的表,当然也可以提前不创建hbase表,在导入命令中指定创建表参数,如果表不存在则会创建表,如下:
sqoop import \
--connect jdbc:mysql://192.168.1.69/test \
--username root --password 123456 \
--query "SELECT * FROM user_info WHERE 1=1 AND \$CONDITIONS" \
--hbase-table user_info \
--column-family baseinfo \
--hbase-create-table \
--hbase-row-key userId \
--split-by addedTime \
但是这种方式经过测试,出现bug,创建表不成功,原因是hbase中缺少Sqoop需要的API类。说明当前Sqoop版本与HBase版本不兼容。HBase我使用的是1.2.4版本。
如果大家有其它好的意见,欢迎在下方留言。
作者简介:技术经理,一线高级软件工程师,美国IPMA高级人力资源管理师
1/列出mysql数据库中的所有数据库sqoop list-databases -connect jdbc:mysql://localhost:3306/ -username root -password 1234562/连接mysql并 列出test数据库中的表sqoop...
Apache Sqoop 概述 使用Hadoop来分析和处理数据需要将数据加载到集群中并且将它和企业生产数据库中的其他数据进行结合处理。从生产系统加载大块数据到Hadoop中或者从大型集群的map reduce应用中获得数据是个挑战。用户必须意识到确保数据一致性,消耗生产系...
* HBase框架基础(四) 上一节我们介绍了如何使用HBase搞一些MapReduce小程序,其主要作用呢是可以做一些数据清洗和分析或者导入数据的工作,这一节我们来介绍如何使用HBase与其他框架进行搭配使用。 * HBase与Hive 在开始HBase与Hive搭配使用...
有时候需要将mysql的全量数据导入到hive或者hbase中,使用sqoop是一个比较好用的工具,速度相对来说比较快。mysql的增量数据在用其他方法实时同步。 一、mysql同步到hbase 导入命令:sqoop import --connect jdbc:mysql:...
原文地址 http://www.cnblogs.com/wgp13x/p/5028220.html基础环境sqoop:sqoop-1.4.5+cdh5.3.6+78,hive:hive-0.13.1+cdh5.3.6+397,hbase:hbase-0.98.6+cdh5....
黄叶漫飘飞, 独有一枝红, 近看是何故, 却是醉秋风。
买了Mac,终于没有借口不写文章了。
不同的阅读层次决定了不同的阅读效果,有的是消遣,有的是得到知识,有的则是构建自己的思考并获取成长。不同的阅读层次也来源于不同的阅读思维和阅读技巧。下面我们结合《如何阅读一本书》第一篇——阅读的层次,来说一下如何奠定阅读基础并进行阅读训练。 1、从思维方式上说,学会主动阅读,...
有个相当易眠的男子睡在房子里,而房子起火了,他们试着将他从窗户抬出去,行不痛,又试着将他从门抬出去,也行不通,他的身躯太庞大、太笨重了,他们很绝望,直到有人提议:「把他摇醒,他会自己走出去。」 「人类是无意识的。」合一大学校长巴关说道:「每个人内在都有着巨大的痛苦。就像被老...如何将Sqlite数据库里面表的数据导出成一个TXT文件
[问题点数:75分,结帖人u]
本版专家分:0
结帖率 83.33%
CSDN今日推荐
本版专家分:0
结帖率 83.33%
本版专家分:0
结帖率 83.33%
本版专家分:0
结帖率 83.33%
匿名用户不能发表回复!|
CSDN今日推荐数据导出hdfs相关的博客
摘要: 大数据计算服务(MaxCompute)是一种快速、完全托管的 GB/TB/PB 级数据仓库解决方案,目前已在阿里巴巴内部得到大规模应用。来自阿里妈妈基础平台大规模数据处理技术专家向大家分享了MaxCompute在阿里妈妈数据字化营销解决方案上的典型应
摘要:第九届中国数据库技术大会,阿里巴巴技术专家孟庆义对阿里HBase的数据管道设施实践与演进进行了讲解。主要从数据导入场景、 HBase Bulkload功能、HImporter系统、数据导出场景、HExporter系统这些部分进行了讲述。数十款阿里云产品
红色字体是现阶段比较火的 ---------------------------------------------------------------------------------------------------------------- 奇虎36
Maven初体验 因为三年的学习中,随着自己学习Java的深入,也越来越理不清自己的应用的依赖关系。比如起初 因为一个StringUtil而导入的Apache的commens扩展包。 还有玩数据库时导入的MySQL驱动包随后还有oracle的驱动包 然后玩Ja
数据导出hdfs相关问答
Sqoop从mysql导出数据到HDFS,也想把表的字段作为表头也添加进去
数据导出hdfs相关问题集
hbase导出表数据到hdfs
我需要把hbase中的表数据导入到hdfs
使用的命令 hbase org.apache.hadoop.hba
利用sqoop把数据从Oracle导出到hive报错
bash-4.1$ sqoop import --connect
jdbc:oracle:
HBase中的数据export到HDFS上
我现在把HDFS上的数据import到HBase中去了,现在想要知道,如何将HBas
如何将存储量很大的txt文档数据导入到hbase当中
我现在要写一个程序,将第三方导出的txt文件内容读取并放到hbase当中。
由于txt文档非常
数据导出hdfs相关基础文章
相当长一段时间以来,大数据社区已经普遍认识到了批量数据处理的不足。很多应用都对实时查询和流式处理产生了迫切需求。最近几年,在这个理念的推动下,催生出了一系列解决方案,Twitter Storm,Yahoo S4,Cloudera Impala,Apache..
Spring XD(eXtreme Data,极限数据)是Pivotal的大数据产品。它结合了Spring Boot和Grails,组成Spring IO平台的执行部分。尽管Spring XD利用了大量现存的Spring项目,但它是一种运行时环境,而不是
Hadoop是一个开发和运行处理大规模数据的软件平台,是Apache的一个用Java语言实现开源软件框架,实现在大量计算机组成的集群中对海量数据进行分布式计算。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利...
HDFS的数据块
磁盘数据块是磁盘进行数据读/写的最小单位,一般是512字节,
而HDFS中也有数据块,默认为64MB。所以HDFS上的大文件被分为许多个chunk.而HDFS上的小文件(小于64MB)的文件不会占据整个块的空间。
HDFS数据块设置大的原..
数据导出hdfs相关资料
# yum install mysql mysql-devel mysql-server mysql-libs -y
启动数据库:
# 配置开启启动
# chkconfig mysqld on
# service mysqld sta
...-zzk-113:9000
配置项:hadoop.tmp.dir表示命名节点上存放元数据的目录位置,对于数据节点则为该节点上存放文件数据的目录。
配置项:fs.default.name表示命名的IP地址和端口号,缺省值是file:///,对于J
...se的管理服务
hbase-regionServer - 对Client端插入,删除,查询数据等提供服务
zookeeper-server - Zookeeper协作与配置管理服务
本文定义的规范,避免在配置多台服务器上产生理解上的混乱:
所有直接以 $
...ode,其中namenode是命名空间,datanode是存储空间,datanode以数据块的形式进行存储,每个数据块128M
2)yarn:通用资源管理系统,为上层应用提供统一的资源管理和调度。
yarn分为resourcemanager和nodema
为您提供简单高效、处理能力可弹性伸缩的计算服务,帮助您快速构建更稳定、安全的应用,提升运维效率,降低...
支持以数据库为核心的结构化存储产品之间的数据传输。 它是一种集数据迁移、数据订阅及数据实时同步于一体...
阿里云推出的一款移动App数据统计分析产品,为开发者提供一站式数据化运营服务
大数据开发套件(Data IDE),提供可视化开发界面、离线任务调度运维、快速数据集成、多人协同工作...HBase 5种写入数据方式_SQL技巧_动态网站制作指南
HBase 5种写入数据方式
来源:人气:635
问题导读:
1.如何直接使用HTable进行导入?
2.如何从HDFS文件导入HBase,继承自Mapper?
3.如何读取HBase表写入HBase表中字段?
4.如何让MR和HTable结合?
Version :hadoop1.2.1; hbaes0.94.16;
HBase写入数据方式(参考:《HBase The Definitive Guide》),可以简单分为下面几种:
1. 直接使用HTable进行导入,代码如下:
package hbase.
import .io.IOE
import java.util.ArrayL
import java.util.L
import java.util.R
import org.apache.hadoop.hbase.client.HT
import org.apache.hadoop.hbase.client.P
import org.apache.hadoop.hbase.util.B
public class PutExample {
& && &&&/**
& && && &* @param args
& && && &* @throws IOException&
& && && &*/
& && &&&ivate HTable&&table = HTableUtil.getHTable(&testtable&);
& && &&&public static void main(String[] args) throws IOException {
& && && && && & // TODO Auto-generated method stub
& && && && && & PutExample pe = new PutExample();
& && && && && & pe.putRows();
& && && && && &&
& && &&&public void putRows(){
& && && && && & List&Put& puts = new ArrayList&Put&();
& && && && && & for(int i=0;i&10;i++){
& && && && && && && && &Put put = new Put(Bytes.toBytes(&row_&+i));
& && && && && && && && &Random random = new Random();
& && && && && && && && &
& && && && && && && && &if(random.nextBoolean()){
& && && && && && && && && && &&&put.add(Bytes.toBytes(&colfam1&), Bytes.toBytes(&qual1&), Bytes.toBytes(&colfam1_qual1_value_&+i));
& && && && && && && && &}
& && && && && && && && &if(random.nextBoolean()){
& && && && && && && && && && &&&put.add(Bytes.toBytes(&colfam1&), Bytes.toBytes(&qual2&), Bytes.toBytes(&colfam1_qual1_value_&+i));
& && && && && && && && &}
& && && && && && && && &if(random.nextBoolean()){
& && && && && && && && && && &&&put.add(Bytes.toBytes(&colfam1&), Bytes.toBytes(&qual3&), Bytes.toBytes(&colfam1_qual1_value_&+i));
& && && && && && && && &}
& && && && && && && && &if(random.nextBoolean()){
& && && && && && && && && && &&&put.add(Bytes.toBytes(&colfam1&), Bytes.toBytes(&qual4&), Bytes.toBytes(&colfam1_qual1_value_&+i));& && &&&
& && && && && && && && &}
& && && && && && && && &if(random.nextBoolean()){
& && && && && && && && && && &&&put.add(Bytes.toBytes(&colfam1&), Bytes.toBytes(&qual5&), Bytes.toBytes(&colfam1_qual1_value_&+i));
& && && && && && && && &}
& && && && && && && && &puts.add(put);
& && && && && & }
& && && && && & try{
& && && && && && && && &table.put(puts);
& && && && && && && && &table.close();
& && && && && & }catch(Exception e){
& && && && && && && && &e.printStackTrace();
& && && && && && && && &
& && && && && & }
& && && && && & System.out.println(&done put rows&);
其中HTableUtil如下:
package hbase.
import java.io.IOE
import org.apache.hadoop.conf.C
import org.apache.hadoop.hbase.HBaseC
import org.apache.hadoop.hbase.client.HT
import org.apache.hadoop.hbase.util.B
public class HTableUtil {
& && &&&private static HT
& && &&&private static C
& && &&&static{
& && && && && & conf =HBaseConfiguration.create();
& && && && && & conf.set(&mapred.job.tracker&, &hbase:9001&);
& && && && && & conf.set(&fs.default.name&, &hbase:9000&);
& && && && && & conf.set(&hbase.zookeeper.quorum&, &hbase&);
& && && && && & try {
& && && && && && && && &table = new HTable(conf,&testtable&);
& && && && && & } catch (IOException e) {
& && && && && && && && &// TODO Auto-generated catch block
& && && && && && && && &e.printStackTrace();
& && && && && & }
& && &&&public static Configuration getConf(){
& && && && && &
& && &&&public static HTable getHTable(String tablename){
& && && && && & if(table==null){
& && && && && && && && &try {
& && && && && && && && && && &&&table= new HTable(conf,tablename);
& && && && && && && && &} catch (IOException e) {
& && && && && && && && && && &&&// TODO Auto-generated catch block
& && && && && && && && && && &&&e.printStackTrace();
& && && && && && && && &}&
& && && && && & }
& && && && && &
& && &&&public static&&byte[] gB(String name){
& && && && && & return Bytes.toBytes(name);
这一种是没有使用MR的,下面介绍的几种方式都是使用MR的。
2.1 从HDFS文件导入HBase,继承自Mapper,代码如下:
package hbase.
import java.io.IOE
import hbase.curd.HTableU
import org.apache.commons.cli.CommandL
import org.apache.commons.cli.CommandLineP
import org.apache.commons.cli.HelpF
import org.apache.commons.cli.O
import org.apache.commons.cli.O
import org.apache.commons.cli.PosixP
import org.apache.commons.codec.digest.DigestU
import org.apache.hadoop.conf.C
import org.apache.hadoop.fs.P
import org.apache.hadoop.hbase.KeyV
import org.apache.hadoop.hbase.client.P
import org.apache.hadoop.hbase.io.ImmutableBytesW
import org.apache.hadoop.hbase.mapreduce.TableOutputF
import org.apache.hadoop.hbase.util.B
import org.apache.hadoop.io.LongW
import org.apache.hadoop.io.T
import org.apache.hadoop.io.W
import org.apache.hadoop.mapreduce.J
import org.apache.hadoop.mapreduce.M
import org.apache.hadoop.mapreduce.lib.input.FileInputF
import org.apache.hadoop.util.GenericOptionsP
public class ImportFromFile {
& && &&&/**
& && && &* 从文件导入到HBase
& && && &* @param args
& && && &*/
& && &&&public static final String NAME=&ImportFromFile&;
& && &&&public enum Counters{LINES}
& && &&&static class ImportMapper extends Mapper&LongWritable,Text,
& && && && && & ImmutableBytesWritable,Writable&{
& && && && && & private byte[] family =
& && && && && & private byte[] qualifier =
& && && && && & @Override
& && && && && & protected void setup(Context cxt){
& && && && && && && && &String column = cxt.getConfiguration().get(&conf.column&);
& && && && && && && && &byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(column));
& && && && && && && && &family = colkey[0];
& && && && && && && && &if(colkey.length&1){
& && && && && && && && && && &&&qualifier = colkey[1];
& && && && && && && && &}
& && && && && & }
& && && && && & @Override
& && && && && & public void map(LongWritable offset,Text line,Context cxt){
& && && && && && && && &try{
& && && && && && && && && && &&&String lineString= line.toString();
& && && && && && && && && && &&&byte[] rowkey= DigestUtils.(lineString);
& && && && && && && && && && &&&Put put = new Put(rowkey);
& && && && && && && && && && &&&put.add(family,qualifier,Bytes.toBytes(lineString));
& && && && && && && && && && &&&cxt.write(new ImmutableBytesWritable(rowkey), put);
& && && && && && && && && && &&&cxt.getCounter(Counters.LINES).increment(1);
& && && && && && && && &}catch(Exception e){
& && && && && && && && && && &&&e.printStackTrace();
& && && && && && && && &}
& && && && && & }
& && &&&private static CommandLine parseArgs(String[] args){
& && && && && & Options options = new Options();
& && && && && & Option o = new Option(&t& ,&table&,true,&table to import into (must exist)&);
& && && && && & o.setArgName(&table-name&);
& && && && && & o.setRequired(true);
& && && && && & options.addOption(o);
& && && && && &&
& && && && && & o= new Option(&c&,&column&,true,&column to store row data into&);
& && && && && & o.setArgName(&family:qualifier&);
& && && && && & o.setRequired(true);
& && && && && & options.addOption(o);
& && && && && &&
& && && && && & o = new Option(&i&, &input&, true,
& && && && && & &the directory or file to read from&);
& && && && && & o.setArgName(&path-in-HDFS&);
& && && && && & o.setRequired(true);
& && && && && & options.addOption(o);
& && && && && & options.addOption(&d&, &debug&, false, &switch on DEBUG log level&);
& && && && && & CommandLineParser parser = new PosixParser();
& && && && && & CommandLine cmd =
& && && && && & try {
& && && && && && && && &cmd = parser.parse(options, args);
& && && && && & } catch (Exception e) {
& && && && && && && && &System.err.println(&ERROR: & + e.getMessage() + &\n&);
& && && && && && && && &HelpFormatter formatter = new HelpFormatter();
& && && && && && && && &formatter.printHelp(NAME + & &, options, true);
& && && && && && && && &System.exit(-1);
& && && && && & }
& && && && && &
& && &&&public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
& && && && && &&
& && && && && & Configuration conf = HTableUtil.getConf();
& && && && && & String[] otherArgs = new GenericOptionsParser(conf, initialArg()).getRemainingArgs();&
& && && && && & CommandLine cmd = parseArgs(otherArgs);
& && && && && & String table = cmd.getOptionValue(&t&);
& && && && && & String input = cmd.getOptionValue(&i&);
& && && && && & String column = cmd.getOptionValue(&c&);
& && && && && & conf.set(&conf.column&, column);
& && && && && & Job job = new Job(conf, &Import from file & + input + & into table & + table);
& && && && && & job.setJarByClass(ImportFromFile.class);
& && && && && & job.setMapperClass(ImportMapper.class);
& && && && && & job.setOutputFormatClass(TableOutputFormat.class);
& && && && && & job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
& && && && && & job.setOutputKeyClass(ImmutableBytesWritable.class);
& && && && && & job.setOutputValueClass(Writable.class);
& && && && && & job.setNumReduceTasks(0);&
& && && && && & FileInputFormat.addInputPath(job, new Path(input));
& && && && && & System.exit(job.waitForCompletion(true) ? 0 : 1);
& && &&&private static String[] initialArg(){
& && && && && & String []args = new String[6];
& && && && && & args[0]=&-c&;
& && && && && & args[1]=&fam:data&;
& && && && && & args[2]=&-i&;
& && && && && & args[3]=&/user/hadoop/input/picdata&;
& && && && && & args[4]=&-t&;
& && && && && & args[5]=&testtable&;
& && && && && &
2.2 读取HBase表写入HBase表中字段,代码如下:
package hbase.
import hadoop.util.HadoopU
import java.io.IOE
import org.apache.hadoop.conf.C
import org.apache.hadoop.hbase.KeyV
import org.apache.hadoop.hbase.client.P
import org.apache.hadoop.hbase.client.R
import org.apache.hadoop.hbase.client.S
import org.apache.hadoop.hbase.io.ImmutableBytesW
import org.apache.hadoop.hbase.mapreduce.IdentityTableR
import org.apache.hadoop.hbase.mapreduce.TableMapReduceU
import org.apache.hadoop.hbase.mapreduce.TableM
import org.apache.hadoop.hbase.util.B
import org.apache.hadoop.io.W
import org.apache.hadoop.mapreduce.J
import org.slf4j.L
import org.slf4j.LoggerF
public class ParseDriver {
& && &&&/**
& && && &* 把hbase表中数据拷贝到其他表(或本表)相同字段
& && && &* @param args
& && && &*/
& && &&&enum Counters{
& && && && && & VALID, ROWS, COLS, ERROR
& && &&&private static Logger log = LoggerFactory.getLogger(ParseDriver.class);
& && &&&static class ParseMapper extends TableMapper&ImmutableBytesWritable,Writable&{
& && && && && & private byte[] columnFamily =
& && && && && & private byte[] columnQualifier =
& && && && && & @Override
& && && && && & protected void setup(Context cxt){
& && && && && && && && &columnFamily = Bytes.toBytes(cxt.getConfiguration().get(&conf.columnfamily&));
& && && && && && && && &columnQualifier = Bytes.toBytes(cxt.getConfiguration().get(&conf.columnqualifier&));
& && && && && & }
& && && && && & @Override&
& && && && && & public void map(ImmutableBytesWritable row,Result columns,Context cxt){
& && && && && && && && &cxt.getCounter(Counters.ROWS).increment(1);
& && && && && && && && &String value =
& && && && && && && && &try{
& && && && && && && && && && &&&Put put = new Put(row.get());
& && && && && && && && && && &&&for(KeyValue kv : columns.list()){
& && && && && && && && && && && && && & cxt.getCounter(Counters.COLS).increment(1);
& && && && && && && && && && && && && & value= Bytes.toStringBinary(kv.getValue());
& && && && && && && && && && && && && & if(equals(columnQualifier,kv.getQualifier())){&&// 过滤column
& && && && && && && && && && && && && && && && &put.add(columnFamily,columnQualifier,kv.getValue());
& && && && && && && && && && && && && && && && &cxt.write(row, put);
& && && && && && && && && && && && && && && && &cxt.getCounter(Counters.VALID).increment(1);
& && && && && && && && && && && && && & }
& && && && && && && && && && &&&}
& && && && && && && && &}catch(Exception e){
& && && && && && && && && && &&&log.info(&Error:&+e.getMessage()+&,Row:&+Bytes.toStringBinary(row.get())+
& && && && && && && && && && && && && && && && &&,Value:&+value);
& && && && && && && && && && &&&cxt.getCounter(Counters.ERROR).increment(1);
& && && && && && && && &}
& && && && && & }
& && && && && & private boolean equals(byte[] a,byte[] b){
& && && && && && && && &String aStr= Bytes.toString(a);
& && && && && && && && &String bStr= Bytes.toString(b);
& && && && && && && && &if(aStr.equals(bStr)){
& && && && && && && && && && &&&
& && && && && && && && &}
& && && && && && && && &
& && && && && & }
& && &&&public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
& && && && && & byte[] columnFamily = Bytes.toBytes(&fam&);
& && && && && & byte[] columnQualifier = Bytes.toBytes(&data&);
& && && && && & Scan scan = new Scan ();
& && && && && & scan.addColumn(columnFamily, columnQualifier);
& && && && && & HadoopUtils.initialConf(&hbase&);
& && && && && & Configuration conf = HadoopUtils.getConf();
& && && && && & conf.set(&conf.columnfamily&, Bytes.toStringBinary(columnFamily));
& && && && && & conf.set(&conf.columnqualifier&, Bytes.toStringBinary(columnQualifier));
& && && && && &&
& && && && && & String input =&testtable& ;//
& && && && && & String output=&testtable1&; //&
& && && && && && && && && && &&&
& && && && && & Job job = new Job(conf,&Parse data in &+input+&,write to&+output);
& && && && && & job.setJarByClass(ParseDriver.class);
& && && && && & TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class,&
& && && && && && && && && && &&&ImmutableBytesWritable.class, Put.class,job);
& && && && && & TableMapReduceUtil.initTableReducerJob(output, IdentityTableReducer.class, job);
& && && && && &&
& && && && && & System.exit(job.waitForCompletion(true)?0:1);
& && && && && &&
其中HadoopUtils代码如下:
package hadoop.
import java.io.IOE
import java.net.URI;
import java.util.ArrayL
import java.util.L
import org.apache.hadoop.conf.C
import org.apache.hadoop.fs.FSDataInputS
import org.apache.hadoop.fs.FileS
import org.apache.hadoop.fs.P
import org.apache.hadoop.io.T
import org.apache.hadoop.util.LineR
public class HadoopUtils {
& && &&&private static C
& && &&&public&&static void initialConf(){
& && && && && & conf = new Configuration();
& && && && && & conf.set(&mapred.job.tracker&, &hbase:9001&);
& && && && && & conf.set(&fs.default.name&, &hbase:9000&);
& && && && && & conf.set(&hbase.zookeeper.quorum&, &hbase&);
& && &&&public&&static void initialConf(String host){
& && && && && & conf = new Configuration();
& && && && && & conf.set(&mapred.job.tracker&, host+&:9001&);
& && && && && & conf.set(&fs.default.name&, host+&:9000&);
& && && && && & conf.set(&hbase.zookeeper.quorum&, host);
& && &&&public static Configuration getConf(){
& && && && && & if(conf==null){
& && && && && && && && &initialConf();
& && && && && & }
& && && && && &
& && &&&public static List&String& readFromHDFS(String fileName) throws IOException {
& && && && && & Configuration conf = getConf();
& && && && && & FileSystem fs = FileSystem.get(URI.create(fileName), conf);
& && && && && & FSDataInputStream hdfsInStream = fs.open(new Path(fileName));
& && && && && & // 按行读取(新版本的方法)
& && && && && & LineReader inLine = new LineReader(hdfsInStream, conf);
& && && && && & Text txtLine = new Text();
& && && && && &&
& && && && && & int iResult = inLine.readLine(txtLine); //读取第一行
& && && && && & List&String& list = new ArrayList&String&();
& && && && && & while (iResult & 0 ) {
& && && && && && && && &list.add(txtLine.toString());
& && && && && && && && &iResult = inLine.readLine(txtLine);
& && && && && & }
& && && && && &&
& && && && && & hdfsInStream.close();
& && && && && & fs.close();
& && && && && &
2.3 MR和HTable结合,代码如下:
package hbase.
import hadoop.util.HadoopU
import hbase.mr.AnalyzeDriver.C
import java.io.IOE
import java.util.D
import org.apache.hadoop.conf.C
import org.apache.hadoop.hbase.KeyV
import org.apache.hadoop.hbase.client.HT
import org.apache.hadoop.hbase.client.P
import org.apache.hadoop.hbase.client.R
import org.apache.hadoop.hbase.client.S
import org.apache.hadoop.hbase.io.ImmutableBytesW
import org.apache.hadoop.hbase.mapreduce.TableMapReduceU
import org.apache.hadoop.hbase.mapreduce.TableM
import org.apache.hadoop.hbase.util.B
import org.apache.hadoop.io.W
import org.apache.hadoop.mapreduce.J
import org.apache.hadoop.mapreduce.lib.output.NullOutputF
import org.slf4j.L
import org.slf4j.LoggerF
public class ParseSinglePutDriver {
& && &&&/**
& && && &* 使用HTable进行写入
& && && &* 把infoTable 表中的 qualifier字段复制到qualifier1字段
& && && &* 单个Put
& && && &* @param args
& && && &*/
& && &&&private static Logger log = LoggerFactory.getLogger(ParseMapper.class);
& && &&&static class ParseMapper extends TableMapper&ImmutableBytesWritable,Writable&{
& && && && && & private HTable infoTable =
& && && && && & private byte[] columnFamily =
& && && && && & private byte[] columnQualifier =
& && && && && & private byte[] columnQualifier1 =
& && && && && & @Override&
& && && && && & protected void setup(Context cxt){
& && && && && && && && &log.info(&ParseSinglePutDriver setup,current time: &+new Date());
& && && && && && && && &try {
& && && && && && && && && && &&&infoTable = new HTable(cxt.getConfiguration(),
& && && && && && && && && && && && && && && && &cxt.getConfiguration().get(&conf.infotable&));
& && && && && && && && && && &&&infoTable.setAutoFlush(false);
& && && && && && && && &} catch (IOException e) {
& && && && && && && && && && &&&log.error(&Initial infoTable error:\n&+e.getMessage());
& && && && && && && && &}
& && && && && && && && &columnFamily = Bytes.toBytes(cxt.getConfiguration().get(&conf.columnfamily&));
& && && && && && && && &columnQualifier = Bytes.toBytes(cxt.getConfiguration().get(&conf.columnqualifier&));
& && && && && && && && &columnQualifier1 = Bytes.toBytes(cxt.getConfiguration().get(&conf.columnqualifier1&));
& && && && && & }
& && && && && & @Override&
& && && && && & protected void cleanup(Context cxt){
& && && && && && && && &try {
& && && && && && && && && && &&&infoTable.flushCommits();
& && && && && && && && && && &&&log.info(&ParseSinglePutDriver cleanup ,current time :&+new Date());
& && && && && && && && &} catch (IOException e) {
& && && && && && && && && && &&&log.error(&infoTable flush commits error:\n&+e.getMessage());
& && && && && && && && &}
& && && && && & }
& && && && && & @Override&
& && && && && & public void map(ImmutableBytesWritable row,Result columns,Context cxt){
& && && && && && && && &cxt.getCounter(Counters.ROWS).increment(1);
& && && && && && && && &String value =
& && && && && && && && &try{
& && && && && && && && && && &&&Put put = new Put(row.get());
& && && && && && && && && && &&&for(KeyValue kv : columns.list()){
& && && && && && && && && && && && && & cxt.getCounter(Counters.COLS).increment(1);
& && && && && && && && && && && && && & value= Bytes.toStringBinary(kv.getValue());
& && && && && && && && && && && && && & if(equals(columnQualifier,kv.getQualifier())){&&// 过滤column
& && && && && && && && && && && && && && && && &put.add(columnFamily,columnQualifier1,kv.getValue());
& && && && && && && && && && && && && && && && &infoTable.put(put);
& && && && && && && && && && && && && & }
& && && && && && && && && && &&&}
& && && && && && && && &}catch(Exception e){
& && && && && && && && && && &&&log.info(&Error:&+e.getMessage()+&,Row:&+Bytes.toStringBinary(row.get())+
& && && && && && && && && && && && && && && && &&,Value:&+value);
& && && && && && && && && && &&&cxt.getCounter(Counters.ERROR).increment(1);
& && && && && && && && &}
& && && && && & }
& && && && && & private boolean equals(byte[] a,byte[] b){
& && && && && && && && &String aStr= Bytes.toString(a);
& && && && && && && && &String bStr= Bytes.toString(b);
& && && && && && && && &if(aStr.equals(bStr)){
& && && && && && && && && && &&&
& && && && && && && && &}
& && && && && && && && &
& && && && && & }
& && &&&public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
& && && && && & String input =&testtable&;
& && && && && & byte[] columnFamily = Bytes.toBytes(&fam&);
& && && && && & byte[] columnQualifier = Bytes.toBytes(&data&);
& && && && && & byte[] columnQualifier1 = Bytes.toBytes(&data1&);
& && && && && & Scan scan = new Scan ();
& && && && && & scan.addColumn(columnFamily, columnQualifier);
& && && && && & HadoopUtils.initialConf(&hbase&);
& && && && && & Configuration conf = HadoopUtils.getConf();
& && && && && & conf.set(&conf.columnfamily&, Bytes.toStringBinary(columnFamily));
& && && && && & conf.set(&conf.columnqualifier&, Bytes.toStringBinary(columnQualifier));
& && && && && & conf.set(&conf.columnqualifier1&, Bytes.toStringBinary(columnQualifier1));
& && && && && & conf.set(&conf.infotable&, input);
& && && && && &&
& && && && && & Job job = new Job(conf,&Parse data in &+input+&,into tables&);
& && && && && & job.setJarByClass(ParseSinglePutDriver.class);
& && && && && & TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class,&
& && && && && && && && && && &&&ImmutableBytesWritable.class, Put.class,job);& && &&&
& && && && && & job.setOutputFormatClass(NullOutputFormat.class);
& && && && && & job.setNumReduceTasks(0);
& && && && && & System.exit(job.waitForCompletion(true)?0:1);
2.4 上面2.3中的HTable其实也是可以put一个List的,下面的方式就是put一个list的方式,这样效率会高。
package hbase.
import hadoop.util.HadoopU
import hbase.mr.AnalyzeDriver.C
import java.io.IOE
import java.util.ArrayL
import java.util.D
import java.util.L
import org.apache.hadoop.conf.C
import org.apache.hadoop.hbase.KeyV
import org.apache.hadoop.hbase.client.HT
import org.apache.hadoop.hbase.client.P
import org.apache.hadoop.hbase.client.R
import org.apache.hadoop.hbase.client.S
import org.apache.hadoop.hbase.io.ImmutableBytesW
import org.apache.hadoop.hbase.mapreduce.TableMapReduceU
import org.apache.hadoop.hbase.mapreduce.TableM
import org.apache.hadoop.hbase.util.B
import org.apache.hadoop.io.W
import org.apache.hadoop.mapreduce.J
import org.apache.hadoop.mapreduce.lib.output.NullOutputF
import org.slf4j.L
import org.slf4j.LoggerF
public class ParseListPutDriver {
& && &&&/**
& && && &* 使用HTable进行写入
& && && &* List &Put& 进行测试,查看效率
& && && &* 把infoTable 表中的 qualifier字段复制到qualifier1字段
& && && &* @param args
& && && &*/
& && &&&private static Logger log = LoggerFactory.getLogger(ParseMapper.class);
& && &&&static class ParseMapper extends TableMapper&ImmutableBytesWritable,Writable&{
& && && && && & private HTable infoTable =
& && && && && & private byte[] columnFamily =
& && && && && & private byte[] columnQualifier =
& && && && && & private byte[] columnQualifier1 =
& && && && && & private List&Put& list = new ArrayList&Put&();
& && && && && & @Override&
& && && && && & protected void setup(Context cxt){
& && && && && && && && &log.info(&ParseListPutDriver setup,current time: &+new Date());
& && && && && && && && &try {
& && && && && && && && && && &&&infoTable = new HTable(cxt.getConfiguration(),
& && && && && && && && && && && && && && && && &cxt.getConfiguration().get(&conf.infotable&));
& && && && && && && && && && &&&infoTable.setAutoFlush(false);
& && && && && && && && &} catch (IOException e) {
& && && && && && && && && && &&&log.error(&Initial infoTable error:\n&+e.getMessage());
& && && && && && && && &}
& && && && && && && && &columnFamily = Bytes.toBytes(cxt.getConfiguration().get(&conf.columnfamily&));
& && && && && && && && &columnQualifier = Bytes.toBytes(cxt.getConfiguration().get(&conf.columnqualifier&));
& && && && && && && && &columnQualifier1 = Bytes.toBytes(cxt.getConfiguration().get(&conf.columnqualifier1&));
& && && && && & }
& && && && && & @Override&
& && && && && & protected void cleanup(Context cxt){
& && && && && && && && &try {
& && && && && && && && && && &&&infoTable.put(list);
& && && && && && && && && && &&&infoTable.flushCommits();
& && && && && && && && && && &&&log.info(&ParseListPutDriver cleanup ,current time :&+new Date());
& && && && && && && && &} catch (IOException e) {
& && && && && && && && && && &&&log.error(&infoTable flush commits error:\n&+e.getMessage());
& && && && && && && && &}
& && && && && & }
& && && && && & @Override&
& && && && && & public void map(ImmutableBytesWritable row,Result columns,Context cxt){
& && && && && && && && &cxt.getCounter(Counters.ROWS).increment(1);
& && && && && && && && &String value =
& && && && && && && && &try{
& && && && && && && && && && &&&Put put = new Put(row.get());
& && && && && && && && && && &&&for(KeyValue kv : columns.list()){
& && && && && && && && && && && && && & cxt.getCounter(Counters.COLS).increment(1);
& && && && && && && && && && && && && & value= Bytes.toStringBinary(kv.getValue());
& && && && && && && && && && && && && & if(equals(columnQualifier,kv.getQualifier())){&&// 过滤column
& && && && && && && && && && && && && && && && &put.add(columnFamily,columnQualifier1,kv.getValue());
& && && && && && && && && && && && && && && && &list.add(put);
& && && && && && && && && && && && && & }
& && && && && && && && && && &&&}
& && && && && && && && &}catch(Exception e){
& && && && && && && && && && &&&log.info(&Error:&+e.getMessage()+&,Row:&+Bytes.toStringBinary(row.get())+
& && && && && && && && && && && && && && && && &&,Value:&+value);
& && && && && && && && && && &&&cxt.getCounter(Counters.ERROR).increment(1);
& && && && && && && && &}
& && && && && & }
& && && && && & private boolean equals(byte[] a,byte[] b){
& && && && && && && && &String aStr= Bytes.toString(a);
& && && && && && && && &String bStr= Bytes.toString(b);
& && && && && && && && &if(aStr.equals(bStr)){
& && && && && && && && && && &&&
& && && && && && && && &}
& && && && && && && && &
& && && && && & }
& && &&&public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
& && && && && & String input =&testtable&;
& && && && && & byte[] columnFamily = Bytes.toBytes(&fam&);
& && && && && & byte[] columnQualifier = Bytes.toBytes(&data&);
& && && && && & byte[] columnQualifier1 = Bytes.toBytes(&data2&);
& && && && && & Scan scan = new Scan ();
& && && && && & scan.addColumn(columnFamily, columnQualifier);
& && && && && & HadoopUtils.initialConf(&hbase&);
& && && && && & Configuration conf = HadoopUtils.getConf();
& && && && && & conf.set(&conf.columnfamily&, Bytes.toStringBinary(columnFamily));
& && && && && & conf.set(&conf.columnqualifier&, Bytes.toStringBinary(columnQualifier));
& && && && && & conf.set(&conf.columnqualifier1&, Bytes.toStringBinary(columnQualifier1));
& && && && && & conf.set(&conf.infotable&, input);
& && && && && &&
& && && && && & Job job = new Job(conf,&Parse data in &+input+&,into tables&);
& && && && && & job.setJarByClass(ParseListPutDriver.class);
& && && && && & TableMapReduceUtil.initTableMapperJob(input, scan, ParseMapper.class,&
& && && && && && && && && && &&&ImmutableBytesWritable.class, Put.class,job);& && &&&
& && && && && & job.setOutputFormatClass(NullOutputFormat.class);
& && && && && & job.setNumReduceTasks(0);
& && && && && & System.exit(job.waitForCompletion(true)?0:1);
数据记录条数为:26632,可以看到下面图片中的时间记录对比:
由于结合了hbase,所以需要在hadoop_home/lib目录下面加些额外的包,其整个包如下(hbase1.0.jar为编译打包的MR程序):
优质网站模板}

我要回帖

更多关于 数据库文件导出 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信