Flink 1.9.0 with Hive

Apache Flink added Hive integration starting with version 1.9.0: through Flink, users can access Hive's metadata and read and write Hive tables.

New Features

Metadata

The ExternalCatalog interface that Flink provided previously was so incomplete as to be essentially unusable. Flink 1.9.0 introduces a brand-new Catalog interface to replace ExternalCatalog. The new Catalog supports multiple kinds of metadata objects, such as databases, tables, and partitions.
A single user session can maintain several Catalog instances at once and thereby access several external systems. Catalogs also plug into Flink in a pluggable fashion, so users can supply their own implementations.
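The following sketch is not from the original post; it assumes the Flink 1.9 Table API with the blink planner, and it registers Flink's built-in GenericInMemoryCatalog alongside the HiveCatalog that the rest of this post configures, to show one session holding multiple catalogs:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class MultiCatalogSketch {
    public static void main(String[] args) {
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // One session, two Catalog instances side by side.
        tableEnv.registerCatalog("my_memory_catalog",
                new GenericInMemoryCatalog("my_memory_catalog"));
        tableEnv.registerCatalog("myhive_catalog",
                new HiveCatalog("myhive_catalog", "default", "/usr/local/hive/conf/", "2.3.4"));

        // Switching catalogs switches which external system is addressed.
        tableEnv.useCatalog("myhive_catalog");
    }
}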

Table Data

A new flink-connector-hive module has been added.

  • Install flink-1.9.0

  • Install hive-2.3.4

  • Add the following dependency jars under /usr/local/flink-1.9.0/lib:

antlr-runtime-3.5.2.jar
antlr4-runtime-4.5.jar
datanucleus-api-jdo-4.2.4.jar
datanucleus-core-4.1.17.jar
datanucleus-rdbms-4.1.19.jar
flink-connector-hive_2.11-1.9.0.jar
flink-hadoop-compatibility_2.11-1.9.0.jar
flink-shaded-hadoop-2-uber-2.7.5-7.0.jar
hive-exec-2.3.4.jar
javax.jdo-3.2.0-m3.jar
  • Edit the SQL Client configuration file /usr/local/flink-1.9.0/conf/sql-client-defaults.yaml:
# catalogs: [] # empty list
catalogs:
  # A typical catalog definition looks like:
  - name: myhive_catalog
    type: hive
    #property-version: 2
    hive-conf-dir: /usr/local/hive/conf/
    #default-database: default
    hive-version: 2.3.4
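Two things worth noting about this file (based on the Hive connector's documented behavior, not spelled out in the original post): hive-conf-dir must point at the directory containing hive-site.xml so Flink can reach the metastore, and hive-version should match the installed Hive release so the connector loads the matching shim.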


  • Start the Flink cluster:
./start-cluster.sh
  • Start the SQL Client:
./sql-client.sh embedded


  • List all catalogs:
Flink SQL> show catalogs;
......
default_catalog
myhive_catalog
  • Switch the catalog, database, and so on:
Flink SQL> use catalog myhive_catalog;
......

Flink SQL> show databases;
......
default
test

Flink SQL> use test;
......

Flink SQL> show tables;
......
employee
emplyee
student

Describe the table schema:

Flink SQL> describe employee;
......
2019-09-06 22:04:15,935 INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=miaowenting ip=unknown-ip-addr cmd=get_table : db=test tbl=employee
root
|-- id: INT
|-- name: STRING

Query the table data:

Flink SQL> select * from employee;


Inserting data into the table fails (unresolved at the time of writing; see the note after the log output below):

Flink SQL> insert into employee(id,name) values (4,'test');
[INFO] Submitting SQL update statement to the cluster...
2019-09-06 21:47:29,944 INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 0: get_database: default
2019-09-06 21:47:29,945 INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=miaowenting ip=unknown-ip-addr cmd=get_database: default
2019-09-06 21:47:29,948 INFO org.apache.hadoop.hive.metastore.HiveMetaStore - 0: get_database: test
2019-09-06 21:47:29,948 INFO org.apache.hadoop.hive.metastore.HiveMetaStore.audit - ugi=miaowenting ip=unknown-ip-addr cmd=get_database: test
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Partial inserts are not supported
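The ValidationException hints at the cause: Flink 1.9 rejects INSERT statements that name an explicit column subset, treating them as "partial inserts". A possible workaround, sketched below but not verified in the original post, is a full-row insert with no column list, issued through the Table API on the blink batch planner (the setup mirrors the Table API section that follows):

// Hypothetical workaround: supply every column so the statement is no longer
// a "partial insert". Whether the write then succeeds against Hive in 1.9
// is not confirmed by the original post.
EnvironmentSettings settings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
tableEnv.registerCatalog("myhive_catalog",
        new HiveCatalog("myhive_catalog", "default", "/usr/local/hive/conf/", "2.3.4"));
tableEnv.useCatalog("myhive_catalog");
tableEnv.useDatabase("test");

tableEnv.sqlUpdate("insert into employee values (4, 'test')"); // all columns listed
tableEnv.execute("insert into employee");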

Using Hive from the Table API

Creating a HiveCatalog from the Table API

// Catalog coordinates: name, default database, Hive conf dir, Hive version.
String name = "myhive_catalog";
String defaultDatabase = "default";
String hiveConfDir = "/usr/local/hive/conf/";
String version = "2.3.4";

// The Hive connector is intended for the blink planner; batch mode for writes.
EnvironmentSettings settings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tableEnv.registerCatalog(name, hiveCatalog);
tableEnv.useCatalog(name);
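As a quick sanity check (hypothetical, not part of the original post), the registered catalog can be queried directly through the Catalog API:

// Hive's metadata is now visible through the catalog registered above.
System.out.println(tableEnv.getCurrentCatalog()); // myhive_catalog
System.out.println(hiveCatalog.listDatabases());  // e.g. [default, test]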

Reading and Writing Hive Tables from the Table API

// Reuse the environment, catalog, and names from the previous snippet.
TableEnvironment tableEnv = TableEnvironment.create(settings);
tableEnv.registerCatalog(name, hiveCatalog);
tableEnv.useCatalog(name);

// Read: sqlQuery only defines a logical plan over the Hive table.
Table source = tableEnv.sqlQuery("select * from source");

// Write: sqlUpdate buffers the INSERT; execute() submits it as a job.
tableEnv.sqlUpdate("insert into source values ('newKey', 'newValue')");
tableEnv.execute("insert into source");
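Note that nothing reaches Hive until execute() runs: sqlQuery builds a plan and sqlUpdate only buffers the statement. The table name source above is just a placeholder. Also, the Flink 1.9 Hive documentation recommends the blink planner for Hive reads and writes, which is why the snippets here build their EnvironmentSettings with useBlinkPlanner().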