Impala的核心组件是Impalad,提供查询服务,catalogd缓存和获取元数据,statestored则负责把元数据更新到每个impalad节点上。
也就是说Impala集群会缓存全部的元数据,这就导致通过Hive更新元数据或者数据对于Impala是无感知的,例如通过hive建表或者拷贝新的数据到HDFS上等等。
因此Impala提供了两种机制来实现元数据的更新,分别是INVALIDATE METADATA和REFRESH。
INVALIDATE METADATA
用法:
invalidate metadata; -- invalidate所有表的元数据 invalidate metadata [table]; -- invalidate table的元数据
执行 invalidate metadata table
语句后,查询提交到impalad,大致经过以下流程:
1.获取table,向catalogd发起resetMetadata请求;
2.catalogd收到该请求,执行invalidateTable操作,清除table相关的元数据缓存,重新读取Metastore中的所有元数据,并生成新的缓存。新生成的缓存只包含表名+库名的信息,这时会为新生成的表对象生成一个新的catalog版本号,将这部分信息返回给impalad,然后异步执行元数据和数据的加载。
3.impalad收到catalogd的返回值,返回值是更新之后的表缓存对象+版本号,但是这是一个不完整的表元数据,impalad将这个元数据应用到本地元数据缓存。
REFRESH
用法:
refresh [table]; -- 刷新表table的元数据 refresh [table] partition [partition]; -- 刷新表table的partition分区元数据
在客户端上执行 refresh table
语句后,impalad会发生如下的动作:
1.impalad获取到表table,对catalogd发起resetMetadata请求;
2.catalogd收到该请求:对指定了partition的请求,执行reloadPartition操作,获取该分区最新元数据并刷新;对未指定partition的请求,执行reloadTable操作,获取全部分区最新的元数据并刷新。这里的"刷新"是指Metastore中与缓存进行对比,如果没有变化,就不做任何动作;如果有增删改,才会发生改变;
3.impalad收到catalogd返回的完整缓存,用它来更新本地缓存。
因此可以得到以下结论:
- 增删表或改变表结构,如
create table
、drop table
、alter table add column
等,需要使用invalidate metadata[table]
语句。 - 如果某表加入了新数据,或者有分区的改动,如
load data
、alter table add partition
等,需要使用refresh[table](partition[partition])
语句。 - invalidate metadata会使得impalad之间查询不一致更严重,因此建议禁止使用不带表名的invalidate metadata语句。
- 如果涉及到非常大批量的元数据更改,建议直接重启catalogd和statestored
SYNC_DDL
在以前的CDH版本中impala需要手动刷新元数据,从CDP7.1.1开始,我们可以看到impala打开了自动metadata同步
但是invalidate和refresh针对的是与impala对接的系统更新元数据,impala无法感知的问题,除此之外,impala自身执行DDL时也可能遇到数据不同步的问题。
在某个节点的impala-shell运行DDL语句,可能会出现其他节点查看不到的情况,因为impala的所有元数据都是用catalogd来管理的。一个impalad进行DDL操作会发送到catalogd,由catalogd在广播给其他的impalad服务,这时候可能会出现延迟,导致有的impalad查询不到发生的DDL。
我遇到的情况是在impala-shell中连续执行drop和create语句,有时会出现Table already exists的错误,如下:
[root@cdh1 ~]<20200909 14:33:56># impala-shell -i cdh2.maccro.com -d default -k -f test.sql Starting Impala Shell using Kerberos authentication Using service name 'impala' Warning: live_progress only applies to interactive shell sessions, and is being skipped for now. Opened TCP connection to cdh2.maccro.com:21000 Connected to cdh2.maccro.com:21000 Server version: impalad version 3.4.0-SNAPSHOT RELEASE (build d17bc2168f4252290795e744e36ba1c0326f47ac) Query: use `default` Query: use `default` Query: DROP TABLE IF EXISTS TEMP.DAILY_SUM_Tmp +-------------------------+ | summary | +-------------------------+ | Table has been dropped. | +-------------------------+ Fetched 1 row(s) in 2.86s Query: CREATE TABLE TEMP.DAILY_SUM_Tmp AS SELECT t.B_CD ,t.P_ID ,t.C_ID ,t.C_CD ,t.A_CD ,SUM(CASE WHEN t.Db_Ind='1' THEN t.Txn_A ELSE 0 END) AS C_Oy ,SUM(CASE WHEN t.Db_Ind='1' THEN 0 ELSE t.Txn_A END) AS C_Oy ,SUM(CASE WHEN t.Db_Ind='1' THEN t.Txn_A ELSE 0 END) AS C_Dy ,SUM(CASE WHEN t.Db_Ind='1' THEN 0 ELSE t.Txn_A END) AS C_Dy ,SUM(CASE WHEN t.Db_Ind='1' THEN t.Txn_Q ELSE 0 END) AS C_Qy ,SUM(CASE WHEN t.Db_Ind='1' THEN 0 ELSE t.Txn_Q END) AS C_r_Qy FROM DATA.FI t WHERE t.B_Dt = '20200901' GROUP BY t.B_CD, t.P_ID, t.C_ID, t.C_CD, t.A_CD ERROR: AnalysisException: Table already exists: TEMP.DAILY_SUM_Tmp Could not execute command: CREATE TABLE TEMP.DAILY_SUM_Tmp AS SELECT t.B_CD ,t.P_ID ,t.C_ID ,t.C_CD ,t.A_CD ,SUM(CASE WHEN t.Db_Ind='1' THEN t.Txn_A ELSE 0 END) AS C_Oy ,SUM(CASE WHEN t.Db_Ind='1' THEN 0 ELSE t.Txn_A END) AS C_Oy ,SUM(CASE WHEN t.Db_Ind='1' THEN t.Txn_A ELSE 0 END) AS C_Dy ,SUM(CASE WHEN t.Db_Ind='1' THEN 0 ELSE t.Txn_A END) AS C_Dy ,SUM(CASE WHEN t.Db_Ind='1' THEN t.Txn_Q ELSE 0 END) AS C_Qy ,SUM(CASE WHEN t.Db_Ind='1' THEN 0 ELSE t.Txn_Q END) AS C_r_Qy FROM DATA.FI t WHERE t.B_Dt = '20200901' GROUP BY t.B_CD, t.P_ID, t.C_ID, t.C_CD, t.A_CD [root@cdh1 ~]<20200909 14:34:18># impala-shell -i cdh2.maccro.com -d default -k -f test.sql Starting Impala Shell using Kerberos authentication Using service name 'impala' Warning: live_progress only applies to interactive shell sessions, and is being skipped for now. Opened TCP connection to cdh2.maccro.com:21000 Connected to cdh2.maccro.com:21000 Server version: impalad version 3.4.0-SNAPSHOT RELEASE (build d17bc2168f4252290795e744e36ba1c0326f47ac) Query: use `default` Query: use `default` Query: DROP TABLE IF EXISTS TEMP.DAILY_SUM_Tmp +-----------------------+ | summary | +-----------------------+ | Table does not exist. | +-----------------------+ Fetched 1 row(s) in 0.01s Query: CREATE TABLE TEMP.DAILY_SUM_Tmp AS SELECT t.B_CD ,t.P_ID ,t.C_ID ,t.C_CD ,t.A_CD ,SUM(CASE WHEN t.Db_Ind='1' THEN t.Txn_A ELSE 0 END) AS C_Oy ,SUM(CASE WHEN t.Db_Ind='1' THEN 0 ELSE t.Txn_A END) AS C_Oy ,SUM(CASE WHEN t.Db_Ind='1' THEN t.Txn_A ELSE 0 END) AS C_Dy ,SUM(CASE WHEN t.Db_Ind='1' THEN 0 ELSE t.Txn_A END) AS C_Dy ,SUM(CASE WHEN t.Db_Ind='1' THEN t.Txn_Q ELSE 0 END) AS C_Qy ,SUM(CASE WHEN t.Db_Ind='1' THEN 0 ELSE t.Txn_Q END) AS C_r_Qy FROM DATA.FI t WHERE t.B_Dt = '20200901' GROUP BY t.B_CD, t.P_ID, t.C_ID, t.C_CD, t.A_CD ERROR: AnalysisException: Table already exists: TEMP.DAILY_SUM_Tmp Could not execute command: CREATE TABLE TEMP.DAILY_SUM_Tmp AS SELECT t.B_CD ,t.P_ID ,t.C_ID ,t.C_CD ,t.A_CD ,SUM(CASE WHEN t.Db_Ind='1' THEN t.Txn_A ELSE 0 END) AS C_Oy ,SUM(CASE WHEN t.Db_Ind='1' THEN 0 ELSE t.Txn_A END) AS C_Oy ,SUM(CASE WHEN t.Db_Ind='1' THEN t.Txn_A ELSE 0 END) AS C_Dy ,SUM(CASE WHEN t.Db_Ind='1' THEN 0 ELSE t.Txn_A END) AS C_Dy ,SUM(CASE WHEN t.Db_Ind='1' THEN t.Txn_Q ELSE 0 END) AS C_Qy ,SUM(CASE WHEN t.Db_Ind='1' THEN 0 ELSE t.Txn_Q END) AS C_r_Qy FROM DATA.FI t WHERE t.B_Dt = '20200901' GROUP BY t.B_CD, t.P_ID, t.C_ID, t.C_CD, t.A_CD
这个时候在所有语句前,加上 setSYNC_DDL=1;
让DDL语句同步到所有impalad节点,便可以解决这个问题