presto、impala、kudu相关优化整理

简介: presto、impala、kudu相关优化整理

prsto优化


presto webui各项指标的含义

Presto 优化

数据存储

1 合理设置分区

与 Hive 类似,Presto 会根据元数据信息读取分区数据,合理地设置分区能减少 Presto 数据读取量,提升查询性能。

2 使用 ORC 格式存储

Presto 对 ORC文件 读取进行了特定优化,因此,在 Hive 中创建 Presto 使用的表时,建议采用 ORC 格式存储。相对于 Parquet 格式,Presto 对 ORC 格式支持得更好。

3 使用压缩

数据压缩可以减少节点间数据传输对 IO 带宽的压力,对于即席查询需要快速解压,建议采用 Snappy压缩。

4 预先排序

对于已经排序的数据,在查询的数据过滤阶段,ORC格式支持跳过读取不必要的数据。比如对于经常需要过滤的字段可以预先排序。

INSERT INTO table nation_orc partition(p) SELECT * FROM nation SORT BY n_name;

如果需要过滤 n_name 字段,则性能将提升。

SELECT count(*) FROM nation_orc WHERE n_name=’AUSTRALIA’;

SQL查询

1 只选择需要的字段

由于采用列式存储,所以只选择需要的字段可加快字段的读取速度,减少数据量。避免采用 * 读取所有字段。

2 过滤条件必须加上分区字段

对于有分区的表,where语句中优先使用分区字段进行过滤。acct_day 是分区字段,visit_time 是具体访问时间。

[GOOD]: SELECT time,user,host FROM tbl where acct_day=20171101
[BAD]:  SELECT * FROM tbl where visit_time=20171101

3 Group By语句优化

合理安排 Group by语句中字段顺序对性能有一定提升。将 Group By 语句中字段按照每个字段 distinct 数据多少进行降序排列。

[GOOD]: SELECT GROUP BY uid, gender
[BAD]:  SELECT GROUP BY gender, uid

4 Order by时使用Limit

Order by 需要扫描数据到单个 worker 节点进行排序,导致单个worker需要大量内存。如果是查询 Top N 或者 Bottom N,使用 limit 可减少排序计算和内存压力

[GOOD]: SELECT * FROM tbl ORDER BY time LIMIT 100
[BAD]:  SELECT * FROM tbl ORDER BY time

5 使用近似聚合函数

Presto有一些近似聚合函数,对于允许有少量误差的查询场景,使用这些函数对查询性能有大幅提升。比如使用approx_distinct()函数比Count(distinct x)有大概2.3%的误差。

SELECT approx_distinct(user_id) FROM access

6 用regexp_like代替多个like语句

Presto查询优化器没有对多个 like 语句进行优化,使用regexp_like对性能有较大提升。

[GOOD]
SELECT
  ...
FROM
  access
WHERE
  regexp_like(method, 'GET|POST|PUT|DELETE')
[BAD]
SELECT
  ...
FROM
  access
WHERE
  method LIKE '%GET%' OR
  method LIKE '%POST%' OR
  method LIKE '%PUT%' OR
  method LIKE '%DELETE%'

7 使用Join语句时将大表放在左边

Presto中 join 的默认算法是broadcast join,即将 join 左边的表分割到多个 worker ,然后将join 右边的表数据整个复制一份发送到每个worker进行计算。如果右边的表数据量太大,则可能会报内存溢出错误。

[GOOD] SELECT ... FROM large_table l join small_table s on l.id = s.id
[BAD] SELECT ... FROM small_table s join large_table l on l.id = s.id

8 使用Rank函数代替row_number函数来获取Top N

在进行一些分组排序场景时,使用rank函数性能更好

[GOOD]
SELECT checksum(rnk)
FROM (
  SELECT rank() OVER (PARTITION BY l_orderkey, l_partkey ORDER BY l_shipdate DESC) AS rnk
  FROM lineitem
) t
WHERE rnk = 1
[BAD]
SELECT checksum(rnk)
FROM (
  SELECT row_number() OVER (PARTITION BY l_orderkey, l_partkey ORDER BY l_shipdate DESC) AS rnk
  FROM lineitem
) t
WHERE rnk = 1

9 时间函数

对于Timestamp,需要进行比较的时候,需要添加Timestamp关键字,而MySQL中对Timestamp可以直接进行比较。

/*MySQL的写法*/
SELECT t FROM a WHERE t > '2017-01-01 00:00:00'; 
/*Presto中的写法*/
SELECT t FROM a WHERE t > timestamp '2017-01-01 00:00:00';

join优化

1:使用Join语句时将大表放在左边

Presto中join的默认算法是broadcast join,即将join左边的表分割到多个worker,然后将join右边的表数据整个复制一份发送到每个worker进行计算。

如果右边的表数据量太大,则可能会报内存溢出错误。

2:如果左表和右表都比较大

为防止内存溢出,做如下配置:

1)修改配置distributed-joins-enabled (presto version >=0.196)

2)在每次查询开始使用distributed_join的session选项


set session distributed_join = 'true' SELECT ... FROM large_table1 join large_table2 on large_table1.id = large_table2.id

核心点就是使用distributed join,也就是hash join。

Presto的这种配置类型会将左表和右表同时以join key的hash value为分区字段进行分区。

所以即使右表也是大表,也会被拆分,相比broadcast join,这种join方式的会增加很多网络数据传输,效率慢。

3:多个join的OR条件使用union代替

SELECT ... FROM t1 JOIN t2 ON t1.a1 = t2.a1 ORt1.a2 = t2.a2 改为 SELECT ... FROM t1 JOIN t2 ON t1.a1 = t2.a1 union SELECT ... FROM t1 JOIN t2 ON t1.a2 = t2.a2

4:使用WITH语句

使用Presto分析统计数据时,可考虑把多次查询合并为一次查询,用Presto提供的子查询完成。

WITH tmp AS ( SELECT DISTINCT a1, a2 FROM t2) SELECT ... FROM t1 JOIN tmp ON t1.a1 = tmp.a1 union SELECT ... FROM t1 JOIN tmp ON t1.a2 = tmp.a2;

5:尽量用UNION ALL代替UNION

和distinct类似, UNION有去重的功能, 所以会使用到内存,如果只是拼接两个或者多个SQL查询的结果, 考虑用UNION ALL

6: 利用子查询,减少读表的次数,尤其是大数据量的表

具体做法是,将使用频繁的表作为一个子查询抽离出来,避免多次 read。

注意事项

ORCParquet 都支持列式存储,但是ORC对Presto支持更好(Parquet对Impala支持更好)

对于列式存储而言,存储文件为二进制的,对于经常增删字段的表,建议不要使用列式存储(修改文件元数据代价大)。对比数据仓库,dwd层建议不要使用ORC,而dm层则建议使用


 


impala优化


1  选择适当的数据文件格式

使用impala,无非就是为了一个目的:性能好/资源消耗少,Impala为了做到通用性,也就是为了更好的hive无缝连接,支持了大部分Hive支持的文件格式,例如Text、Avro、RCFile、Parquet等(不支持ORC),但是为了实现更快的ad-hoc查询(基本上都是OLAP查询,查询部分列,聚合,分析),我们基本上都会选择使用Parquet格式作为数据文件存储格式。请选用Parquet作为文件存储格式。


2  避免摄取过程产生很多年小的文件

如果有其他程序产生的小文件,可以使用中间表,将小文件数据存放到中间表。然后通过insert…select…方式中间表的数据插入到最终表中


3  使用合适的分区技术,根据分区粒度测算

分区的个数通常是根据业务数据来的,通常时间分区(例如日期/月份)是少不了的,例如对于一个支持多终端的应用,可能在时间分区下面再加一层终端类型的分区,设置对于每一个终端的不同操作在进行一层分区,根据唯物辩证法,凡事都需要保持一个度,那么就从两个极端的情况下来分析分区的粒度如何确定:1:分区过少:,整个表不使用分区,或者只有一个日期的分区,这样会导致频繁的查询某一个终端的数据不得不扫描整天的数据甚至整个表的数据,这是一种浪费;2、分区过多,对于每一个要统计的维度都创建一个分区,这样对于任何一个维度=’xxx’的查询都只需要扫描精确需要的数据,但是这样会导致大量的数据目录,进而导致大量的文件需要扫描,这对于查询优化器是一个灾难。因此最终的建议是:根据查询需求确定分区的粒度,根据每一个分区的成员个数预估总的分区数,保证一个表的分区数不超过30000(经验之谈?),避免过小的分区。

在决定使用哪个列(S)进行分区时,选择合适的粒度级别。例如,你应该分配一年,一个月,一天,或只有一年和一个月?选择一个分区策略,提出至少256 MB的每个分区中的数据,利用HDFS块I/O和Impala分布式查询。

在分区也会导致查询计划需要更长的时间比必要的,如Impala修剪不必要的分区。理想情况下,保持表中的分区的数量低于3万。

准备数据文件时,要在分区目录中,创建几个大文件,而不是许多小文件。如果您以许多小文件的形式接收数据,并没有对输入格式的控制,请考虑使用“插入”…选择语法复制数据从一个表或分区到另一个,它压缩的文件到一个相对小的数量(基于集群中的节点数)。

如果你需要减少分区的整体数量和增加在每个分区的数据量,分区键列,很少引用或非关键查询引用第一次看(不受SLA)。例如,您的网站日志数据可能会被年、月、日和小时划分,但如果大多数查询每天都将结果卷起来,也许你只需要一年、一个月、一天的时间来划分。

如果您需要更大的减少粒度,请考虑创建“桶”,计算对应于不同的分区键的值的计算值。例如,您可以使用trunc()功能与时间戳列组的日期和时间间隔的基础上如周或季度值。看到Impala Date和时间函数的详细信息。

4  使用最小的适当的整数类型进行分区键列

      虽然它是使用字符串的分区键列很吸引人,因为这些值可以转化为HDFS的目录名称,但是不管怎样,你可以用普通分区的关键领域如年月数值最小化内存使用,和天。使用最小的整数类型,认为适当的范围值,通常TINYINT为年月日、年和smallint。使用extract()功能拉出个别日期和时间字段的时间戳值,和cast()返回值到相应的整数类型。

5  选择一个合适的数据块大小

默认情况下,插入的Impala…选择语句创建一个256MB的文件块大小的block。(Impala2开始更改变更为默认。从前,限为1 GB,但是Impala关于压缩的保守估计,导致小于1 GB的文件。)


      每个parquet文件写的Impala是一个块,让整个文件是由一个单一的主机单元处理。当你拷贝文件到HDFS地板之间或HDFS文件系统,使用hdfs dfs -pb保持原块的大小。


      如果只有一个或几个数据块中的表,或在一个分区,是唯一的查询访问,那么你可能会经历一个不同的理由放缓:没有足够的数据来利用Impala的并行分布式查询。每个数据块的一个重要组成部分,由一个单一的核心处理。在16个核心机器的100个节点集群中,您可能同时处理数千个数据文件。您要在“多个小文件”和“单个巨型文件”之间找到一个“小文件”和“单个巨文件”之间的一个“小文件”和“并行处理”之间的“小文件”和“并行处理”。你可以设置parquet_file_size查询选项在插入之前…SELECT语句来减少每个生成的parquet文件的大小。(指定文件大小的字节,一个绝对的数量或在Impala2后,在结束与M为千兆字节。兆字节或G单位)使用不同的文件大小的基准,找到适合您的特定数据量正确的平衡点。

6  尽量将StateStore和Catalog单独部署到同一个节点,保证他们正常通信。

7  通过对Impala Daemon内存限制(默认256M)及StateStore工作线程数,来提高Impala的执行效率。
8  收集性能关键或高容量连接查询中使用的所有表的统计数据

使用 compute stats进行表信息搜集,当一个内容表或分区明显变化,重新计算统计相关数据表或分区。因为行和不同值的数量差异可能导致impala选择不同的连接顺序时进行查询。

9  尽量减少将结果发送给客户端的开销

使用技术,如:

      汇聚。如果你需要知道一个条件匹配多少行,从一些列的总值中匹配值,最高或最低的匹配,等等,叫汇总等功能count(),sum(),并在查询结果集的max()而不是发送到应用程序和做这些计算有。记住,一个不汇聚的结果集是巨大的,通过网络传输需要大量的时间。


       过滤。在查询的其中一个子句中使用所有适用的测试,以消除不相关的行,而不是产生一个大的结果集,并使用应用程序逻辑进行过滤。

       限制条款。如果您只需要从结果集上看到一些示例值,或从查询使用顺序的顶部或底部值,包括限制条款,以减少结果集的大小,而不是要求充分的结果集。最好的就是丢掉大多数的行。

       避免过度开销。由于漂亮的打印结果并显示在屏幕上。当你检索结果通过使用Impala的外壳,外壳等选项,B和output_delimiter产生结果没有特殊的格式,并将输出重定向到文件中而不是打印到屏幕上。考虑使用插入…选择写的结果直接向HDFS的新文件。看到Impala外壳配置选项的Impala shell的命令行选项的详细信息。

10  网络io的优化:

–a.避免把整个数据发送到客户端

–b.尽可能的做条件过滤

–c.使用limit字句

–d.输出文件时,避免使用美化输出

11  使用profile输出底层信息计划,在做相应环境优化

12  减少返回结果大小

如果需要统计聚合,直接在SQL中完成,尽可能的在where中执行过滤而不要查出来之后在应用端做过滤,对于查询结果尽可能使用LIMIT限制返回结果集大小;避免大量的结果展示在终端,可以考虑通过INSERT xxx的方式把结果输出到文件,或者通过impala-shell参数将结果重定向。

13  SQL优化,使用之前调用执行计划

1,执行SQL前对SQL进行一个分析,使用explain sql(分析哪个步骤需要内存多,指定机器),profile(为什么这次的SQL执行的这么慢)
2,哪些SQL会导致数据倾斜,保证谓词下推的成功
3,多层嵌套,select * 都是可以优化的

参考链接:https://blog.csdn.net/javastart/article/details/96335278

               https://blog.csdn.net/maenlai0086/article/details/89956989

 

impala+kudu踩坑


 

  • 一开始需要全量导入kudu,这时候我们先用sqoop把关系数据库数据导入临时表,再用impala从临时表导入kudu目标表

由于sqoop从关系型数据直接以parquet格式导入hive会有问题,这里默认hive的表都是text格式;每次导完到临时表,需要做invalidate metadata 表操作,不然后面直接导入kudu的时候会查不到数据.

  • 除了查询,建议所有impala操作都在impala-shell而不在hue上面执行
  • impala并发写入kudu的时候,数据量比较大的时候

这时候kudu配置参数 --memory_limit_hard_bytes能大点就大点,因为kudu写入首先保存再内存里面,到一定阀值才溢写到磁盘,这个是直接最能提高写的方法;

当然不是所有机器都有那么多资源,可以把--maintenance_manager_num_threads 这个参数稍微调大,需要调试,提高数据从内存写入磁盘的效率

  • impala查询kudu

首先所有表做完全量的etl操作,必须得执行compute stats 表名,不然impala执行sql生成的计划执行数评估的内存不准确,容易评估错误导致实际执行不了

kudu表最好不要做任何压缩,保证原始扫描性能发挥最好;假如对查询性能要求比存储要求高的话;大部分企业对实时查询效率要求高,而且存储成本毕竟低;

kudu针对大表要做好分区,最好range和hash一起使用,前提是主键列包含能hash的id,但range分区一定要做好,经验告诉我一般是基于时间;

查询慢的sql,一般要拿出来;方便的话做下explain,看下kudu有没有过滤部分数据关键字kudu predicates;假如sql没问题,那在impala-shell执行这个sql,最后执行summray命令,重点查看单点峰值内存和时间比较大的点,对相关的表做优化,解决数据倾斜问题

  • kudu数据删除

大表不要delete,不要犹豫直接drop,在create吧;磁盘空间会释放的

  • 关于impala + kudu 和 impala + parquet

网上很多分析impala + kudu 要比 impala + parquet 优越很多;谁信谁XB;

首先两个解决的场景不一样,kudu一般解决实时,hive解决的是离线(通常是T + 1或者 T -1)

hive基于hdfs,hdfs已经提供一套较为完善的存储机制,底层数据和文件操作便利;安全性,可扩展性都比kudu强很多,最重要parquet + impala效率要比kudu高,数仓首选是它

kudu最大优势是能做类似关系型数据库一样的操作,insert, update, delete,这样热点的数据可以存储在kudu里面并随时做更新

  • 最后谈到的实时同步工具

同步工具我们这里使用streamsets,一个拖拉拽的工具,非常好用;但内存使用率高,通过jconsole我们发现,所有任务同时启动;JVM新生代的内容几乎都跑到老年代了,GC没来的及,就内存溢出了;后面单独拿几台服务器出来做这个ETL工具,jvm配置G1垃圾回收器


 


kudu性能优化


 

硬件层面优化

tserver的WAL采用M.2接口(NVMe协议) SSD,Kudu的每一次写入都会先写WAL,WAL是确保数据不丢失的关键,所以一般都会同步写磁盘(顺序写),为了提高性能建议tserver采用M.2接口(NVMe协议)SSD来存储WAL,至少也得是普通SD(master读写压力小,跟操作系统共享SSD即可)

–fs_wal_dir=/data/kudu/tserver/wal


数据存储多SSD

tserver负责数据的读写和复制,压力比较大,建议采用多SSD分散读写IO。

fs_data_dirs=/disk1/kudu/tserver/data,/disk2/kudu/tserver/data,/disk3/kudu/tserver/data


操作系统层面优化

操作系统会控制每个用户使用的文件描述符和线程数,Kudu作为数据库肯定比一般应用需要更多文件描述符和线程数

如果Kudu使用的线程数超过OS的限制,你会在日志中看到如下报错:

pthread_create failed: Resource temporarily unavailable

降低或者禁用swap使用交换区会导致性能下降,建议降低swap的使用

sudo su -

echo ‘vm.swappiness=10’>> /etc/sysctl.conf

exit

上面参数重启才能生效,可以同时搭配如下命令避免重启:

sudo sysctl vm.swappiness=10

cat /proc/sys/vm/swappiness

检查当前是否生效

cat /proc/sys/vm/swappiness


配置调优

tserver内存限制

Tablet Server能使用的最大内存量,tablet Server在批量写入数据时并非实时写入磁盘,而是先Cache在内存中,在flush到磁盘。这个值设置过小时,会造成Kudu数据写入性能显著下降。对于写入性能要求比较高的集群,建议设置更大的值 :

–memory_limit_hard_bytes

还有两个软限制:

Cgroup 内存软限制,这个限制并不会阻止进程使用超过限额的内存,只是在系统内存不足时,会优先回收超过限额的进程占用的内存,使之向限定值靠拢,当进程试图占用的内存超过了cgroups的限制,会触发out of memory,导致进程被kill掉

–memory_limit_soft_percentage=80


tserver维护管理器线程数

Kudu后台对数据进行维护操作,如写入数据时的并发线程数,一般设置为4,建议的是数据目录的3倍

–maintenance_manager_num_threads=6

调大tserver block cache容量,分配给Kudu Tablet Server块缓存的最大内存量,建议是2-4G

–block_cache_capacity_mb=2048

避免磁盘耗尽,为避免磁盘空间耗尽,应该保留一部分空间:#默认-1,表示保留1%的磁盘空间,自己配置是必须大于0

–fs_data_dirs_reserved_bytes

容忍磁盘故障

如果某个tablet的数据分散到更多的磁盘,则数据会更加分散,这个值越小每个tablet的数据会更加集中,不过受磁盘故障影响就越小。

#每个tablet的数据分散到几个目录
fs_target_data_dirs_per_tablet=3

ntp时钟同步误差参数:

设置ntp服务器的时间误差不超过20s(默认是10s)

max_clock_sync_error_usec=20000000

参考链接:

https://fiend.blog.csdn.net/article/details/105946491


sparksql


 

  • 标准优化规则
  • 过滤推断前的算子优化-operatorOptimizationRuleSet
  • 过滤推断-Infer Filters
  • 过滤推断后的算子优化-operatorOptimizationRuleSet
  • 下推join的额外谓词-Push extra predicate through join
  • 算子下推(Operator push down)-Project、Join、Limit、列剪裁
  • 算子合并(Operator combine)-Repartition、Project、Window、Filter、Limit、Union
  • 常量折叠和强度消减(Constant folding and strength reduction)-Repartition、Window、Null、常量、In、Filter、整数类型、Like、Boolean、if/case、二义性、no-op、struct、取值操作(struct/array/map)、csv/json、Concat
  • analysis 阶段的收尾规则-Finish Analysis,比如EliminateSubqueryAliases实际是在Analyzer里定义的
  • 算子优化前-Union、Limit、数据库关系、子查询、算子的替代、聚合算子
  • 算子优化-operatorOptimizationBatch
  • 依赖统计数据的优化规则-Project、Filter、Join、Sort、Decimal、Aggregate、对象表达式、数据库关系、笛卡尔积、子查询、Float、Struct
  • 其他特殊的优化规则-分区元数据、DPP(动态分区裁剪)、Filter、Python UDF以及用户自定义的优化规则

sparksql相关参数:

//1.下列Hive参数对Spark同样起作用。
set hive.exec.dynamic.partition=true; // 是否允许动态生成分区
set hive.exec.dynamic.partition.mode=nonstrict; // 是否容忍指定分区全部动态生成
set hive.exec.max.dynamic.partitions = 100; // 动态生成的最多分区数
//2.运行行为
set spark.sql.autoBroadcastJoinThreshold; // 大表 JOIN 小表,小表做广播的阈值
set spark.dynamicAllocation.enabled; // 开启动态资源分配
set spark.dynamicAllocation.maxExecutors; //开启动态资源分配后,最多可分配的Executor数
set spark.dynamicAllocation.minExecutors; //开启动态资源分配后,最少可分配的Executor数
set spark.sql.shuffle.partitions; // 需要shuffle是mapper端写出的partition个数
set spark.sql.adaptive.enabled; // 是否开启调整partition功能,如果开启,spark.sql.shuffle.partitions设置的partition可能会被合并到一个reducer里运行
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize; //开启spark.sql.adaptive.enabled后,两个partition的和低于该阈值会合并到一个reducer
set spark.sql.adaptive.minNumPostShufflePartitions; // 开启spark.sql.adaptive.enabled后,最小的分区数
set spark.hadoop.mapreduce.input.fileinputformat.split.maxsize; //当几个stripe的大小大于该值时,会合并到一个task中处理
//3.executor能力
set spark.executor.memory; // executor用于缓存数据、代码执行的堆内存以及JVM运行时需要的内存
set spark.yarn.executor.memoryOverhead; //Spark运行还需要一些堆外内存,直接向系统申请,如数据传输时的netty等。
set spark.sql.windowExec.buffer.spill.threshold; //当用户的SQL中包含窗口函数时,并不会把一个窗口中的所有数据全部读进内存,而是维护一个缓存池,当池中的数据条数大于该参数表示的阈值时,spark将数据写到磁盘
set spark.executor.cores; //单个executor上可以同时运行的task数

相关优化详见:

https://zhuanlan.zhihu.com/p/558500455
https://cloud.tencent.com/developer/article/2019623
相关文章
|
SQL 分布式计算 Hadoop
Hive使用Impala组件查询(1)
Hive使用Impala组件查询(1)
458 0
|
SQL 存储 Java
Hive使用Impala组件查询(2)
Hive使用Impala组件查询(2)
155 0
|
SQL 存储 分布式计算
Impala 架构了解
Impala 架构了解
Impala 架构了解
|
SQL 分布式计算 Java
KuduSpark_Impala 访问 Kudu | 学习笔记
快速学习 KuduSpark_Impala 访问 Kudu
320 0
KuduSpark_Impala 访问 Kudu | 学习笔记
|
存储 SQL 数据挖掘
kudu原理_ Kudu是什么|学习笔记
快速学习kudu原理_ Kudu是什么
103 0
Impala——2.架构
标签(空格分隔): Impala Impala Server的组件 Impala服务器是分布式,大规模并行处理(MPP)数据库引擎。它由不同的在群集中的特定主机上运行的守护程序进程组成。 Impala守护进程 核心Impala组件是一个守护进程,它通过impalad进程在集群的每个DataNode上运行。
1744 0
|
SQL 分布式计算 Java
Hadoop大数据平台实战(02):HBase vs. Hive vs. Impala 对比
Hadoop大数据平台实战(02):HBase vs. Hive vs. Impala 对比。
2901 0
|
SQL 分布式计算 大数据
Hadoop大数据平台实战(01):Impala vs Hive的区别
Hadoop大数据生态系统重要的2个框架Apache Hive和Impala,用于在HDFS和HBase上进行大数据分析。 但Hive和Impala之间存在一些差异--Hadoop生态系统中的SQL分析引擎的竞争。本文中我们会来对比两种技术Impala vs Hive区别?
4296 0

热门文章

最新文章