
[Spark] Reading and Writing MySQL with Spark 2.3.0
This post is a companion to the previous one; it reuses the same environment and Java project.
vagrant@node1:~/HelloSparkHive$ ls
build.gradle
vagrant@node1:~/HelloSparkHive$ rm -rf build
vagrant@node1:~/HelloSparkHive$ tree
├── build.gradle
└── src
    └── main
        └── java
            └── com
                └── yqu
                    └── sparkhive
                        ├── HelloSparkHiveDriver.java
                        └── HelloSparkMysqlDriver.java
6 directories, 3 files
src/main/java/com/yqu/sparkhive/HelloSparkMysqlDriver.java
This example loads the emp table from Hive, stores it into the test database in MySQL, and then reads the emp table back from MySQL, completing a MySQL read/write round trip.
package com.yqu.sparkhive;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.File;
import java.sql.*;

public class HelloSparkMysqlDriver {

    private static boolean setup() {
        Connection conn = null;
        Statement stmt = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection(
                    "jdbc:mysql://10.211.55.101:3306",
                    "root", "root");
            stmt = conn.createStatement();
            stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS test");
            stmt.executeUpdate("DROP TABLE IF EXISTS test.emp");
            ResultSet rs = stmt.executeQuery("SHOW DATABASES");
            System.out.println("Databases:");
            while (rs.next()) {
                System.out.println(rs.getString("Database"));
            }
            rs.close();
            stmt.close();
            conn.close();
            return true;
        } catch (ClassNotFoundException e) {
            System.out.println("Can not find com.mysql.jdbc.Driver!");
            return false;
        } catch (SQLException se) {
            // Handle errors for JDBC
            se.printStackTrace();
            return false;
        } finally {
            try {
                if (stmt != null) stmt.close();
            } catch (SQLException se) {
                se.printStackTrace();
            }
            try {
                if (conn != null) conn.close();
            } catch (SQLException se) {
                se.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {
        if (setup()) {
            // warehouseLocation points to the default location
            // for managed databases and tables
            String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
            SparkSession spark = SparkSession
                    .builder()
                    .appName("HelloSparkMysqlDriver")
                    .config("spark.sql.warehouse.dir", warehouseLocation)
                    .enableHiveSupport()
                    .getOrCreate();

            Dataset<Row> hiveDF = spark.sql("SELECT * FROM emp");

            // Saving data to a JDBC source
            hiveDF.write()
                    .format("jdbc")
                    .option("url", "jdbc:mysql://10.211.55.101:3306/test")
                    .option("driver", "com.mysql.jdbc.Driver")
                    .option("dbtable", "emp")
                    .option("user", "root")
                    .option("password", "root")
                    .save();

            // Loading data from a JDBC source
            Dataset<Row> jdbcDF = spark.read()
                    .format("jdbc")
                    .option("url", "jdbc:mysql://10.211.55.101:3306/test")
                    .option("driver", "com.mysql.jdbc.Driver")
                    .option("dbtable", "emp")
                    .option("user", "root")
                    .option("password", "root")
                    .load();

            jdbcDF.show();
            spark.close();
        } else {
            System.out.println("MySQL database is not ready!");
        }
    }
}
Build and Test
vagrant@node1:~/HelloSparkHive$ gradle build jar
BUILD SUCCESSFUL in 0s
2 actionable tasks: 2 executed
vagrant@node1:~/HelloSparkHive$ spark-submit --class com.yqu.sparkhive.HelloSparkMysqlDriver --deploy-mode client --master local[2] --jars /usr/local/java/lib/ext/mysql-connector-java-5.1.40.jar /home/vagrant/HelloSparkHive/build/libs/hello-spark-hive-0.1.0.jar
Databases:
information_schema
hive_metastore
performance_schema
10:27:49 INFO DAGScheduler:54 - Job 1 finished: show at HelloSparkMysqlDriver.java:87, took 0.078809 s
+-----+------+---------+----+----------+------+------+------+
|empno| ename|      job| mgr|  hiredate|salary|  comm|deptno|
+-----+------+---------+----+----------+------+------+------+
... 16 rows of the emp table (the cell values were garbled in the original capture) ...
+-----+------+---------+----+----------+------+------+------+
vagrant@node1:~/HelloSparkHive$ spark-shell --jars /usr/local/java/lib/ext/mysql-connector-java-5.1.40.jar
scala> spark.read.
| format("jdbc").
| option("url", "jdbc:mysql://10.211.55.101:3306/test").
| option("driver", "com.mysql.jdbc.Driver").
| option("dbtable", "emp").
| option("user", "root").
| option("password", "root").
| load().show()
+-----+------+---------+----+----------+------+------+------+
|empno| ename|      job| mgr|  hiredate|salary|  comm|deptno|
+-----+------+---------+----+----------+------+------+------+
... 16 rows of the emp table (the cell values were garbled in the original capture) ...
+-----+------+---------+----+----------+------+------+------+
scala> :quit
vagrant@node1:~/HelloSparkHive$
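Writing back to MySQL works the same way from spark-shell. Below is a minimal sketch, assuming the same MySQL instance and credentials as in the session above; the target table name emp_copy is hypothetical:

// Read emp from MySQL with the same options as the session above...
val df = spark.read.
  format("jdbc").
  option("url", "jdbc:mysql://10.211.55.101:3306/test").
  option("driver", "com.mysql.jdbc.Driver").
  option("dbtable", "emp").
  option("user", "root").
  option("password", "root").
  load()

// ...and write it out to a second table ("emp_copy" is a hypothetical name).
df.write.
  format("jdbc").
  option("url", "jdbc:mysql://10.211.55.101:3306/test").
  option("driver", "com.mysql.jdbc.Driver").
  option("dbtable", "emp_copy").
  option("user", "root").
  option("password", "root").
  mode("overwrite").
  save()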
Spark SQL: Querying a Hive Table and Writing the Result to PostgreSQL
import java.sql.DriverManager
import java.util.Properties

import com.zhaopin.tools.{DateUtils, TextUtils}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

/**
  * Created by xiaoyan on .
  */
object IhrDownloadPg {

  def main(args: Array[String]) {
    // set the Spark log level
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    System.setProperty("HADOOP_USER_NAME", "hive")

    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("hive -> ihr_oper_download")
      .config("spark.sql.warehouse.dir", "spark-warehouse")
      .config("hive.metastore.uris", "thrift://master:9083")
      .enableHiveSupport()
      .getOrCreate()
    import spark.sql

    val dt = if (!args.isEmpty) args(0) else ""
    val yesterday = DateUtils.dateAdd(dt, -1)

    val url = "jdbc:postgresql://192.168.9.222:5432/safe_base"
    Class.forName("org.postgresql.Driver")
    val conn = DriverManager.getConnection(url, "secu_man", "secu_man")
    val stmt = conn.createStatement()
    // clear out yesterday's rows before reloading them
    stmt.execute("delete from ihr_oper_download where dt = '" + yesterday + "'")

    val re1 = sql("select oper_date, " +
      " acct_id, " +
      " acct_name, " +
      " module_name, " +
      " oper_desc " +
      " from safe.fact_ihr_oper_download t " +
      " where t.dt > '' and t.dt <= '" + yesterday + "'")

    val connectionProperties = new Properties()
    // database user name (user), password (password), and the postgresql driver (driver)
    connectionProperties.put("user", "secu_man")
    connectionProperties.put("password", "secu_man")
    connectionProperties.put("driver", "org.postgresql.Driver")

    re1.toDF().write.mode("append").jdbc(url, "ihr_oper_download", connectionProperties)
    System.err.print("ihr_oper_download insert complete!! ")
  }
}
Note: if the PostgreSQL table does not exist, a table is created automatically by default, with every column typed as text. (Source: https://www.cnblogs.com/qxyy/p/9073148.html)
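Starting with Spark 2.2, the JDBC writer also accepts a createTableColumnTypes option to control the column types of an auto-created table. A minimal sketch against the same DataFrame and connection as above; the column definitions are assumptions for illustration, not the job's real schema:

re1.toDF().write
  .mode("append")
  // Assumed column types for illustration; they only take effect when Spark creates the table.
  .option("createTableColumnTypes", "oper_date VARCHAR(32), acct_id VARCHAR(64), module_name VARCHAR(128)")
  .jdbc(url, "ihr_oper_download", connectionProperties)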
[Spark] Reading and Writing Hive 2.3.3 with Spark 2.3.0
Test Environment Setup
Installing Spark
Being lazy, I simply used a Vagrant-built virtual machine environment with Hadoop 2.7.6 + Hive 2.3.3 + Spark 2.3.0.
Loading the emp Table into Hive
hive> create table emp (empno int, ename string, job string, mgr int, hiredate string, salary double, comm double, deptno int) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' ;
hive> LOAD DATA LOCAL INPATH '/usr/local/hive/examples/files/emp2.txt' OVERWRITE INTO TABLE emp;
Installing Gradle
Install Gradle manually:
vagrant@node1:~$ export GRADLE_HOME=/opt/gradle/gradle-4.8.1
vagrant@node1:~$ export PATH=$PATH:$GRADLE_HOME/bin
vagrant@node1:~$ gradle -v
Welcome to Gradle 4.8.1!
Here are the highlights of this release:
- Dependency locking
- Maven Publish and Ivy Publish plugins improved and marked stable
- Incremental annotation processing enhancements
- APIs to configure tasks at creation time
For more details see https://docs.gradle.org/4.8.1/release-notes.html
------------------------------------------------------------
Gradle 4.8.1
------------------------------------------------------------
Build time: 07:53:06 UTC
Revision:   0abdeadf42e7750ccba34d69b516a22
Ant:        Apache Ant(TM) version 1.9.11 compiled on March 23 2018
JVM:        1.8.0_171 (Oracle Corporation 25.171-b11)
OS:         Linux 4.4.0-128-generic amd64
vagrant@node1:~/HelloSparkHive$ tree
├── build.gradle
└── src
    └── main
        └── java
            └── com
                └── yqu
                    └── sparkhive
                        └── HelloSparkHiveDriver.java
6 directories, 2 files
build.gradle
apply plugin: 'java-library'
apply plugin: 'eclipse'
apply plugin: 'idea'

repositories {
    mavenCentral()  // assumed; use any repository that hosts the Spark artifacts
}

sourceCompatibility = 1.8
targetCompatibility = 1.8

dependencies {
    compileOnly 'org.apache.spark:spark-core_2.11:2.3.0'
    compileOnly 'org.apache.spark:spark-sql_2.11:2.3.0'
    testImplementation 'org.apache.spark:spark-core_2.11:2.3.0', 'junit:junit:4.12'
}

jar {
    baseName = 'hello-spark-hive'
    version = '0.1.0'
}
src/main/java/com/yqu/sparkhive/HelloSparkHiveDriver.java
This example simply loads the emp table into a Dataset, creates a temporary view from it, and then creates a new table from that view, completing a Hive read/write round trip.
package com.yqu.sparkhive;

import java.io.File;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class HelloSparkHiveDriver {

    public static void main(String[] args) {
        if (args.length == 0) {
            System.out.println("Please provide target hive table name!");
            return;
        }
        // warehouseLocation points to the default location
        // for managed databases and tables
        String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
        SparkSession spark = SparkSession
                .builder()
                .appName("HelloSparkHiveDriver")
                .config("spark.sql.warehouse.dir", warehouseLocation)
                .enableHiveSupport()
                .getOrCreate();

        // Queries are expressed in HiveQL
        Dataset<Row> sqlDF = spark.sql("SELECT * FROM emp");
        System.out.println("emp content:");
        sqlDF.show();

        sqlDF.createOrReplaceTempView("yquTempTable");
        spark.sql("create table " + args[0] + " as select * from yquTempTable");
        System.out.println(args[0] + " content:");
        spark.sql("SELECT * FROM " + args[0]).show();
        spark.close();
    }
}
Build and Test
vagrant@node1:~/HelloSparkHive$ gradle build jar
BUILD SUCCESSFUL in 0s
2 actionable tasks: 2 executed
vagrant@node1:~/HelloSparkHive$ spark-submit --class com.yqu.sparkhive.HelloSparkHiveDriver --deploy-mode client --master local[2] /home/vagrant/HelloSparkHive/build/libs/hello-spark-hive-0.1.0.jar yqu1
vagrant@node1:~/HelloSparkHive$ hive
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/apache-hive-2.3.3-bin/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/apache-tez-0.9.1-bin/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/hadoop-2.7.6/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Connecting to jdbc:hive2://
18/07/10 07:55:18 [main]: WARN session.SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
Connected to: Apache Hive (version 2.3.3)
Driver: Hive JDBC (version 2.3.3)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 2.3.3 by Apache Hive
No rows affected (0.931 seconds)
2 rows selected (0.229 seconds)
hive> select * from yqu1;
18/07/02 07:55:37 [XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX main]: ERROR hdfs.KeyProviderCache: Could not find uri with key [dfs.encryption.key.provider.uri] to create a keyProvider !!
7369 SMITH CLERK -17
7499 ALLEN SALESMAN -20
7521 WARD SALESMAN -22
7566 JONES MANAGER -02
7654 MARTIN SALESMAN -28
7698 BLAKE MANAGER -01
7782 CLARK MANAGER -09
7788 SCOTT ANALYST -09
7839 KING PRESIDENT
7844 TURNER SALESMAN -08
7876 ADAMS CLERK -12
7900 JAMES CLERK -03
7902 FORD ANALYST -03
7934 MILLER CLERK -23
7988 KATY ANALYST 7566 NULL
7987 JULIA ANALYST 7566 NULL
16 rows selected (1.546 seconds)
Spark SQL: Accessing Hive Data (Building Spark with Hive and Thrift Server Support)
Spark SQL can access Hive data through the Thrift Server, but the default Spark binaries are built without Hive support: Hive pulls in many dependencies, so the prebuilt package does not include Hive or the Thrift Server. To access Hive you have to download the source and build it yourself, packaging Hive and the Thrift Server in. The detailed configuration steps are as follows:
1. Download the source code.
2. Download and configure Maven.
This configuration is simple, so it is skipped here.
3. Package with Maven.
Build command:
mvn -Pyarn -Dhadoop.version=2.3.0-cdh5.0.0 -Phive -Phive-thriftserver -DskipTests clean package
You can set hadoop.version above to whatever version you need.
Note: when building on Windows, the build needs a lot of memory, so you have to increase the memory available to Maven.
Go to the MAVEN_HOME/bin/ directory, edit mvn.bat, and locate the following line (it is commented out):
@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
Then add the following on the next line and save the file:
set MAVEN_OPTS= -Xms1024m -Xmx1024m -XX:MaxPermSize=512m
The build may fail because of network interruptions; retry a few times and it will succeed.
4. Replace the spark-assembly*.jar file under SPARK_HOME/lib with SPARK_SRC_HOME\assembly\target\scala-2.10\spark-assembly-1.4.0-hadoop2.3.0-cdh5.0.0.jar
SPARK_SRC_HOME: the Spark source directory
SPARK_HOME: the Spark installation directory
5. cp HIVE_HOME/conf/hive-site.xml SPARK_HOME/conf/
Copy hive-site.xml from the Hive installation directory into Spark's conf directory.
6. Go to SPARK_HOME/bin/ and run ./spark-sql --master spark://masterIp:7077
7. Query Hive table data:
spark-sql> select * from test2;
15/07/16 14:07:13 INFO ParseDriver: Parsing command: select * from test2
15/07/16 14:07:13 INFO ParseDriver: Parse Completed
15/07/16 14:07:20 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.209.140:51413 in memory (size: 1671.0 B, free: 267.3 MB)
15/07/16 14:07:20 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.209.141:35827 in memory (size: 1671.0 B, free: 267.3 MB)
15/07/16 14:07:21 WARN HiveConf: DEPRECATED: Configuration property hive.metastore.local no longer has any effect. Make sure to provide a valid value for hive.metastore.uris if you are connecting to a remote metastore.
15/07/16 14:07:21 INFO deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
15/07/16 14:07:22 INFO MemoryStore: ensureFreeSpace(377216) called with curMem=0, maxMem=
15/07/16 14:07:22 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 368.4 KB, free 266.9 MB)
15/07/16 14:07:22 INFO MemoryStore: ensureFreeSpace(32203) called with curMem=377216, maxMem=
15/07/16 14:07:22 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 31.4 KB, free 266.9 MB)
15/07/16 14:07:22 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.209.141:35827 (size: 31.4 KB, free: 267.2 MB)
15/07/16 14:07:22 INFO SparkContext: Created broadcast 1 from processCmd at CliDriver.java:423
15/07/16 14:07:23 INFO FileInputFormat: Total input paths to process : 1
15/07/16 14:07:23 INFO SparkContext: Starting job: processCmd at CliDriver.java:423
15/07/16 14:07:23 INFO DAGScheduler: Got job 1 (processCmd at CliDriver.java:423) with 2 output partitions (allowLocal=false)
15/07/16 14:07:23 INFO DAGScheduler: Final stage: ResultStage 1(processCmd at CliDriver.java:423)
15/07/16 14:07:23 INFO DAGScheduler: Parents of final stage: List()
15/07/16 14:07:23 INFO DAGScheduler: Missing parents: List()
15/07/16 14:07:23 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[5] at processCmd at CliDriver.java:423), which has no missing parents
15/07/16 14:07:23 INFO MemoryStore: ensureFreeSpace(7752) called with curMem=409419, maxMem=
15/07/16 14:07:23 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 7.6 KB, free 266.9 MB)
15/07/16 14:07:23 INFO MemoryStore: ensureFreeSpace(4197) called with curMem=417171, maxMem=
15/07/16 14:07:23 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 4.1 KB, free 266.9 MB)
15/07/16 14:07:23 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.209.141:35827 (size: 4.1 KB, free: 267.2 MB)
15/07/16 14:07:23 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:874
15/07/16 14:07:23 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at processCmd at CliDriver.java:423)
15/07/16 14:07:23 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
15/07/16 14:07:24 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, 192.168.209.139, ANY, 1435 bytes)
15/07/16 14:07:24 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 2, 192.168.209.140, ANY, 1435 bytes)
15/07/16 14:07:24 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.209.140:51413 (size: 4.1 KB, free: 267.3 MB)
15/07/16 14:07:24 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.209.140:51413 (size: 31.4 KB, free: 267.2 MB)
15/07/16 14:07:25 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.209.139:44770 (size: 4.1 KB, free: 267.3 MB)
15/07/16 14:07:30 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.209.139:44770 (size: 31.4 KB, free: 267.2 MB)
15/07/16 14:07:34 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 2) in 10767 ms on 192.168.209.140 (1/2)
15/07/16 14:07:39 INFO DAGScheduler: ResultStage 1 (processCmd at CliDriver.java:423) finished in 15.284 s
15/07/16 14:07:39 INFO StatsReportListener: Finished stage: org.apache.spark.scheduler.StageInfo@9aa2a1b
15/07/16 14:07:39 INFO DAGScheduler: Job 1 finished: processCmd at CliDriver.java:423, took 15.414215 s
15/07/16 14:07:39 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 15287 ms on 192.168.209.139 (2/2)
15/07/16 14:07:39 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/07/16 14:07:39 INFO StatsReportListener: task runtime:(count: 2, mean: , stdev: , max: , min: )
15/07/16 14:07:39 INFO StatsReportListener:
15/07/16 14:07:39 INFO StatsReportListener:
15/07/16 14:07:39 INFO StatsReportListener: task result size:(count: 2, mean: , stdev: 15.000000, max: , min: )
15/07/16 14:07:39 INFO StatsReportListener:
15/07/16 14:07:39 INFO StatsReportListener:
15.3 KB 15.3 KB 15.3 KB 15.3 KB 15.3 KB 15.3 KB 15.3 KB
15.3 KB 15.3 KB
15/07/16 14:07:39 INFO StatsReportListener: executor (non-fetch) time pct: (count: 2, mean: 72.195332, stdev: 18.424154, max: 90.619485, min: 53.771178)
15/07/16 14:07:39 INFO StatsReportListener:
15/07/16 14:07:39 INFO StatsReportListener:
15/07/16 14:07:39 INFO StatsReportListener: other time pct: (count: 2, mean: 27.804668, stdev: 18.424154, max: 46.228822, min: 9.380515)
15/07/16 14:07:39 INFO StatsReportListener:
15/07/16 14:07:39 INFO StatsReportListener:
............
Time taken: 26.523 seconds, Fetched 889 row(s)
15/07/16 14:07:39 INFO CliDriver: Time taken: 26.523 seconds, Fetched 889 row(s)
spark-sql>
Since I am running this in virtual machines on my own laptop, with only two worker nodes, it runs a bit slowly, but the functionality is not affected.
Some readers say the Thrift Server must be started first; in my testing there is no need to start it manually, as it starts automatically when ./spark-sql is run.
Likewise, hive-site.xml does not need any configuration changes.
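If other programs need to query Hive data through Spark, the Thrift Server can be started separately with sbin/start-thriftserver.sh and reached over the Hive JDBC driver. A minimal sketch in Scala; the host, the default port 10000, and the table name are assumptions, and the hive-jdbc jar must be on the classpath:

import java.sql.DriverManager

// Register the Hive JDBC driver (requires hive-jdbc and its dependencies on the classpath).
Class.forName("org.apache.hive.jdbc.HiveDriver")
// Host and port are assumptions; 10000 is the Thrift Server's default port.
val conn = DriverManager.getConnection("jdbc:hive2://masterIp:10000", "", "")
val stmt = conn.createStatement()
val rs = stmt.executeQuery("select * from test2 limit 10")
while (rs.next()) {
  println(rs.getString(1))
}
rs.close()
stmt.close()
conn.close()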
Operating on a Hive Database with Spark SQL in Scala
This example uses Scala to operate on a Hive database through Spark SQL.
Hadoop cluster setup:
Spark cluster setup:
Data Preparation
Create people.txt under /usr/local/sparkApps/SparkSQL2Hive/resources/ with the content below; name and age are separated by a tab ("\t"):
Michael    20
Andy    17
Justin    19
Create peopleScores.txt with the content below; name and score are separated by a tab ("\t"):
Michael    98
Andy    95
Justin    68
Code Implementation
package com.imf.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

/**
 * Operate on a Hive data source through Spark SQL.
 */
object SparkSQL2Hive {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("SparkSQL2Hive for scala")
    conf.setMaster("spark://master1:7077")
    val sc = new SparkContext(conf)
    val hiveContext = new HiveContext(sc)

    // user ages
    hiveContext.sql("use testdb")
    hiveContext.sql("DROP TABLE IF EXISTS people")
    hiveContext.sql("CREATE TABLE IF NOT EXISTS people(name STRING, age INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' LINES TERMINATED BY '\\n'")
    // Load local data into Hive (the data is physically copied); data already in HDFS can also be used directly
    hiveContext.sql("LOAD DATA LOCAL INPATH '/usr/local/sparkApps/SparkSQL2Hive/resources/people.txt' INTO TABLE people")

    // user scores
    hiveContext.sql("use testdb")
    hiveContext.sql("DROP TABLE IF EXISTS peopleScores")
    hiveContext.sql("CREATE TABLE IF NOT EXISTS peopleScores(name STRING, score INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' LINES TERMINATED BY '\\n'")
    hiveContext.sql("LOAD DATA LOCAL INPATH '/usr/local/sparkApps/SparkSQL2Hive/resources/peopleScore.txt' INTO TABLE peopleScores")

    /*
     * Use HiveContext to join the two Hive tables directly.
     */
    val resultDF = hiveContext.sql("select pi.name, pi.age, ps.score "
      + " from people pi join peopleScores ps on pi.name = ps.name"
      + " where ps.score > 90")

    /*
     * saveAsTable creates a Hive managed table: the metadata and the location of the data
     * are managed by the Hive warehouse, so dropping the table also deletes the data
     * (it no longer exists on disk).
     */
    hiveContext.sql("drop table if exists peopleResult")
    resultDF.saveAsTable("peopleResult")

    /*
     * HiveContext's table method reads a Hive warehouse table directly into a DataFrame,
     * which can then feed machine learning, graph computation, and other complex ETL.
     */
    val dataframeHive = hiveContext.table("peopleResult")
    dataframeHive.show()
  }
}
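As the comment in the code notes, data that already lives in HDFS can be loaded without the LOCAL keyword. A minimal sketch, with a hypothetical HDFS path:

// LOAD DATA without LOCAL moves the file from HDFS into the table's warehouse
// directory instead of copying it from the local filesystem. The path is hypothetical.
hiveContext.sql("LOAD DATA INPATH '/data/peopleScore.txt' INTO TABLE peopleScores")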
Scheduling Script
Package the program above as SparkSQL2Hive.jar, copy it to /usr/local/sparkApps/SparkSQL2Hive/, and create the scheduling script run.sh with the following content:
/usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit \
--class com.imf.spark.sql.SparkSQL2Hive \
--files /usr/local/hive/apache-hive-1.2.1-bin/conf/hive-site.xml \
--master spark://master1:7077 \
/usr/local/sparkApps/SparkSQL2Hive/SparkSQL2Hive.jar
# If the MySQL driver jar has already been placed in Spark's lib directory, there is no need to add it with the line below
#--driver-class-path /usr/local/hive/apache-hive-1.2.1-bin/lib/mysql-connector-java-5.1.35-bin.jar \
See the attached run.log for the detailed execution log.
Check the table contents and the results with the hive CLI:
root@master1:/usr/local/tools# hive
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/hadoop/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/spark/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/hadoop/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/spark/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Logging initialized using configuration in jar:file:/usr/local/hive/apache-hive-1.2.1-bin/lib/hive-common-1.2.1.jar!/hive-log4j.properties
Time taken: 1.013 seconds, Fetched: 2 row(s)
Time taken: 0.103 seconds
peopleresult
peoplescores
tmp_pre_hour_seach_info
Time taken: 0.082 seconds, Fetched: 9 row(s)
hive> select *
Time taken: 1.252 seconds, Fetched: 3 row(s)
hive> select *
Time taken: 0.142 seconds, Fetched: 3 row(s)
hive> select *
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Time taken: 0.298 seconds, Fetched: 2 row(s)
With that, operating on a Hive database through Spark SQL has succeeded!