Spark提供了thrift server,可以提供HIVE2的JDBC连接。

启动方式:

./sbin/start-thriftserver.sh --master spark://spark1:7077

启动后,可以使用JDBC连接。

insert

我想提供一个传统用户比较友好的SQL操作,使用insert命令插入数据。

beenline方式

下面用beenline简单测试下:

./bin/beenline
beeline> !connect jdbc:hive2://spark1:10000
0: jdbc:hive2://spark1:10000> create database xxx;
0: jdbc:hive2://spark1:10000> use xxx;
0: jdbc:hive2://spark1:10000> create table t1(c1 int , c2 int);
0: jdbc:hive2://spark1:10000> select * from t1;
+-----+-----+--+
| c1  | c2  |
+-----+-----+--+
+-----+-----+--+
No rows selected (0.147 seconds)
0: jdbc:hive2://spark1:10000> insert into t1 select t.* from (select 1,1) t;
0: jdbc:hive2://spark1:10000> select * from t1;
+-----+-----+--+
| c1  | c2  |
+-----+-----+--+
| 1   | 1   |
+-----+-----+--+
1 row selected (1.287 seconds)

很神奇的insert语句。

JDBC

使用HIVE2的JDBC Driver,可以在任意机器上通过thrift server写到SPARK上。具体可以参考hive官方示例。这个例子使用的是load data的方式,需要改成insert方式,下面只贴出来这部分代码:

        // insert
        sql = "insert into " + tableName + " select t.* from (select 1,'aaa') t";
        System.out.println("Running: " + sql);
        res = stmt.executeQuery(sql);
        while (res.next()) {
            System.out.println(String.valueOf(res.getInt(1)) + "\t" + res.getString(2));
        }

但是这种方式,非常不适合有大量数据写入的场景。我在测试的时候,发现基本上每个写入都需要耗时250-300ms,速度完全无法接受。这是可以预期的,对于spark来说,每个insert都是一个job,时间必然拉长。

load data

比较好的办法是使用load data,即将数据先写入到文件,再将文件上传到HDFS(或者thrift server的机器)上,然后发起load data,一个job即可将数据全部导入。我试过导入一个2000万的窄表(总共500MB),剔除上传到HDFS的时间(跟网络相关),只需要2.5s。

1、创建表。 我的数据是以逗号分隔多个field,注意创建表时要指定使用逗号区分不同列。

String createsql = "create table " + tableName + " (id int, c2 int, rating float, c4 int) " +
                " ROW FORMAT DELIMITED FIELDS TERMINATED BY ','";
stmt.execute(createsql);

2、数据上传到HDFS

我在写入hdfs的权限上遇到了些麻烦。HDFS要求使用集群上的用户来登录,但代码中如果不指定用户,默认会使用执行hiveclinet的用户名来登录HDFS,但由于这是两套系统,账户不同,所以写入HDFS会失败,提示“Permition denied”。一般有3种解决办法:

  • 切换系统当前登录的用户,改为跟hiveserver的用户一样
  • 设置系统变量或者java变量(我不会)
  • 设置HDFS对应目录权限为777。

其实hdfs还提供了另外一种方式,可以在代码里打开FS时设置用户名,代码如下。我这里已经有现成的表了,所以直接copy即可。

其他HDFS的API可以参考这里.

        // load data into table
        Configuration conf = new Configuration();
        conf.set("fs.hdfs.impl",
                org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()
        );
        String filepath = "/upload/my.csv";
        try {
            FileSystem fs = FileSystem.get(new URI("hdfs://spark1:8020"), conf, "ieevee");
            if (fs == null) {
                System.out.println("get failed");
            }
            Path src = new Path("/Users/hubaotao/ratings.csv");
            Path dst = new Path(filepath);
            fs.copyFromLocalFile(src, dst);
            fs.close();
        }
        catch (Exception e) {
            e.printStackTrace();
        }

3、开始导入

由于我是从HDFS上导入数据,所以命令是“load data inpath”。如果你在hiveserver的机器上执行(比如在beeline上),命令相应的是“load data local inpath”。导入非常快,很快就可以查询了,而且这种方式比SQL INSERT进去的数据,查询要快很多。

        sql = "load data inpath '" + filepath + "' into table " + tableName;
        stmt.execute(sql);