Catalog
Catalog提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。Catalog 提供了一个统一的API,用于管理元数据,并使其可以从Table API和SQL查询语句中来访问。
Catalog允许用户引用其数据存储系统中现有的元数据,并自动将其映射到Flink的相应元数据。例如,Flink可以直接使用Hive MetaStore中的表的元数据,不必在Flink中手动重写ddl,也可以将Flink SQL中的元数据存储到 Hive MetaStore中。Catalog极大地简化了用户开始使用Flink的步骤,并极大地提升了用户体验。
1. Catalog类型
目前Flink包含了以下四种Catalog:
- GenericInMemoryCatalog:基于内存实现的Catalog,所有元数据只在session的生命周期(即一个Flink任务一次运行生命周期内)内可用。默认自动创建,会有名为"default_catalog"的内存Catalog,这个Catalog默认只有一个名为"default_database"的数据库。
- JdbcCatalog:JdbcCatalog使得用户可以将Flink通过JDBC协议连接到关系数据库。Postgres Catalog和MySQL Catalog是目前仅有的两种JDBC Catalog实现,将元数据存储在数据库中。
- HiveCatalog:有两个用途,一是单纯作为Flink元数据的持久化存储,二是作为读写现有Hive元数据的接口。注意:Hive MetaStore以小写形式存储所有元数据对象名称。Hive Metastore以小写形式存储所有元对象名称,而 GenericInMemoryCatalog会区分大小写。
- 用户自定义Catalog:用户可以实现Catalog接口实现自定义Catalog。从Flink1.16开始引入了用户类加载器,通过CatalogFactory.Context#getClassLoader访问,否则会报错ClassNotFoundException。
2. JdbcCatalog(MySQL)
JdbcCatalog不支持建表,只是打通flink与mysql的连接,可以去读写mysql现有的库表。
2.1 上传所需jar包到lib下
sh
[jack@hadoop102 software]$ ll
总用量 1563556
-rw-rw-r--. 1 jack jack 482499797 2月 9 21:05 flink-1.17.2-bin-scala_2.12.tgz
-rw-rw-r--. 1 jack jack 387522 2月 24 20:50 flink-connector-jdbc-3.2.0-1.19.jar
-rw-rw-r--. 1 jack jack 51457817 2月 20 15:26 flink-sql-connector-hive-3.1.3_2.12-1.17.2.jar
cp flink-connector-jdbc-3.2.0-1.19.jar ../module/flink-1.17.2/lib/
2.2 重启flink集群和sql-client
2.3 创建Catalog
sql
CREATE CATALOG my_jdbc_catalog WITH(
'type' = 'jdbc',
-- 连接到的默认数据库
'default-database' = 'test',
'username' = 'root',
'password' = '000000',
'base-url' = 'jdbc:mysql://hadoop104:3306'
);
2.4 查看Catalog
sql
SHOW CATALOGS;
--查看当前的CATALOG
SHOW CURRENT CATALOG;
2.5 使用指定Catalog
sql
USE CATALOG my_jdbc_catalog;
--查看当前的CATALOG
SHOW CURRENT CATALOG;
3. HiveCatalog
3.1 上传所需jar包到lib下
sh
cp flink-sql-connector-hive-3.1.3_2.12-1.17.2.jar /opt/module/flink-1.17.2/lib/
cp mysql-connector-j-8.0.31.jar /opt/module/flink-1.17.2/lib/
3.2 重启flink集群和sql-client
3.3 启动外置的hive metastore服务
3.4 创建Catalog
sql
CREATE CATALOG myhive WITH (
'type' = 'hive',
'hive-version'='3.1.3',
'default-database' = 'default',
'hive-conf-dir' = 'hdfs://hadoop102:8020/conf/hive-3.1.3'
);
配置项 | 必需 | 默认值 | 类型 | 说明 |
---|---|---|---|---|
type | Yes | (none) | String | Catalog类型,创建HiveCatalog时必须设置为'hive'。 |
name | Yes | (none) | String | Catalog的唯一名称 |
hive-conf-dir | No | (none) | String | 包含hive -site.xml的目录,需要Hadoop文件系统支持。 如果没指定hdfs协议,则认为是本地文件系统。 如果不指定该选项,则在类路径中搜索hive-site.xml。 |
default-database | No | default | String | Hive Catalog使用的默认数据库 |
hive-version | No | (none) | String | HiveCatalog能够自动检测正在使用的Hive版本。 建议不要指定Hive版本,除非自动检测失败。 |
hadoop-conf-dir | No | (none) | String | Hadoop conf目录的路径。只支持本地文件系统路径。 设置Hadoop conf的推荐方法是通过HADOOP_CONF_DIR环境变量。 只有当环境变量不适合你时才使用该选项, 例如如果你想分别配置每个HiveCatalog。 |
3.5 查看Catalog
sql
SHOW CATALOGS;
--查看当前的CATALOG
SHOW CURRENT CATALOG;
3.6 使用指定Catalog
sql
USE CATALOG myhive;
--查看当前的CATALOG
SHOW CURRENT CATALOG;
运行结果:
3.7 在hiveCatalog中建表
sql
CREATE TABLE t3( id int, ts bigint , vc int )
WITH (
'connector'='filesystem',
'path'='hdfs://hadoop102:8020/warehouse/t3',
'format'='csv'
);
t3是之前我们在flink中建的表,里面是有数据的: