Spark调优

以下的配置项都是配置在了 properties.conf 文件中,然后在submit程序时指定配置文件;或配置到默认配置文件 spark-defaults.conf 中。

bin/spark-submit --properties-file properties.conf Spark-1.0-SNAPSHOT.jar

一、CPU配置项

# 集群内满配cpu核数
spark.cores.max  
# 单个Executor内cpu核数,standalone模式默认会使用全部核
spark.executor.cores   
# 单个task计算任务消耗cpu核数,默认为1且不能小于1,大于1时是task任务为多线程的(大部分时候不必设置)
spark.task.cpus        

并行度参数如下(数据的并行度,将数据划分为多少块):

# 未指定分区数时RDD默认并行度
spark.default.parallelism    
# SparkSQL框架下,数据关联、聚合操作时Reducer 在shuffle reduce阶段默认的并行度
spark.sql.shuffle.partitions  

并发度基本由 spark.executor.cores 参数敲定,因为spark.task.cpus通常为1

并发度=任务数=(spark.executor.cores)/(spark.task.cpus)

Executor数=(spark.cores.max)/(spark.executor.cores)

# 在yarn集群中指定executor的个数
spark.executor.instances 

二、内存配置项

# 每个executor的内存绝对值大小,默认1g
spark.executor.memory 
# 除用户内存外的计算和存储内存所占比例,默认0.6
spark.memory.fraction
# 计算和和存储内存中存储内存所占比例,默认0.5
spark.memory.storageFraction

三、磁盘配置项

配置在spark-env.sh中,服务级别的配置

# spark暂存空间目录,存放map输出文件和RDDs,支持","分隔的多个目录。shuffle输出的文件
SPARK_LOCAL_DIRS=
# spark的worker工作目录,暂存空间存放全部日志,默认SPARK_HOME/work
SPARK_WORKER_DIR=

配置在spark-defaults.conf或sparkConf中,任务级别的配置,会被SPARK_LOCAL_DIRS设置的目录覆盖

# spark暂存空间目录,用来改善Shuffle中间文件存储,以及RDD Cache磁盘存储
spark.local.dir 目录

四、SparkSQL配置项

# broadcast join 广播阈值 ,默认10M,工业级应用往往设置2G左右
# 只要基表(尺寸较小的表)小于该配置项的设定值,Spark SQL就会自动选择 Broadcast Join 策略,无论是否显示
# 使用broadcast函数,并且为AQE机制奠定基础
spark.sql.autoBroadcastJoinThreshold 

AQE 的全称是 Adaptive Query Execution,翻译过来是“自适应查询执行”,Spark 社区在 3.0 版本中推出了 AQE 机制。AQE三个动态调整方向:Join 策略调整、自动分区合并和自动倾斜处理

# 是否启动AQE,默认false
spark.sql.adaptive.enabled true

五、cache

在Spark计算过程中善用cache会极大提高性能,对重复使用的数据建议添加cache,而对只使用一两次的数据不建议添加cache,否则不仅浪费内存空间而且会降低Spark运行效率

使用的建议:

  • 如果 RDD/DataFrame/Dataset 在应用中的引用次数为 1,就坚决不使用 Cache
  • 如果引用次数大于 1,且运行成本占比超过 30%,应当考虑启用 Cache

使用Cache有两种方式:

  1. 创建临时视图再cache
df.createTempView("table_name")
spark.sql("cache tabel table_name")
  1. cache算子
df.cache.count

cache操作时惰性操作,只有action算子时才触发计算

只有 count 才会触发缓存的完全物化,而 first、take 和 show 这 3 个算子只会把涉及的数据物化