RDD是Spark编程的核心,在进行Spark编程时,首要任务是创建一个初始的RDD。这样就相当于设置了Spark应用程序的输入源数据。在创建了初始的RDD之后,才可以通过Spark 提供的一些高阶函数,对这个RDD进行操作,来获取其它的RDD。
Spark提供三种创建RDD方式:集合、本地文件、HDFS文件。
通过集合来创建RDD,需要针对程序中的集合,调用SparkContext的parallelize()方法。Spark会将集合中的数据拷贝到集群上,形成一个分布式的数据集合,也就是一个RDD。相当于,集合中的部分数据会到一个节点上,而另一部分数据会到其它节点上。然后就可以用并行的方式来操作这个分布式数据集合了。
调用parallelize()时,有一个重要的参数可以指定,就是将集合切分成多少个partition。Spark会为每一个partition运行一个task来进行处理。Spark默认会根据集群的配置来设置partition的数量,默认数量是2。我们也可以在调用parallelize()方法时,传入第二个参数,来设置RDD的partition数量,例如:parallelize(arr, 5)
1).scala实现
package com.simoniu.scalademo
import org.apache.spark.{SparkConf, SparkContext}
object CreateRddByArrayScalaDemo {
def main(args: Array[String]): Unit = {
//创建SparkContext
val conf = new SparkConf()
conf.setAppName("CreateRddByArrayScala ")//设置任务名称
.setMaster("local")//local表示在本地执行
val sc = new SparkContext(conf)
//创建集合
val arr = Array(1,2,3,4,5)
//基于集合创建RDD
val rdd = sc.parallelize(arr)
val sum = rdd.reduce(_ + _)
println(sum)
//停止SparkContext
sc.stop()
}
}
2).Java实现
package com.simoniu.sparkdemo.javademo;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import java.util.Arrays;
import java.util.List;
public class CreateRddByArrayJavaDemo {
public static void main(String[] args) {
//创建SparkContext:
SparkConf conf = new SparkConf();
conf.setAppName("CreateRddByArrayJava")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
//创建集合
List<Integer> arr = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD<Integer> rdd = sc.parallelize(arr);
Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 + i2;
}
});
System.out.println(sum);
//停止sparkContext
sc.stop();
}
}
通过SparkContext的textFile()方法,可以针对本地文件或HDFS文件创建RDD,RDD中的每个元素就是文件中的一行文本内容。textFile()方法支持针对目录、压缩文件以及通配符创建RDD。Spark默认会为HDFS文件的每一个Block创建一个partition,也可以通过textFile()的第二个参数手动设置分区数量,只能比Block数量多,不能比Block数量少,比Block数量少的话你的设置是不生效的。
读取hello2.txt文件中的数据,统计文件内数据的总长度。hello2.txt内容如下:
java
c
html
sql
linux
hadoop
hive
spark
flink
storm
1).scala实现
package com.simoniu.scalademo
import org.apache.spark.{SparkConf, SparkContext}
object CreateRddByFileScalaDemo {
def main(args: Array[String]): Unit = {
//创建SparkContext
val conf = new SparkConf()
conf.setAppName("CreateRddByFileScala")//设置任务名称
.setMaster("local")//local表示在本地执行
val sc = new SparkContext(conf)
var path = "D:\\uploadFiles\\hello2.txt"
//path = "hdfs://bigdata01:9000/test/hello.txt"
//读取hello2.txt文件中的数据,可以在textFile中指定生成的RDD的分区数量
val rdd = sc.textFile(path,2)
//获取每一行数据的长度,计算文件内数据的总长度
val length = rdd.map(_.length).reduce(_ + _)
println("length="+length)
sc.stop()
}
}
2).java实现
package com.simoniu.sparkdemo.javademo;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
public class CreateRddByFileJavaDemo {
public static void main(String[] args) {
//创建SparkContext:
SparkConf conf = new SparkConf();
conf.setAppName("CreateRddByFileJava")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
String path = "D:\\uploadFiles\\hello2.txt";
//path = "hdfs://bigdata01:9000/test/hello.txt";
//读取文件数据,可以在textFile中指定生成的RDD的分区数量
JavaRDD<String> rdd = sc.textFile(path,2);
//获取每一行数据的长度
JavaRDD<Integer> lengthRDD = rdd.map(new Function<String, Integer>() {
@Override
public Integer call(String line) throws Exception {
return line.length();
}
});
//计算文件内数据的总长度
Integer length = lengthRDD.reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
System.out.println("length="+length);
sc.stop();
}
}
运行结果:
length=42