一个计算任务的执行主要依赖于CPU、内存、带宽。Spark是一个基于内存的计算引擎,所以对它来说,影响最大的可能就是内存,一般我们的任务遇到了性能瓶颈大概率都是内存的问题,当然了CPU和带宽也可能会影响程序的性能,这个情况也不是没有的,只是比较少。
Spark性能优化,其实主要就是在于对内存的使用进行调优。通常情况下,如果你的Spark程序计算的数据量比较小,并且你的内存足够使用,那么只要网络不至于卡死,一般是不会有大的性能问题的。但是Spark程序的性能问题往往出现在针对大数据量进行计算(比如上亿条数的数据,或者上T规模的数据),这个时候如果内存分配不合理就会比较慢,所以,Spark性能优化,主要是对内存进行优化。
所以把原始文件中的数据转化为内存中的对象之后,占用的内存会比原始文件中的数据还要大。那我如何预估程序会消耗多少内存呢?通过cache方法,可以看到RDD中的数据cache到内存中之后占用多少内存,这样就能看出了
测试代码如下:
package com.simoniu.scalademo
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:测试内存占用情况
* Created by simoniu
*/
object TestMemoryScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("TestMemoryScala")
.setMaster("local")
val sc = new SparkContext(conf)
val dataRDD = sc.textFile("hdfs://master:9000/test/hello_10000000.dat").cache()
val count = dataRDD.count()
println(count)
//while循环是为了保证程序不结束,方便在本地查看4040页面中的storage信息
while(true){
;
}
}
}
执行代码,访问localhost的4040端口界面,这个界面其实就是spark的任务界面,在本地运行任务的话可以直接访问4040界面查看。
点击stages可以看到任务的原始输入数据是多大?

点击storage可以看到将数据加载到内存,生成RDD之后的大小。

这样我们就能知道这一份数据在RDD中会占用多少内存了,这样在使用的时候,如果想要把数据全部都加载进内存,就需要给这个任务分配这么多内存了,当然了你分配少一些也可以,只不过这样计算效率会变低,因为RDD中的部分数据内存放不下就会放到磁盘了。
下面我们通过这几个方式来实现对Spark程序的性能优化。
1).高性能序列化类库
在任何分布式系统中,序列化都是扮演着一个重要的角色的。如果使用的序列化技术,在执行序列化操作的时候很慢,或者是序列化后的数据还是很大,那么会让分布式应用程序的性能下降很多。所以,进行Spark性能优化的第一步,就是进行序列化的性能优化。Spark默认会在一些地方对数据进行序列化,如果我们的算子函数使用到了外部的数据(比如Java中的自定义类型),那么也需要让其可序列化,否则程序在执行的时候是会报错的,提示没有实现序列化。
注意了,其实遇到这种没有实现序列化的对象,解决方法有两种。
Spark默认情况下倾向于序列化的便捷性,使用了Java自身提供的序列化机制——基于ObjectInputStream和ObjectOutputStream的序列化机制,因为这种方式是Java原生提供的,使用起来比较方便,但是Java序列化机制的性能并不高。序列化的速度相对较慢,而且序列化以后的数据,相对来说还是比较大,比较占空间。所以,如果你的Spark应用程序对内存很敏感,那默认的Java序列化机制并不是最好的选择。
Spark实际上提供了两种序列化机制:Java序列化机制和Kryo序列化机制。Spark只是默认使用了java这种序列化机制。
Kryo序列化机制之所以不是默认序列化机制的原因有以下两点:
如果要使用Kryo序列化机制,基本步骤如下:
什么场景下适合使用Kryo序列化?
一般是针对一些自定义的对象,例如我们自己定义了一个对象,这个对象里面包含了几十M,或者上百M的数据,然后在算子函数内部,使用到了这个外部的大对象。如果默认情况下,让Spark用java序列化机制来序列化这种外部的大对象,那么就会导致序列化速度比较慢,并且序列化以后的数据还是比较大。所以,在这种情况下,比较适合使用Kryo序列化类库,来对外部的大对象进行序列化,提高序列化速度,减少序列化后的内存空间占用。
Kryo序列化案例,scala代码如下:
package com.simoniu.scalademo
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
/**
* 需求:Kryo序列化的使用
* Created by simoniu
*/
object KryoSerScala {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("KryoSerScala")
.setMaster("local")
//指定使用kryo序列化机制,注意:如果使用了registerKryoClasses,其实这一行设置是可以省略的
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(Array(classOf[Person]))//注册自定义的数据类型
val sc = new SparkContext(conf)
val dataRDD = sc.parallelize(Array("hello linux","hello java","hello hadoop"))
val wordsRDD = dataRDD.flatMap(_.split(" "))
val personRDD = wordsRDD.map(word=>Person(word,18)).persist(StorageLevel.MEMORY_ONLY_SER)
personRDD.foreach(println(_))
//while循环是为了保证程序不结束,方便在本地查看4040页面中的storage信息
while (true) {
;
}
}
}
case class Person(name: String,age: Int) extends Serializable
执行任务,然后访问localhost的4040界面,在界面中可以看到cache的数据大小是54字节。

那我们把kryo序列化设置去掉,使用默认的java序列化看一下效果。

发现此时占用的内存空间是185字节,比使用kryo的方式内存空间多占用了将近4倍。
注意:如果我们只是将spark的序列化机制改为了kryo序列化,但是没有对使用到的自定义类型手工进行注册,那么此时内存的占用会介于前面两种情况之间。
java代码如下:
package com.simoniu.sparkdemo.javademo;
import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.serializer.KryoRegistrator;
import org.apache.spark.storage.StorageLevel;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;
/**
* 需求:Kryo序列化的使用
* Created by simoniu
*/
public class KryoSerjava {
public static void main(String[] args) {
//创建SparkContext:
SparkConf conf = new SparkConf();
conf.setAppName("KryoSerjava")
.setMaster("local")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.classesToRegister", "com.simoniu.sparkdemo.javademo.Person");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> dataRDD = sc.parallelize(Arrays.asList("hello linux","hello java","hello hadoop"));
JavaRDD<String> wordsRDD = dataRDD.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String line) throws Exception {
return Arrays.asList(line.split(" ")).iterator();
}
});
JavaRDD<Person> personRDD = wordsRDD.map(new Function<String, Person>() {
@Override
public Person call(String word) throws Exception {
return new Person(word, 18);
}
}).persist(StorageLevel.MEMORY_ONLY_SER());
personRDD.foreach(new VoidFunction<Person>() {
@Override
public void call(Person person) throws Exception {
System.out.println(person);
}
});
while (true){
;
}
}
}
class Person implements Serializable{
private String name;
private int age;
Person(String name,int age){
this.name = name;
this.age = age;
}
@Override
public String toString() {
return "Person{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}
2).持久化或者checkpoint
针对程序中多次被transformation或者action操作的RDD进行持久化操作,避免对一个RDD反复进行计算,再进一步优化,使用Kryo序列化的持久化级别,减少内存占用。 为了保证RDD持久化数据在可能丢失的情况下还能实现高可靠,则需要对RDD执行Checkpoint操作。
这两个操作我们前面讲过了,这里不再赘述。
3).JVM垃圾回收调优
由于Spark是基于内存的计算引擎,RDD缓存的数据,以及算子执行期间创建的对象都是放在内存中的,所以针对Spark任务如果内存设置不合理会导致大部分时间都消耗在垃圾回收上。
默认情况下,Spark使用每个executor 60%的内存空间来缓存RDD,那么只有40%的内存空间来存放算子执行期间创建的对象。在这种情况下,可能由于内存空间的不足,并且算子对应的task任务在运行时创建的对象过大,那么一旦发现40%的内存空间不够用了,就会触发Java虚拟机的垃圾回收操作。因此在极端情况下,垃圾回收操作可能会被频繁触发。
在这种情况下,如果发现垃圾回收频繁发生。那么就需要对这个比例进行调优了,spark.storage.memoryFraction参数的值默认是0.6。使用SparkConf().set("spark.storage.memoryFraction", "0.5")可以进行修改,就是将RDD缓存占用内存空间的比例降低为50%,从而提供更多的内存空间来保存task运行时创建的对象。
我们可以对task的垃圾回收进行监测,在spark的任务执行界面,可以查看每个task执行消耗的时间,以及task gc消耗的时间。查看task的执行情况时,我们看这里面的GC time的数值会不会比较大,最直观的就是如果gc time这里标红了,则说明gc时间过长。

上面这个是分任务查看,其实还可以查看全局的,看Executor进程中整个任务执行总时间和gc的消耗时间。

Java的堆空间被划分成了两块空间:一个是年轻代,一个是老年代。 - 年轻代放的是短时间存活的对象 - 老年代放的是长时间存活的对象。 - 年轻代又被划分了三块空间,Eden、Survivor1、Survivor2

年轻代占堆内存的1/3,老年代占堆内存的2/3。
其中年轻代又被划分了三块,Eden,Survivor1,Survivor2的比例为8:1:1
Eden区域和Survivor1区域用于存放对象,Survivor2区域备用。 我们创建的对象,首先会放入Eden区域,如果Eden区域满了,那么就会触发一次Minor GC,进行年轻代的垃圾回收(其实就是回收Eden区域内没有人使用的对象),然后将存活的对象存入Survivor1区域,再创建对象的时候继续放入Eden区域。第二次Eden区域满了,那么Eden和Survivor1区域中存活的对象,会一块被移动到Survivor2区域中。然后Survivor1和Survivor2的角色调换,Survivor1变成了备用。 当第三次Eden区域再满了的时候,Eden和Survivor2区域中存活的对象,会一块被移动到Survivor1区域中,按照这个规律进行循环
如果一个对象,在年轻代中,撑过了多次垃圾回收(默认是15次),都没有被回收掉,那么会被认为是长时间存活的,此时就会被移入老年代。此外,如果在将Eden和Survivor1中的存活对象,尝试放入Survivor2中时,发现Survivor2放满了,那么会直接放入老年代。此时就出现了,短时间存活的对象,也会进入老年代的问题。 如果老年代的空间满了,那么就会触发Full GC,进行老年代的垃圾回收操作,如果执行Full GC也释放不了内存空间,就会报内存溢出的错误了。
注意:Full GC是一个重量级的垃圾回收,Full GC执行的时候,程序是处于暂停状态的,这样会非常影响性能。
Spark中,垃圾回收调优的目标就是,只有真正长时间存活的对象,才能进入老年代,短时间存活的对象,只能呆在年轻代。不能因为某个Survivor区域空间不够,在Minor GC时,就进入了老年代,从而造成短时间存活的对象,长期呆在老年代中占据了空间,这样Full GC时要回收大量的短时间存活的对象,导致Full GC速度缓慢。
如果发现,在task执行期间,大量full gc发生了,那么说明,年轻代的Eden区域,给的空间不够大。 此时可以执行一些操作来优化垃圾回收行为。
1:最直接的就是提高Executor的内存 在spark-submit中通过参数指定executor的内存
--executor-memory 1G
2:调整Eden与s1和s2的比值【一般情况下不建议调整这块的比值】
-XX:NewRatio=4:设置年轻代(包括Eden和两个Survivor区)与年老代的比值(除去持久代).设置为4,则年轻代与年老代所占比值为1:4,年轻代占整个堆栈的1/5 -XX:SurvivorRatio=4:设置年轻代中Eden区与Survivor区的大小比值.设置为4,则两个Survivor区与一个Eden区的比值为2:4,一个Survivor区占整个年轻代的1/6 具体使用的时候在spark-submit脚本中通过--conf参数设置即可
--conf "spark.executor.extraJavaOptions= -XX:SurvivorRatio=4 -XX:NewRatio=4"
其实最直接的就是增加Executor的内存,如果这个内存上不去,其它的修改都是徒劳。 举个例子就是说,一个20岁的成年人和一个3岁的小孩 3岁的小孩掌握再多的格斗技巧都没有用,在绝对的实力面前一切都是花架子。 所以说我们一般很少需要去调整Eden、s1、s2的比值,一般都是直接增加Executor的内存比较靠谱。
4).数据本地化
数据本地化对于Spark Job性能有着巨大的影响。如果数据以及要计算它的代码是在一起的,那么性能当然会非常高。但是,如果数据和计算它的代码是分开的,那么其中之一必须到另外一方的机器上。通常来说,移动代码到其它节点,会比移动数据到代码所在的节点,速度要得多,因为代码比较小。Spark也正是基于这个数据本地化的原则来构建task调度算法的。
数据本地化,指的是,数据离计算它的代码有多近。基于数据距离代码的距离,有以下几种数据本地化级别:
数据本地化级别 解释
PROCESS_LOCAL 进程本地化,性能最好:数据和计算它的代码在同一个JVM进程中
NODE_LOCAL 节点本地化:数据和计算它的代码在一个节点上,但是不在一个JVM进程中,数据需要跨进程传输
NO_PREF 数据从哪里过来,性能都是一样的,比如从数据库中获取数据,对于task而言没有区别
RACK_LOCAL 数据和计算它的代码在一个机架上,数据需要通过网络在节点之间进行传输
ANY 数据可能在任意地方,比如其它网络环境内,或者其它机架上,性能最差
Spark倾向使用最好的本地化级别调度task,但这是不现实的。如果目前我们要处理的数据所在的executor上目前没有空闲的CPU,那么Spark就会放低本地化级别。这时有两个选择:
Spark默认会等待指定时间,期望task要处理的数据所在的节点上的executor空闲出一个cpu,从而将task分配过去,只要超过了时间,那么Spark就会将task分配到其它任意一个空闲的executor上,可以设置参数,spark.locality系列参数,来调节Spark等待task可以进行数据本地化的时间。
spark.locality.wait(3000毫秒):默认等待3秒
spark.locality.wait.process:等待指定的时间看能否达到数据和计算它的代码在同一个JVM进程中
spark.locality.wait.node:等待指定的时间看能否达到数据和计算它的代码在一个节点上执行
spark.locality.wait.rack:等待指定的时间看能否达到数据和计算它的代码在一个机架上
看下面这个图里面的task,此时的数据本地化级别是NODE_LOCAL。
